- 1. Přehled
- Ring Buffer
- 2.1. Jak to funguje
- 2.2. Prázdné a plné vyrovnávací paměti
- 2.2. Vyrovnávací paměť je prázdná. Výhody a nevýhody
- Implementace v jazyce Java
- 3.1 Implementace v jazyce Java. Inicializace
- 3.2. Nabídka
- 3.3. Poll
- Problém producent-konzument
- 4.1. Volatilní sekvenční pole
- 4.2. V případě, že je pole writeSequence volatilní, může být jeho hodnota vždy změněna. Producent
- 4.3. Konzument
- Závěr
1. Přehled
V tomto tutoriálu se naučíme implementovat Ring Buffer v Javě.
Ring Buffer
Ring Buffer (nebo také Circular Buffer) je ohraničená kruhová datová struktura, která se používá pro vyrovnávací paměť dat mezi dvěma nebo více vlákny. Když do kruhové vyrovnávací paměti stále zapisujeme, obtéká se, jakmile dosáhne svého konce.
2.1. Jak to funguje
Kroužková vyrovnávací paměť je implementována pomocí pole pevné velikosti, které se na hranicích obtéká.
Kromě pole sleduje tři věci:
- nejbližší volné místo ve vyrovnávací paměti pro vložení prvku,
- nejbližší nepřečtený prvek ve vyrovnávací paměti,
- a konec pole – bod, ve kterém se vyrovnávací paměť obtéká na začátek pole
Mechanika, jakým způsobem kruhová vyrovnávací paměť tyto požadavky řeší, se liší podle implementace. Například heslo na Wikipedii na toto téma ukazuje metodu využívající čtyřbodové ukazatele.
Přístup si vypůjčíme z Disruptorovy implementace kruhového bufferu pomocí sekvencí.
První věc, kterou potřebujeme znát, je kapacita – pevná maximální velikost bufferu. Dále použijeme dvě monotónně rostoucí sekvence:
- Sekvence zápisu: začíná na -1, při vložení prvku se zvětší o 1
- Sekvence čtení: začíná na 0, při spotřebování prvku se zvětší o 1
Sekvenci můžeme namapovat na index v poli pomocí operace mod:
arrayIndex = sequence % capacity
Operace mod obtéká sekvenci kolem hranic a odvozuje tak slot v bufferu:
Podívejme se, jak bychom vložili prvek:
buffer = element
Před vložením prvku sekvenci předem inkrementujeme.
Pro spotřebování prvku provedeme post-inkrementaci:
element = buffer
V tomto případě provedeme post-inkrementaci sekvence. Konzumace prvku jej neodstraní z vyrovnávací paměti – pouze zůstane v poli, dokud nebude přepsán.
Při obtékání pole začneme přepisovat data ve vyrovnávací paměti. Pokud je buffer plný, můžeme se rozhodnout, že buď přepíšeme nejstarší data bez ohledu na to, zda je čtenář spotřeboval, nebo zabráníme přepsání dat, která nebyla přečtena.
Pokud si čtenář může dovolit vynechat mezilehlé nebo staré hodnoty (například burzovní kurzovní lístek), můžeme data přepsat, aniž bychom čekali na jejich spotřebování. Na druhou stranu, pokud musí čtečka spotřebovat všechny hodnoty (například u transakcí elektronického obchodu), měli bychom počkat (blokovat/busy-wait), dokud nebude ve vyrovnávací paměti volný slot.
Vyrovnávací paměť je plná, pokud je její velikost rovna její kapacitě, přičemž její velikost je rovna počtu nepřečtených prvků:
size = (writeSequence - readSequence) + 1isFull = (size == capacity)
Pokud sekvence zápisu zaostává za sekvencí čtení, je vyrovnávací paměť prázdná:
isEmpty = writeSequence < readSequence
Vyrovnávací paměť vrací nulovou hodnotu, pokud je prázdná.
Kroužková vyrovnávací paměť je efektivní vyrovnávací paměť FIFO. Používá pole s pevnou velikostí, které lze předem alokovat, a umožňuje efektivní vzor přístupu do paměti. Všechny operace s vyrovnávací pamětí mají konstantní čas O(1), včetně spotřebování prvku, protože nevyžaduje posunování prvků.
Na druhé straně je určení správné velikosti kruhové vyrovnávací paměti kritické. Například operace zápisu mohou být dlouho blokovány, pokud je vyrovnávací paměť nedostatečně velká a čtení je pomalé. Můžeme použít dynamické dimenzování, ale to by vyžadovalo přesouvání dat a přišli bychom o většinu výše popsaných výhod.
Implementace v jazyce Java
Teď, když jsme pochopili, jak kruhová vyrovnávací paměť funguje, přistoupíme k její implementaci v jazyce Java.
3.1 Implementace v jazyce Java. Inicializace
Nejprve definujme konstruktor, který inicializuje vyrovnávací paměť s předem definovanou kapacitou:
public CircularBuffer(int capacity) { this.capacity = (capacity < 1) ? DEFAULT_CAPACITY : capacity; this.data = (E) new Object; this.readSequence = 0; this.writeSequence = -1;}
Vytvoří prázdnou vyrovnávací paměť a inicializuje pole sekvence, jak bylo popsáno v předchozí části.
3.2. Nabídka
Dále budeme implementovat operaci nabídka, která vloží prvek do vyrovnávací paměti na nejbližší volné místo a při úspěchu vrátí hodnotu true. Vrátí false, pokud buffer nenajde prázdný slot, to znamená, že nemůžeme přepsat nepřečtené hodnoty.
Implementujme metodu offer v jazyce Java:
public boolean offer(E element) { boolean isFull = (writeSequence - readSequence) + 1 == capacity; if (!isFull) { int nextWriteSeq = writeSequence + 1; data = element; writeSequence++; return true; } return false;}
Takže inkrementujeme posloupnost zápisu a počítáme index v poli pro další volný slot. Poté zapíšeme data do vyrovnávací paměti a uložíme aktualizovanou sekvenci zápisu.
Vyzkoušíme si to:
@Testpublic void givenCircularBuffer_whenAnElementIsEnqueued_thenSizeIsOne() { CircularBuffer buffer = new CircularBuffer<>(defaultCapacity); assertTrue(buffer.offer("Square")); assertEquals(1, buffer.size());}
3.3. Poll
Nakonec implementujeme operaci poll, která načte a odstraní další nepřečtený prvek. Operace poll neodstraní prvek, ale zvýší pořadí čtení.
Implementujeme ji:
public E poll() { boolean isEmpty = writeSequence < readSequence; if (!isEmpty) { E nextValue = data; readSequence++; return nextValue; } return null;}
Zde čteme data v aktuálním pořadí čtení výpočtem indexu v poli. Poté inkrementujeme posloupnost a vracíme hodnotu, pokud vyrovnávací paměť není prázdná.
Vyzkoušejme si to:
@Testpublic void givenCircularBuffer_whenAnElementIsDequeued_thenElementMatchesEnqueuedElement() { CircularBuffer buffer = new CircularBuffer<>(defaultCapacity); buffer.offer("Triangle"); String shape = buffer.poll(); assertEquals("Triangle", shape);}
Problém producent-konzument
Povídali jsme si o použití kruhové vyrovnávací paměti pro výměnu dat mezi dvěma nebo více vlákny, což je příklad synchronizačního problému zvaného problém producent-konzument. V Javě můžeme problém producent-konzument řešit různými způsoby pomocí semaforů, ohraničených front, kruhových vyrovnávacích pamětí atd.
Implementujme řešení založené na kruhové vyrovnávací paměti.
4.1. Volatilní sekvenční pole
Naše implementace kruhové vyrovnávací paměti není bezpečná pro vlákna. Udělejme ji thread-safe pro jednoduchý případ jednoho producenta a jednoho konzumenta.
Producent zapisuje data do vyrovnávací paměti a inkrementuje writeSequence, zatímco konzument pouze čte z vyrovnávací paměti a inkrementuje readSequence. Záložní pole je tedy bez kontentů a můžeme se obejít bez jakékoli synchronizace.
Ještě ale musíme zajistit, aby spotřebitel viděl poslední hodnotu pole writeSequence (viditelnost) a aby se writeSequence neaktualizovala dříve, než jsou data ve vyrovnávací paměti skutečně k dispozici (uspořádání).
Kroužkovou vyrovnávací paměť můžeme v tomto případě učinit souběžnou a bezzámkovou tím, že pole sekvence učiníme volatilními:
private volatile int writeSequence = -1, readSequence = 0;
V metodě offer zápis do volatilního pole writeSequence zaručuje, že k zápisu do vyrovnávací paměti dojde před aktualizací sekvence. Záruka volatilní viditelnosti zároveň zaručuje, že konzument vždy uvidí nejnovější hodnotu pole writeSequence.
4.2. V případě, že je pole writeSequence volatilní, může být jeho hodnota vždy změněna. Producent
Implementujme jednoduchý producentský Runnable, který zapisuje do kruhového bufferu:
public void run() { for (int i = 0; i < items.length;) { if (buffer.offer(items)) { System.out.println("Produced: " + items); i++; } }}
Vlákno producenta by čekalo na prázdný slot ve smyčce (busy-waiting).
4.3. Konzument
Implementujeme konzumenta Callable, který čte z vyrovnávací paměti:
public T call() { T items = (T) new Object; for (int i = 0; i < items.length;) { T item = buffer.poll(); if (item != null) { items = item; System.out.println("Consumed: " + item); } } return items;}
Vlákno konzumenta pokračuje bez tisku, pokud z vyrovnávací paměti obdrží nulovou hodnotu.
Napíšeme kód našeho ovladače:
executorService.submit(new Thread(new Producer<String>(buffer)));executorService.submit(new Thread(new Consumer<String>(buffer)));
Spuštěním našeho programu producent-konzument vznikne výstup jako níže:
Produced: CircleProduced: Triangle Consumed: CircleProduced: Rectangle Consumed: Triangle Consumed: RectangleProduced: SquareProduced: Rhombus Consumed: SquareProduced: Trapezoid Consumed: Rhombus Consumed: TrapezoidProduced: PentagonProduced: PentagramProduced: Hexagon Consumed: Pentagon Consumed: PentagramProduced: Hexagram Consumed: Hexagon Consumed: Hexagram
Závěr
V tomto tutoriálu jsme se naučili implementovat Ring Buffer a prozkoumali jsme, jak jej lze použít k řešení problému producent-konzument.
.