1. Visão Geral
Neste tutorial, aprenderemos como implementar um buffer de anel em Java.
Buffer de anel
Buffer de anel (ou Circular Buffer) é uma estrutura de dados circular limitada que é usada para armazenar dados em buffer entre dois ou mais threads. Enquanto continuamos a escrever num buffer de anéis, ele se envolve ao chegar ao fim.
2.1. How It Works
A Ring Buffer é implementado usando um array de tamanho fixo que se enrola nos limites.
Uma parte do array, ele mantém o controle de três coisas:
- o próximo slot disponível no buffer para inserir um elemento,
- o próximo elemento não lido no buffer,
- e o fim do array – o ponto no qual o buffer se envolve até o início do array
A mecânica de como um buffer em anel lida com esses requisitos varia com a implementação. Por exemplo, a entrada da Wikipédia sobre o assunto mostra um método usando quatro pontos.
A aproximação da implementação do buffer em anel do Disruptor será emprestada usando sequências.
A primeira coisa que precisamos saber é a capacidade – o tamanho máximo fixo do buffer. A seguir, vamos usar duas sequências monotonicamente crescentes:
- Escrever sequência: começando em -1, incrementos de 1 à medida que inserimos um elemento
- Ler sequência: começando em 0, incrementos de 1 à medida que consumimos um elemento
Podemos mapear uma sequência para um índice no array usando uma operação mod:
arrayIndex = sequence % capacity
A operação mod envolve a seqüência ao redor dos limites para derivar um slot no buffer:
Vejamos como inseriríamos um elemento:
buffer = element
Estamos pré-increvendo a seqüência antes de inserir um elemento.
Para consumir um elemento fazemos um pós-incremento:
element = buffer
Neste caso, fazemos um pós-incremento na sequência. Consumir um elemento não o remove do buffer – ele apenas permanece no array até ser sobrescrito.
2.2. Buffers Vazios e Cheios
Quando nos envolvemos no array, vamos começar a sobrescrever os dados no buffer. Se o buffer estiver cheio, podemos escolher entre sobrescrever os dados mais antigos, independentemente de o leitor os ter consumido ou evitar a sobrescrita dos dados que não foram lidos.
Se o leitor puder dar-se ao luxo de perder os valores intermédios ou antigos (por exemplo, um ticker do preço das acções), podemos sobrescrever os dados sem esperar que sejam consumidos. Por outro lado, se o leitor tiver de consumir todos os valores (como nas transacções de comércio electrónico), devemos aguardar (bloquear/atacarregar) até que o buffer tenha um espaço disponível.
O buffer está cheio se o tamanho do buffer for igual à sua capacidade, onde o seu tamanho é igual ao número de elementos não lidos:
size = (writeSequence - readSequence) + 1isFull = (size == capacity)
Se a sequência de escrita ficar atrás da sequência lida, o buffer está vazio:
isEmpty = writeSequence < readSequence
O buffer retorna um valor nulo se estiver vazio.
2.2. Vantagens e Desvantagens
Um buffer de anel é um buffer FIFO eficiente. Ele usa um array de tamanho fixo que pode ser pré-alocado antecipadamente e permite um padrão eficiente de acesso à memória. Todas as operações do buffer são de tempo O(1) constante, incluindo o consumo de um elemento, já que não requer um deslocamento de elementos.
No lado oposto, determinar o tamanho correto do buffer de anéis é crítico. Por exemplo, as operações de escrita podem bloquear por um longo tempo se o buffer estiver subdimensionado e as leituras forem lentas. Podemos usar o dimensionamento dinâmico, mas seria necessário mover os dados e vamos perder a maioria das vantagens discutidas acima.
Implantação em Java
Agora compreendemos como funciona um buffer de anéis, vamos proceder para implementá-lo em Java.
3.1. Inicialização
Primeiro, vamos definir um construtor que inicializa o buffer com uma capacidade pré-definida:
public CircularBuffer(int capacity) { this.capacity = (capacity < 1) ? DEFAULT_CAPACITY : capacity; this.data = (E) new Object; this.readSequence = 0; this.writeSequence = -1;}
Esta irá criar um buffer vazio e inicializar os campos de sequência como discutido na secção anterior.
3.2. Oferta
Próximo, vamos implementar a operação de oferta que insere um elemento no buffer no próximo slot disponível e retorna verdadeiro sobre o sucesso. Ele retorna false se o buffer não conseguir encontrar um slot vazio, ou seja, não podemos sobrescrever valores não lidos.
Deixamos implementar o método de oferta em Java:
public boolean offer(E element) { boolean isFull = (writeSequence - readSequence) + 1 == capacity; if (!isFull) { int nextWriteSeq = writeSequence + 1; data = element; writeSequence++; return true; } return false;}
Então, estamos incrementando a sequência de escrita e calculando o índice no array para o próximo slot disponível. Depois, estamos a escrever os dados no buffer e a armazenar a sequência de escrita actualizada.
Vamos experimentar:
@Testpublic void givenCircularBuffer_whenAnElementIsEnqueued_thenSizeIsOne() { CircularBuffer buffer = new CircularBuffer<>(defaultCapacity); assertTrue(buffer.offer("Square")); assertEquals(1, buffer.size());}
3.3. Enquete
Finalmente, vamos implementar a operação de enquete que recupera e remove o próximo elemento não lido. A operação de pesquisa não remove o elemento mas incrementa a sequência de leitura.
Ponhamos em prática:
public E poll() { boolean isEmpty = writeSequence < readSequence; if (!isEmpty) { E nextValue = data; readSequence++; return nextValue; } return null;}
Aqui, estamos a ler os dados na sequência de leitura actual através do cálculo do índice no array. Então, estamos incrementando a seqüência e retornando o valor, se o buffer não estiver vazio.
>
Vamos testá-lo:
@Testpublic void givenCircularBuffer_whenAnElementIsDequeued_thenElementMatchesEnqueuedElement() { CircularBuffer buffer = new CircularBuffer<>(defaultCapacity); buffer.offer("Triangle"); String shape = buffer.poll(); assertEquals("Triangle", shape);}
Produtor-Consumidor Problema
Falamos sobre o uso de um buffer de anel para troca de dados entre dois ou mais threads, que é um exemplo de um problema de sincronização chamado problema Produtor-Consumidor. Em Java, podemos resolver o problema produtor-consumidor de várias maneiras usando semáforos, filas delimitadas, ring buffers, etc.
>
Deixamos implementar uma solução baseada num ring buffer.
4.1. Campos de Sequência Voláteis
A nossa implementação do ring buffer não é thread-safe. Vamos torná-lo seguro para o caso simples de um único produtor e consumidor.
O produtor escreve os dados no buffer e incrementa a sequência de escrita, enquanto o consumidor só lê a partir do buffer e incrementa a sequência de leitura. Assim, o array de suporte é livre de contenção e podemos escapar sem qualquer sincronização.
Mas ainda precisamos garantir que o consumidor possa ver o último valor do campo writeSequence (visibilidade) e que o writeSequence não seja atualizado antes que os dados estejam realmente disponíveis no buffer (ordenação).
Podemos fazer com que o buffer de anéis seja simultâneo e sem bloqueio neste caso tornando os campos de sequência voláteis:
private volatile int writeSequence = -1, readSequence = 0;
No método de oferta, uma writeSequence para o campo volátil writeSequence garante que as escritas no buffer aconteçam antes da actualização da sequência. Ao mesmo tempo, a garantia de visibilidade volátil garante que o consumidor verá sempre o último valor de writeSequence.
4.2. Produtor
Deixe implementar um produtor simples Executável que escreve no buffer de anéis:
public void run() { for (int i = 0; i < items.length;) { if (buffer.offer(items)) { System.out.println("Produced: " + items); i++; } }}
A thread do produtor esperaria por um slot vazio em um loop (busy-waiting).
4.3. Consumidor
Implementaremos um consumidor Callable que lê a partir do buffer:
public T call() { T items = (T) new Object; for (int i = 0; i < items.length;) { T item = buffer.poll(); if (item != null) { items = item; System.out.println("Consumed: " + item); } } return items;}
O thread do consumidor continua sem imprimir se receber um valor nulo do buffer.
Vamos escrever o nosso código de driver:
executorService.submit(new Thread(new Producer<String>(buffer)));executorService.submit(new Thread(new Consumer<String>(buffer)));
Executar o nosso programa produtor-consumidor produz output como abaixo:
Produced: CircleProduced: Triangle Consumed: CircleProduced: Rectangle Consumed: Triangle Consumed: RectangleProduced: SquareProduced: Rhombus Consumed: SquareProduced: Trapezoid Consumed: Rhombus Consumed: TrapezoidProduced: PentagonProduced: PentagramProduced: Hexagon Consumed: Pentagon Consumed: PentagramProduced: Hexagram Consumed: Hexagon Consumed: Hexagram
Conclusão
Neste tutorial, aprendemos como implementar um buffer de anel e exploramos como ele pode ser usado para resolver o problema produtor-consumidor.