The LMAX Disruptor Codebase
Used in everything from trading systems and logging frameworks to video processing systems and databases, the LMAX Disruptor has been one of the biggest game changers around, and its codebase uses a number of interesting techniques which have evolved over time.
The Disruptor codebase has been around since about 2011. It has changed a lot since the original paper was published, and it's interesting to see how the abstractions and various techniques have evolved over time.
These notes are my own attempt to understand how the codebase now works. I can't say I understand every part of it and I still have lots to learn (I will probably cover fencing, sob, lob separately), but here we go!
Interesting Techniques and Highlights
Some of the highlights/techniques used in the codebase:
- Use of lock-free techniques, namely memory barriers/fences.
- Use of cache line padding.
- Efficient modulus operations using powers of 2 and bitmasking. Validating power of 2 sizes using the population count.
- Sequence caching.
- Use of atomic field updaters (AtomicReferenceFieldUpdater).
- Action scope reduction e.g. on the first time round, don't bother doing x.
- Use of assert to enforce threading policy.
- Validation of concurrency correctness using JCStress tests.
- Microbenchmarking with JMH.
- Perftests in a separate Gradle source set (see gradle/perf.gradle).
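Cache line padding is easiest to see on a single hot counter. The sketch below assumes a 64-byte cache line and that the JVM lays out superclass fields before subclass fields (typical of HotSpot, but not guaranteed by the Java Language Specification). The class names loosely follow the padding-by-inheritance pattern of the Disruptor 3.x Sequence class, but this is a simplified illustration, not the library code:

```java
// Cache-line padding via class inheritance (sketch).
// Assumes 64-byte cache lines and superclass-fields-first layout (HotSpot behaviour, not a JLS guarantee).
class LhsPadding {
    // 7 x 8 = 56 bytes of padding before the hot field
    protected long p1, p2, p3, p4, p5, p6, p7;
}

class Value extends LhsPadding {
    // The contended field: written by one thread, read by others
    protected volatile long value;
}

class RhsPadding extends Value {
    // 7 x 8 = 56 bytes of padding after the hot field
    protected long p9, p10, p11, p12, p13, p14, p15;
}

final class PaddedSequence extends RhsPadding {
    PaddedSequence(long initialValue) {
        this.value = initialValue;
    }

    long get() {
        return value; // volatile read
    }

    void set(long newValue) {
        value = newValue; // volatile write
    }
}
```

With 56 bytes of padding either side, two such counters updated by different threads should not end up on the same cache line, which avoids false sharing between them.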
Disruptor Class
The Disruptor class encapsulates a number of operations. It allows for the creation and chaining of EventProcessors and publishing of events via EventTranslators.
EventHandlers are wrapped into an EventProcessor (using delegation) which is what the ConsumerRepository actually contains.
ConsumerRepository
Holds EventProcessors (the EventProcessorInfo wraps the EventProcessor and any associated sequence barrier) and ConsumerInfo in IdentityHashMaps.
Map<EventHandlerIdentity, EventProcessorInfo> eventProcessorInfoByEventHandler;
Map<Sequence, ConsumerInfo> eventProcessorInfoBySequence;
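Why IdentityHashMap rather than HashMap? My reading (an assumption, not something the codebase states) is that it keys on reference identity (==) rather than equals()/hashCode(), so two distinct handler or sequence instances can never collide even if a user-supplied type overrides equals(). A small JDK-only illustration using a hypothetical Handler record, whose generated equals() makes two instances with the same name equal:

```java
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Map;

final class IdentityMapDemo {
    // Hypothetical handler type: as a record, two instances with the same name are equal()
    record Handler(String name) {}

    static int sizeWith(Map<Handler, String> map) {
        Handler first = new Handler("journaller");
        Handler second = new Handler("journaller"); // equal to first, but a different object
        map.put(first, "processor-1");
        map.put(second, "processor-2");
        return map.size();
    }

    public static void main(String[] args) {
        System.out.println(sizeWith(new HashMap<>()));         // prints 1: equal keys collide
        System.out.println(sizeWith(new IdentityHashMap<>())); // prints 2: distinct references kept apart
    }
}
```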
When we add() to the ConsumerRepo, a gating sequence is created for that particular EventProcessor. This gating sequence is added to the RingBuffer and an EventHandlerGroup is returned.
When we call start() on the Disruptor, the ConsumerRepo is passed the Disruptor's ThreadFactory. Each EventProcessor is a Runnable, and this ThreadFactory is used to start each EventProcessor on its own Thread.
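A rough sketch of that start-up step, assuming each processor is just a Runnable. ProcessorStarter is a hypothetical, heavily simplified stand-in; the real ConsumerRepository tracks much more state (sequences, barriers, handler identity):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadFactory;

// Hypothetical stand-in for the ConsumerRepository's start-up role
final class ProcessorStarter {
    private final List<Runnable> processors = new ArrayList<>();

    void add(Runnable processor) {
        processors.add(processor);
    }

    // One thread per processor, created by the caller-supplied ThreadFactory
    List<Thread> startAll(ThreadFactory threadFactory) {
        List<Thread> threads = new ArrayList<>();
        for (Runnable processor : processors) {
            Thread thread = threadFactory.newThread(processor);
            thread.start();
            threads.add(thread);
        }
        return threads;
    }
}
```

Passing the ThreadFactory in (for example Executors.defaultThreadFactory()) leaves thread naming, daemon status and priority to the caller.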
RingBuffer
A cursored event sequencer that provides write functionality.
The RingBuffer implements 3 interfaces (excuse my UML without dashed lines):
- Cursored - a provider of a cursor position
- EventSink - write interface via EventTranslators
- EventSequencer
EventSequencer
A sequenced data provider.
The EventSequencer implements 2 interfaces:
- DataProvider - a provider to get by a sequence number
- Sequenced - operations for something that is sequenced (capacity, size, next() and publish() ops)
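To see how these interfaces fit together, here is a trimmed-down sketch. The interface names match those above, but the method sets are reduced (the real interfaces declare more, e.g. capacity checks and batch variants), Supplier stands in for EventFactory, and ToyRingBuffer is hypothetical: a single producer with no consumers, so no gating or waiting:

```java
import java.util.function.Supplier;

// Trimmed-down versions of the interfaces (the real ones declare more methods)
interface Cursored { long getCursor(); }
interface DataProvider<T> { T get(long sequence); }
interface Sequenced {
    int getBufferSize();
    long next();                 // claim the next sequence
    void publish(long sequence); // make it visible to consumers
}
interface EventSequencer<T> extends DataProvider<T>, Sequenced {}
interface EventTranslator<T> { void translateTo(T event, long sequence); }
interface EventSink<E> { void publishEvent(EventTranslator<E> translator); }

// Hypothetical toy ring buffer: single producer, no consumers, no gating or waiting
final class ToyRingBuffer<E> implements Cursored, EventSequencer<E>, EventSink<E> {
    private final Object[] entries;
    private final int indexMask;
    private long nextValue = -1;
    private volatile long cursor = -1;

    ToyRingBuffer(int bufferSize, Supplier<E> factory) {
        if (bufferSize < 1 || Integer.bitCount(bufferSize) != 1) {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }
        entries = new Object[bufferSize];
        indexMask = bufferSize - 1;
        for (int i = 0; i < bufferSize; i++) {
            entries[i] = factory.get(); // pre-allocate every slot up front
        }
    }

    @Override public long getCursor() { return cursor; }
    @Override public int getBufferSize() { return entries.length; }

    @SuppressWarnings("unchecked")
    @Override public E get(long sequence) { return (E) entries[(int) (sequence & indexMask)]; }

    @Override public long next() { return ++nextValue; }
    @Override public void publish(long sequence) { cursor = sequence; }

    // EventSink: claim a slot, translate into the pre-allocated event, then publish
    @Override public void publishEvent(EventTranslator<E> translator) {
        long sequence = next();
        try {
            translator.translateTo(get(sequence), sequence);
        } finally {
            publish(sequence); // publish even if the translator throws, so the sequence is not lost
        }
    }
}
```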
How Are Entries Initialized?
The RingBufferFields class uses an EventFactory to fill an Object[].
The array is sized to a power of 2, which is checked using the population count (Integer.bitCount()), e.g. 64 in binary is 1000000 and has only a single 1 in its bit pattern.
At the same time an index mask is created which allows us to do the modulus operation using a bitwise &.
The index mask is bufferSize-1 which sets all bits below that power of 2 position to be equal to 1. We can do this because the bufferSize is known to be a power of 2. For example 64-1=63 which in binary is 0111111
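When we call get(long sequence) we get the element at:
entries[(int) (sequence & indexMask) + BUFFER_PAD]
BUFFER_PAD skips the padding slots at the start of the array (it is padded at both ends), which keeps the live entries off cache lines shared with neighbouring data.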
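Putting the power-of-2 check, the index mask and the padded lookup together, a minimal sketch. PaddedEntries is a hypothetical class and its BUFFER_PAD value of 4 is arbitrary; it only mirrors the arithmetic described above:

```java
import java.util.function.Supplier;

final class PaddedEntries<T> {
    // Hypothetical pad size, chosen arbitrarily for this sketch
    static final int BUFFER_PAD = 4;

    private final Object[] entries;
    private final int indexMask;

    PaddedEntries(int bufferSize, Supplier<T> eventFactory) {
        if (!isPowerOfTwo(bufferSize)) {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }
        indexMask = bufferSize - 1;                        // e.g. 64 - 1 = 63 = 0b111111
        entries = new Object[bufferSize + 2 * BUFFER_PAD]; // padding at both ends
        for (int i = 0; i < bufferSize; i++) {
            entries[BUFFER_PAD + i] = eventFactory.get();  // pre-allocate every slot
        }
    }

    // A power of 2 has exactly one bit set, i.e. a population count of 1
    static boolean isPowerOfTwo(int n) {
        return n > 0 && Integer.bitCount(n) == 1;
    }

    @SuppressWarnings("unchecked")
    T get(long sequence) {
        // For non-negative sequences, sequence & indexMask == sequence % bufferSize
        return (T) entries[(int) (sequence & indexMask) + BUFFER_PAD];
    }
}
```

The bitwise & replaces a division-based %, which is why the power-of-2 restriction is worth enforcing at construction time.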
EventHandlers
The EventHandlerBase has lifecycle events and throws a Throwable whereas the EventHandler throws an Exception (a reminder: Exception extends Throwable).
EventHandlers are processed by EventProcessors:
BatchEventProcessor
Wraps an EventHandler, feeding it events and handling batch semantics in a tight loop.
The SequenceBarrier used by the BatchEventProcessor is a barrier on the RingBuffer's cursor - the event loop or duty cycle waits on the RingBuffer's cursor to move using this SequenceBarrier.
The BatchEventProcessor also has its own instance of a Sequence, which it uses to keep track of how far it has got in passing events to the EventHandler; it is updated as part of its duty cycle.
Stylized Duty Cycle
// Executed in a tight loop
// Wait for published events
var availableSequence = sequenceBarrierOnRBCursor.waitFor(nextSeq);
// End of batch: the last sequence handled in this pass
var eobSequence = calculateEOBSequence(availableSequence);
// Notify EventHandler of the start of a batch
handleBatchStart();
while (nextSeq <= eobSequence) {
    // Push events to the EventHandler
    var event = dataProvider.get(nextSeq);
    eventHandler.onEvent(event, ...);
    nextSeq++;
}
// Record progress; producers read this as a gating sequence
eventProcessorSequence.set(eobSequence);
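A runnable model of one pass of that duty cycle, under simplifying assumptions: BatchCycleSketch and its Handler interface are hypothetical, AtomicLong stands in for Sequence, there is no waiting (the cursor is read directly where a real barrier would block), and maxBatchSize is an assumed batch limit:

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongFunction;

// Hypothetical model of one duty-cycle pass: no waiting, no exception handling, no lifecycle
final class BatchCycleSketch<T> {
    interface Handler<E> { void onEvent(E event, long sequence, boolean endOfBatch); }

    private final AtomicLong cursor;            // highest published sequence (producer side)
    private final AtomicLong processorSequence; // how far this processor has got
    private final LongFunction<T> dataProvider; // sequence -> event
    private final Handler<T> handler;
    private final int maxBatchSize;             // assumed upper bound on events per batch

    BatchCycleSketch(AtomicLong cursor, AtomicLong processorSequence,
                     LongFunction<T> dataProvider, Handler<T> handler, int maxBatchSize) {
        this.cursor = cursor;
        this.processorSequence = processorSequence;
        this.dataProvider = dataProvider;
        this.handler = handler;
        this.maxBatchSize = maxBatchSize;
    }

    // Returns how many events were handled (0 if nothing new has been published)
    int runOnce() {
        long nextSequence = processorSequence.get() + 1;
        long availableSequence = cursor.get(); // a real SequenceBarrier would wait here
        if (availableSequence < nextSequence) {
            return 0;
        }
        long endOfBatch = Math.min(availableSequence, nextSequence + maxBatchSize - 1);
        int handled = 0;
        while (nextSequence <= endOfBatch) {
            handler.onEvent(dataProvider.apply(nextSequence), nextSequence, nextSequence == endOfBatch);
            nextSequence++;
            handled++;
        }
        processorSequence.set(endOfBatch); // one write per batch, not per event
        return handled;
    }
}
```

Updating the processor's sequence once per batch, rather than once per event, is what keeps the cost of the shared (volatile) write low when the consumer falls behind and batches grow.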
Sequencers
The RingBuffer has a producer Sequencer, which can be single or multi producer.
The AbstractSequencer manages the gating sequences and owns the current cursor.
Sequencers are cursored and sequenced.
The RingBuffer is cursored (directly) and sequenced (indirectly by being an EventSequencer).
Sequencer Functionality
- Checking for available capacity
- Checking what the remaining capacity is
- Getting the highest published sequence
- Checking if a sequence is available (i.e. within one buffer size of the last published position)
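The capacity and availability checks boil down to a little arithmetic. This is my reading of the single-producer case, written as a hypothetical CapacityMath helper; producerSequence is the highest claimed sequence and slowestConsumer the minimum gating sequence:

```java
// Hypothetical helper showing the single-producer capacity/availability arithmetic
final class CapacityMath {
    private CapacityMath() {}

    // Slots the producer can still claim before it would overwrite unread entries
    static long remainingCapacity(int bufferSize, long producerSequence, long slowestConsumer) {
        long produced = producerSequence;
        long consumed = slowestConsumer;
        return bufferSize - (produced - consumed);
    }

    // Can requiredCapacity more slots be claimed without overtaking the slowest consumer?
    static boolean hasAvailableCapacity(int bufferSize, int requiredCapacity,
                                        long producerSequence, long slowestConsumer) {
        long wrapPoint = (producerSequence + requiredCapacity) - bufferSize;
        return wrapPoint <= slowestConsumer;
    }

    // Published and still within one buffer length of the cursor (older slots may be overwritten)
    static boolean isAvailable(int bufferSize, long cursor, long sequence) {
        return sequence <= cursor && sequence > cursor - bufferSize;
    }
}
```

For example, with a buffer of 8, the producer at 9 and the slowest consumer at 3, six entries are in flight, so 2 slots remain.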
The core methods of a sequencer are next() and publish(...):
- next() - claim the next sequence(s), advancing the producer's position, and wait for slow consumers so that unread entries are not overwritten.
- publish(...) - set the cursor to a specific sequence and signal waiting EventProcessors that the cursor has been updated.
Single Producer Sequencer
@Override
public long next(final int n)
{
assert sameThread() : "Accessed by two threads - use ProducerType.MULTI!";
if (n < 1 || n > bufferSize)
{
throw new IllegalArgumentException("n must be > 0 and < bufferSize");
}
// current sequence or position
// should be monotonically increasing
long nextValue = this.nextValue;
// we want the range between nextValue and nextValue + n
long nextSequence = nextValue + n;
// wrapPoint is the sequence one buffer length behind nextSequence: it maps to
// the same slot, so the slowest consumer must have reached it before we can
// reuse that slot. It stays negative until we start our second pass
// through the buffer
long wrapPoint = nextSequence - bufferSize;
// this is the last known minimum sequence of all consumers - it indicates
// the position of the consumer which is furthest behind the current position/cursor.
// It starts at -1 (the initial sequence value) and is only refreshed when the
// real gating sequences are re-read
long cachedGatingSequence = this.cachedValue;
// wrapPoint > cachedGatingSequence: as far as we last knew, the slowest consumer
// has not yet read the entry we are about to overwrite, so we must re-read
// the real gating sequences (and possibly wait)
// cachedGatingSequence > nextValue: the last known slowest consumer is ahead of
// the producer's own position, which should not happen in normal operation;
// presumably this guards against the producer position having been moved
// backwards (e.g. via claim(...))
if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue)
{
// ADVANCE THE CURSOR
//
// Update the cursor and ensure visibility of the updated value
// We only update this cursor value if we've been through the buffer once
// whereas in a multi producer sequencer we have to update it before
// we start wrapping around the buffer. The cursor is also updated
// on the publish() method to provide cross thread visibility to the latest
// published position. The cursor is also used in this Sequencer implementation
// via isAvailable() to check whether a particular sequence is available given its
// relationship with the cursor and the buffer size (a window of possible published entries).
// We update the cursor with the value we currently know as the nextValue
// which is different to the nextSequence - the nextSequence is the highest
// sequence or entry into which a caller intends to publish on
cursor.setVolatile(nextValue); // StoreLoad fence
long minSequence;
// WAIT FOR CONSUMERS TO CATCH UP TO PREVENT WRITING OVER UNREAD ENTRIES
//
// Wait for the consumers to catch up to the wrap point - the wrap point
// is the minimum allowed position of the slowest consumer, we must not
// allow the slowest consumer to fall behind the wrap point without waiting for it
// to catch up.
while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue)))
{
LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin?
}
this.cachedValue = minSequence;
}
this.nextValue = nextSequence;
return nextSequence;
}
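A self-contained sketch of the same next() logic that can actually be run. SimpleSingleProducerSequencer is hypothetical: AtomicLong stands in for the Disruptor's Sequence, a plain loop replaces Util.getMinimumSequence, and there is no wait strategy or sameThread() assert:

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;

// Hypothetical simplified single-producer sequencer; only one thread may call next()
final class SimpleSingleProducerSequencer {
    private final int bufferSize;
    private final AtomicLong cursor = new AtomicLong(-1);
    private final AtomicLong[] gatingSequences; // consumer sequences we must not overtake
    private long nextValue = -1;                // producer-thread-only state
    private long cachedValue = -1;              // last seen minimum gating sequence

    SimpleSingleProducerSequencer(int bufferSize, AtomicLong... gatingSequences) {
        this.bufferSize = bufferSize;
        this.gatingSequences = gatingSequences;
    }

    long next(int n) {
        if (n < 1 || n > bufferSize) {
            throw new IllegalArgumentException("n must be > 0 and <= bufferSize");
        }
        long nextSequence = nextValue + n;
        long wrapPoint = nextSequence - bufferSize;
        // Slow path only when the cached minimum says we might overwrite unread entries
        if (wrapPoint > cachedValue || cachedValue > nextValue) {
            cursor.set(nextValue); // volatile write, like cursor.setVolatile(nextValue)
            long minSequence;
            while (wrapPoint > (minSequence = minimumSequence(nextValue))) {
                LockSupport.parkNanos(1L); // wait for the slowest consumer to move on
            }
            cachedValue = minSequence;
        }
        nextValue = nextSequence;
        return nextSequence;
    }

    void publish(long sequence) {
        cursor.set(sequence); // a real sequencer would also signal the wait strategy
    }

    long cursor() {
        return cursor.get();
    }

    // Minimum of the gating sequences, capped at the given starting value
    private long minimumSequence(long minimum) {
        for (AtomicLong sequence : gatingSequences) {
            minimum = Math.min(minimum, sequence.get());
        }
        return minimum;
    }
}
```

With a buffer of 4 and the consumer at -1, next(4) returns 3 on the fast path; once the consumer reaches 1, next(2) takes the slow path, finds a wrap point of 1 that the consumer has already reached, and returns 5 without waiting.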
Sequence Barriers
Track the latest publish position and the position of dependent consumers.
The ProcessingSequenceBarrier handles both the scenario where we only want to wait on a cursor and when we want to wait on both the cursor and a set of dependent sequences.
waitFor(...)
- waits until the requested sequence is ready, i.e. until the cursor and any dependent consumers have reached it.
- Then it checks the highest published sequence for availability and returns that; in the multi-producer case this check uses VarHandles over a marker array.
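The marker array matters because with several producers, sequences can be published out of order, so "the cursor has moved" does not mean every slot below it is filled. A minimal sketch of the idea, assuming each slot stores the "lap" number (sequence >>> log2(bufferSize)) of the sequence last published into it. AvailabilityMarkers is hypothetical, and AtomicIntegerArray stands in for the VarHandle access used in the real code:

```java
import java.util.concurrent.atomic.AtomicIntegerArray;

// Hypothetical sketch of a multi-producer availability ("marker") array
final class AvailabilityMarkers {
    private final AtomicIntegerArray available;
    private final int indexMask;
    private final int indexShift; // log2(bufferSize), so sequence >>> indexShift is the lap number

    AvailabilityMarkers(int bufferSize) {
        if (bufferSize < 1 || Integer.bitCount(bufferSize) != 1) {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }
        available = new AtomicIntegerArray(bufferSize);
        indexMask = bufferSize - 1;
        indexShift = Integer.numberOfTrailingZeros(bufferSize);
        for (int i = 0; i < bufferSize; i++) {
            available.set(i, -1); // nothing published on any lap yet
        }
    }

    // Called by a producer after it has written its entry
    void setAvailable(long sequence) {
        available.set(index(sequence), lap(sequence));
    }

    boolean isAvailable(long sequence) {
        return available.get(index(sequence)) == lap(sequence);
    }

    // Highest sequence in [lowerBound, availableSequence] with no unpublished gap before it
    long highestPublished(long lowerBound, long availableSequence) {
        for (long sequence = lowerBound; sequence <= availableSequence; sequence++) {
            if (!isAvailable(sequence)) {
                return sequence - 1;
            }
        }
        return availableSequence;
    }

    private int index(long sequence) {
        return (int) (sequence & indexMask);
    }

    private int lap(long sequence) {
        return (int) (sequence >>> indexShift);
    }
}
```

Storing the lap number rather than a boolean means a slot does not have to be cleared after it is consumed: sequence 8 in a buffer of 8 shares slot 0 with sequence 0, but only counts as published once slot 0 holds lap 1.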
Wait Strategies
A wait strategy defines how an EventProcessor waits on the cursor. When the cursor is updated, waiting processors are notified (for the strategies that block).
BusySpinWaitStrategy
while (dependentSequence.get() < sequenceBeingWaitedFor) {
    Thread.onSpinWait();
}
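A runnable version of the busy-spin loop, assuming the dependent sequences are plain AtomicLongs and the caller waits on their minimum. BusySpinWaitSketch is a hypothetical helper, not the library class:

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical busy-spin wait over one or more dependent sequences
final class BusySpinWaitSketch {
    private BusySpinWaitSketch() {}

    // Spins until every dependent has reached `sequence`; returns the minimum seen
    static long waitFor(long sequence, AtomicLong... dependents) {
        if (dependents.length == 0) {
            throw new IllegalArgumentException("need at least one sequence to wait on");
        }
        long available;
        while ((available = minimum(dependents)) < sequence) {
            Thread.onSpinWait(); // CPU hint that we are in a spin loop (Java 9+)
        }
        return available;
    }

    private static long minimum(AtomicLong[] sequences) {
        long min = Long.MAX_VALUE;
        for (AtomicLong s : sequences) {
            min = Math.min(min, s.get());
        }
        return min;
    }
}
```

The trade-off is that the waiting thread burns a whole core, in exchange for the lowest possible latency between the sequence moving and the consumer noticing.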
Other wait strategies also make use of the cursor position, for example the BlockingWaitStrategy.
BlockingWaitStrategy
All of these wait strategies check both the cursor and dependent sequences:
// Check publish
if (cursor.get() < sequence) {
    // synchronize on a mutex, then in a loop
    // re-check the condition and call mutex.wait()
    // until the cursor has advanced far enough
}
// Check consumers
// now check the dependent sequences,
// but just spin-wait in a tight loop
// rather than using mutex.wait()
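The two-phase pattern above (block on a mutex for the cursor, then spin on the dependents) as a runnable sketch. BlockingWaitSketch is hypothetical and uses plain synchronized/wait/notifyAll with AtomicLong sequences; with no dependents it falls back to the cursor:

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: block on a mutex for the cursor, then busy-spin on dependents
final class BlockingWaitSketch {
    private final Object mutex = new Object();
    private final AtomicLong cursor = new AtomicLong(-1);

    // Producer side: advance the cursor, then wake any blocked consumers
    void publish(long sequence) {
        cursor.set(sequence);
        synchronized (mutex) {
            mutex.notifyAll();
        }
    }

    long waitFor(long sequence, AtomicLong... dependents) throws InterruptedException {
        // Check publish: block until the cursor has reached the sequence
        if (cursor.get() < sequence) {
            synchronized (mutex) {
                while (cursor.get() < sequence) { // re-check: guards against spurious wakeups
                    mutex.wait();
                }
            }
        }
        // Check consumers: spin (no mutex) until the dependents have caught up too
        long available;
        while ((available = minimum(dependents)) < sequence) {
            Thread.onSpinWait();
        }
        return available;
    }

    private long minimum(AtomicLong[] dependents) {
        if (dependents.length == 0) {
            return cursor.get(); // no upstream consumers: the cursor is the dependency
        }
        long min = Long.MAX_VALUE;
        for (AtomicLong s : dependents) {
            min = Math.min(min, s.get());
        }
        return min;
    }
}
```

Because the waiter re-checks the cursor while holding the mutex, and the publisher only calls notifyAll() after updating the cursor and acquiring the same mutex, a wakeup cannot be lost between the check and the wait().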