1. Prezentare generală
În acest tutorial, vom învăța cum să implementăm un Ring Buffer în Java.
Ring Buffer
Ring Buffer (sau Circular Buffer) este o structură de date circulară delimitată care este utilizată pentru tamponarea datelor între două sau mai multe fire de execuție. Pe măsură ce continuăm să scriem într-un buffer inelar, acesta se înfășoară pe măsură ce ajunge la capăt.
2.1. Cum funcționează
Un buffer inelar este implementat folosind o matrice de dimensiune fixă care se înfășoară la limite.
În afară de matrice, acesta ține evidența a trei lucruri:
- următorul slot disponibil în buffer pentru a introduce un element,
- următorul element necitit din buffer,
- și sfârșitul array-ului – punctul în care buffer-ul se înfășoară până la începutul array-ului
Mecanica modului în care un buffer în inel tratează aceste cerințe variază în funcție de implementare. De exemplu, intrarea Wikipedia pe această temă prezintă o metodă care folosește patru puncte.
Vom împrumuta abordarea din implementarea lui Disruptor a bufferului inelar folosind secvențe.
Primul lucru pe care trebuie să-l știm este capacitatea – dimensiunea maximă fixă a bufferului. În continuare, vom folosi două secvențe cu creștere monotonă:
- Secvență de scriere: începe de la -1, crește cu 1 pe măsură ce introducem un element
- Secvență de citire: începe de la 0, crește cu 1 pe măsură ce consumăm un element
Potem să mapăm o secvență pe un index din matrice folosind o operație mod:
arrayIndex = sequence % capacity
Operația de mod înfășoară secvența în jurul limitelor pentru a obține un slot în memoria tampon:
Să vedem cum introducem un element:
buffer = element
Primimăm secvența înainte de a introduce un element.
Pentru a consuma un element, efectuăm o post-increment:
element = buffer
În acest caz, efectuăm o post-increment asupra secvenței. Consumarea unui element nu îl elimină din buffer – acesta rămâne în matrice până când este suprascris.
2.2. Tampoane goale și pline
Pe măsură ce ne înfășurăm în jurul matricei, vom începe să suprascriem datele din buffer. Dacă bufferul este plin, putem alege fie să suprascriem cele mai vechi date, indiferent dacă cititorul le-a consumat, fie să împiedicăm suprascrierea datelor care nu au fost citite.
Dacă cititorul își poate permite să rateze valorile intermediare sau vechi (de exemplu, un ticker cu prețul acțiunilor), putem suprascrie datele fără să așteptăm ca acestea să fie consumate. Pe de altă parte, dacă cititorul trebuie să consume toate valorile (ca în cazul tranzacțiilor de comerț electronic), ar trebui să așteptăm (block/busy-wait) până când bufferul are un slot disponibil.
Bufferul este plin dacă dimensiunea bufferului este egală cu capacitatea sa, unde dimensiunea sa este egală cu numărul de elemente necitite:
size = (writeSequence - readSequence) + 1isFull = (size == capacity)
Dacă secvența de scriere rămâne în urma secvenței de citire, bufferul este gol:
isEmpty = writeSequence < readSequence
Bufferul returnează o valoare nulă dacă este gol.
2.2. Avantaje și dezavantaje
Un buffer în inel este un buffer FIFO eficient. Folosește o matrice de dimensiune fixă care poate fi pre-alocată în avans și permite un model eficient de acces la memorie. Toate operațiile bufferului sunt în timp constant O(1), inclusiv consumarea unui element, deoarece nu necesită o deplasare a elementelor.
Pe de altă parte, determinarea dimensiunii corecte a bufferului în inel este critică. De exemplu, operațiile de scriere se pot bloca mult timp dacă bufferul este subdimensionat, iar citirile sunt lente. Putem folosi dimensionarea dinamică, dar aceasta ar necesita mutarea datelor și vom pierde majoritatea avantajelor discutate mai sus.
Implementare în Java
Acum că am înțeles cum funcționează un buffer în inel, să trecem la implementarea lui în Java.
3.1. Inițializare
În primul rând, să definim un constructor care să inițializeze bufferul cu o capacitate predefinită:
public CircularBuffer(int capacity) { this.capacity = (capacity < 1) ? DEFAULT_CAPACITY : capacity; this.data = (E) new Object; this.readSequence = 0; this.writeSequence = -1;}
Aceasta va crea un buffer gol și va inițializa câmpurile de secvență așa cum am discutat în secțiunea anterioară.
3.2. Ofertă
În continuare, vom implementa operația de ofertă care inserează un element în buffer la următorul slot disponibil și returnează true în caz de succes. Aceasta returnează false dacă bufferul nu poate găsi un slot gol, adică nu putem suprascrie valorile necitite.
Să implementăm metoda de ofertare în Java:
public boolean offer(E element) { boolean isFull = (writeSequence - readSequence) + 1 == capacity; if (!isFull) { int nextWriteSeq = writeSequence + 1; data = element; writeSequence++; return true; } return false;}
Acum, incrementăm secvența de scriere și calculăm indexul în matrice pentru următorul slot disponibil. Apoi, scriem datele în buffer și stocăm secvența de scriere actualizată.
Să o încercăm:
@Testpublic void givenCircularBuffer_whenAnElementIsEnqueued_thenSizeIsOne() { CircularBuffer buffer = new CircularBuffer<>(defaultCapacity); assertTrue(buffer.offer("Square")); assertEquals(1, buffer.size());}
3.3. Poll
În cele din urmă, vom implementa operația poll care recuperează și elimină următorul element necitit. Operația de interogare nu elimină elementul, ci incrementează secvența de citire.
Să o implementăm:
public E poll() { boolean isEmpty = writeSequence < readSequence; if (!isEmpty) { E nextValue = data; readSequence++; return nextValue; } return null;}
Aici, citim datele la secvența de citire curentă prin calcularea indexului în matrice. Apoi, incrementăm secvența și returnăm valoarea, dacă bufferul nu este gol.
Să testăm:
@Testpublic void givenCircularBuffer_whenAnElementIsDequeued_thenElementMatchesEnqueuedElement() { CircularBuffer buffer = new CircularBuffer<>(defaultCapacity); buffer.offer("Triangle"); String shape = buffer.poll(); assertEquals("Triangle", shape);}
Problema Producător-Consumator
Am vorbit despre utilizarea unui buffer în inel pentru schimbul de date între două sau mai multe fire de execuție, care este un exemplu de problemă de sincronizare numită problema Producător-Consumator. În Java, putem rezolva problema producător-consumator în diferite moduri, folosind semafoare, cozi delimitate, tampoane inelare etc.
Să implementăm o soluție bazată pe un tampon inelar.
4.1. Câmpuri de secvență volatile
Implementarea noastră a tamponului inelar nu este thread-safe. Haideți să o facem thread-safe pentru cazul simplu cu un singur producător și un singur consumator.
Producătorul scrie date în buffer și incrementează writeSequence, în timp ce consumatorul citește doar din buffer și incrementează readSequence. Așadar, matricea de suport este lipsită de contenție și putem scăpa fără nicio sincronizare.
Dar tot trebuie să ne asigurăm că consumatorul poate vedea cea mai recentă valoare a câmpului writeSequence (vizibilitate) și că writeSequence nu este actualizat înainte ca datele să fie efectiv disponibile în buffer (ordonare).
În acest caz, putem face ca bufferul inelar să fie concurent și fără blocaje, făcând câmpurile secvenței volatile:
private volatile int writeSequence = -1, readSequence = 0;
În metoda de ofertare, o scriere în câmpul volatil writeSequence garantează că scrierile în buffer au loc înainte de actualizarea secvenței. În același timp, garanția vizibilității volatile asigură faptul că consumatorul va vedea întotdeauna cea mai recentă valoare a writeSequence.
4.2. Producător
Să implementăm un Runnable producător simplu care scrie în bufferul inelului:
public void run() { for (int i = 0; i < items.length;) { if (buffer.offer(items)) { System.out.println("Produced: " + items); i++; } }}
Hirul producătorului ar aștepta un slot gol într-o buclă (busy-waiting).
4.3. Consumator
Vom implementa un consumator Callable care citește din buffer:
public T call() { T items = (T) new Object; for (int i = 0; i < items.length;) { T item = buffer.poll(); if (item != null) { items = item; System.out.println("Consumed: " + item); } } return items;}
Firul consumator continuă fără să tipărească dacă primește o valoare nulă din buffer.
Să scriem codul driverului nostru:
executorService.submit(new Thread(new Producer<String>(buffer)));executorService.submit(new Thread(new Consumer<String>(buffer)));
Executarea programului nostru producător-consumator produce o ieșire ca mai jos:
Produced: CircleProduced: Triangle Consumed: CircleProduced: Rectangle Consumed: Triangle Consumed: RectangleProduced: SquareProduced: Rhombus Consumed: SquareProduced: Trapezoid Consumed: Rhombus Consumed: TrapezoidProduced: PentagonProduced: PentagramProduced: Hexagon Consumed: Pentagon Consumed: PentagramProduced: Hexagram Consumed: Hexagon Consumed: Hexagram
Concluzie
În acest tutorial, am învățat cum să implementăm un Ring Buffer și am explorat modul în care acesta poate fi folosit pentru a rezolva problema producător-consumator.
.