Intro to Aeron Part 2
Some introductory notes I took on Aeron whilst reading their documentation on Aeron Transport. Covers Publications, Subscriptions, backpressure, log buffers, multi-destination cast.
Channels, Streams and Sessions
- Channel - an IP address and port
- Stream - a grouping of message flow within a channel
- Session - to uniquely identify a publisher
Ordering
An Image is created as a function of the channel, stream and session. Ordering is effectively only at the session level; there is no total ordering across multiple sessions on the same channel and stream. This is because each session needs to be its own file (log buffer) in order to obey the single writer principle.
Publications and Subscriptions
Publications
We send data via Publications. The class provides two non-blocking methods to do this: offer (you pass in a buffer and it is copied to the underlying buffer) and tryClaim (you get a reference to a slice of the underlying buffer via a BufferClaim).
Publications without any matching subscriptions will not be able to send data - to allow sending of data without any subscribers, we use Spies.
Publication Types
Concurrent Publications
Allow for multiple senders. Thread safe.
Exclusive Publications
Not thread safe, single sender only. No coordination between concurrent writers is needed, so performance is better.
Offer and tryClaim
Offer
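The offer() method supports fragmentation - a payload that is larger than the maximum frame payload (the MTU minus the frame header) is fragmented and it is the responsibility of the Subscriber to use a FragmentAssembler to handle that on their side. The largest message offer() will accept is Math.min(term buffer size/8, 16MB).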
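As far as I can tell there are two separate size limits at play in offer(): the per-frame payload limit that triggers fragmentation, and the overall message limit. A minimal sketch of the arithmetic, assuming a 32-byte frame header; the class and method names are my own, not Aeron API, and the 16MB term length and 1408-byte MTU in main are just example values.

```java
/** Sketch of the size limits that apply to offer(). Names are hypothetical, not Aeron API. */
final class OfferLimits
{
    // Assumption: the Aeron data frame header is 32 bytes.
    static final int HEADER_LENGTH = 32;
    static final int MAX_MESSAGE_LENGTH_CAP = 16 * 1024 * 1024;

    /** Largest message offer() accepts for a given term buffer length. */
    static int maxMessageLength(final int termLength)
    {
        return Math.min(termLength / 8, MAX_MESSAGE_LENGTH_CAP);
    }

    /** Largest payload that fits in a single frame for a given MTU. */
    static int maxPayloadLength(final int mtuLength)
    {
        return mtuLength - HEADER_LENGTH;
    }

    /** Number of frames a message of the given length is split into. */
    static int fragmentCount(final int messageLength, final int mtuLength)
    {
        final int maxPayload = maxPayloadLength(mtuLength);
        // Round up; an empty message still occupies one frame.
        return Math.max(1, (messageLength + maxPayload - 1) / maxPayload);
    }

    public static void main(final String[] args)
    {
        final int termLength = 16 * 1024 * 1024; // example value
        final int mtuLength = 1408;              // example value

        System.out.println("max message length: " + maxMessageLength(termLength));
        System.out.println("max payload length: " + maxPayloadLength(mtuLength));
        System.out.println("fragments for 4000 bytes: " + fragmentCount(4000, mtuLength));
    }
}
```

Anything with a fragment count above 1 needs a FragmentAssembler on the subscribing side.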
TryClaim
You get a BufferClaim which reserves a section of the underlying log buffer. You write directly into it, calling commit() on the claim once you're done. You need to be careful to use the offset provided by the BufferClaim instead of 0. Does not support fragmentation.
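To make the offset point concrete, here is a toy model of a claim over a shared buffer. This is not the Aeron API: ClaimSketch and Claim are hypothetical stand-ins for the log buffer and BufferClaim, and returning null when full stands in for the negative response codes.

```java
/** Toy model of tryClaim: a claim is a slice (offset + length) of one shared buffer. */
final class ClaimSketch
{
    /** Hypothetical stand-in for BufferClaim. */
    static final class Claim
    {
        final byte[] buffer;
        final int offset;
        final int length;
        boolean committed;

        Claim(final byte[] buffer, final int offset, final int length)
        {
            this.buffer = buffer;
            this.offset = offset;
            this.length = length;
        }

        void commit()
        {
            committed = true;
        }
    }

    private final byte[] log;
    private int tail;

    ClaimSketch(final int capacity)
    {
        log = new byte[capacity];
    }

    /** Reserves length bytes at the tail, or returns null if there is no room. */
    Claim tryClaim(final int length)
    {
        if (length > log.length - tail)
        {
            return null;
        }

        final Claim claim = new Claim(log, tail, length);
        tail += length;
        return claim;
    }

    /** Writes the payload into the claimed slice and commits it. */
    static void write(final Claim claim, final byte[] payload)
    {
        // Copy to claim.offset, not 0: index 0 belongs to whoever claimed first.
        System.arraycopy(payload, 0, claim.buffer, claim.offset, payload.length);
        claim.commit();
    }

    byte byteAt(final int index)
    {
        return log[index];
    }
}
```

With the real BufferClaim the same rule applies: write relative to the claim's offset() into the claim's buffer(), then call commit().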
Unblock Timeout
If you claim some of the log buffer but your thread crashes or stalls (or you disconnect), then no other threads will be able to use that section of the buffer because you did not commit() anything. There is a default unblock timeout of 15 seconds after which the media driver will unblock the claim, making the underlying section of the buffer available. This means whatever you need to do, you better do it within 15 seconds ;-)
Unblocking can also occur when a publication disconnects/closes with a claim in place. The claim works by setting something in the header (TODO: Check the details of this), so on unclaim the header is reset.
Edge case: what happens to reserved sections of the log buffer under fast writes and term rotation - can we end up with a reserved section within the dirty term preventing cleaning?
ConcurrentPublications must use a thread-local BufferClaim for safe tryClaim.
Response Codes
The response codes across offer and tryClaim are standardised:
- >0 - the new position in the stream. For offer it means the log buffer has the data; for tryClaim the space is reserved but no data has been written yet.
- -1 - No subscription connected
- -2 - Backpressured
- -3 - Admin action underway (term rotation)
- -4 - Publication closed, not accepting data
- -5 - Maximum position exceeded - the stream has reached the largest position the term buffer length allows, so the publication should be closed and a new one added. (A message that is too large is rejected with an exception instead, see maxMessageLength under Common Errors.)
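One way to act on these codes is to collapse them into "done", "retry" or "give up". A minimal sketch: the constant values mirror those on io.aeron.Publication (where -5 is named MAX_POSITION_EXCEEDED), but the class, the Next enum and the choice to retry on NOT_CONNECTED are my own assumptions.

```java
/** Sketch of classifying offer/tryClaim results. Not part of Aeron. */
final class OfferResultSketch
{
    // Values mirror the constants defined on io.aeron.Publication.
    static final long NOT_CONNECTED = -1;
    static final long BACK_PRESSURED = -2;
    static final long ADMIN_ACTION = -3;
    static final long CLOSED = -4;
    static final long MAX_POSITION_EXCEEDED = -5;

    enum Next
    {
        DONE, RETRY, GIVE_UP
    }

    static Next classify(final long result)
    {
        if (result > 0)
        {
            return Next.DONE; // result is the new stream position
        }

        if (result == BACK_PRESSURED || result == ADMIN_ACTION || result == NOT_CONNECTED)
        {
            // Transient conditions. Retrying on NOT_CONNECTED is a policy choice;
            // some systems would rather drop.
            return Next.RETRY;
        }

        // CLOSED and MAX_POSITION_EXCEEDED will not clear on their own.
        return Next.GIVE_UP;
    }
}
```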
Backpressure
There are various policies for whether we want to allow publishing to "keep up" with the slowest or fastest consumer. Subscribers communicate their position to the media driver. When you try to offer on a publication, you are notified of backpressure - from the log buffer filling up or from slow consumers.
The Aeron idiom is really one of "smart publishing" and is one of the main reasons I like it so much. It allows you to make much more sophisticated and data driven decisions in trading systems.
You can elect what to do under backpressure based on your business logic. For example: I have a net 0 position on EURUSD, so just drop the quote as EURUSD publishes frequently (subject, obviously, to OTW time and tolerance checks). Versus: I have a massive delta on EURUSD and I need to get the potentially aggressing, risk-reducing side of the quote out to market, so keep retrying.
If you retry on backpressure and you're consuming from other systems, your inability to process the input messages from those other systems will result in those input systems being backpressured as they're no longer able to publish to you. They in turn can adapt their publishing logic based on business needs.
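The drop-versus-retry decision could be sketched like this. Everything here is hypothetical: Offer stands in for a call to Publication.offer() with the quote already encoded, and "flat means drop, otherwise retry up to maxAttempts" is just one possible policy.

```java
/** Sketch of a business-driven backpressure policy. Names are hypothetical. */
final class SmartPublisherSketch
{
    /** Stand-in for Publication.offer(): returns a position, or a negative code. */
    interface Offer
    {
        long offer();
    }

    static final long BACK_PRESSURED = -2;

    /**
     * Sends a quote. When flat (net position 0) a backpressured quote is dropped
     * after one attempt; with risk on the book we retry up to maxAttempts.
     *
     * @return true if the quote was accepted by the publication.
     */
    static boolean publishQuote(final Offer offer, final long netPosition, final int maxAttempts)
    {
        final int attempts = netPosition == 0 ? 1 : maxAttempts;

        for (int i = 0; i < attempts; i++)
        {
            final long result = offer.offer();
            if (result > 0)
            {
                return true;
            }

            if (result != BACK_PRESSURED)
            {
                return false; // anything else needs different handling
            }

            Thread.onSpinWait(); // brief pause before retrying (Java 9+)
        }

        return false;
    }
}
```

In a real duty cycle the retry would more likely be spread across cycles than spun in place, so that inbound messages keep getting processed.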
Constructing Publications
You can construct Publications either synchronously or asynchronously. The async API is useful for making publications in a duty cycle or anything that is sensitive to pauses. The synchronous API will block and can take a few seconds to return.
Example of synchronous API:
final Publication exclusivePublication = aeron.addExclusivePublication(channel, streamId);
final Publication concurrentPublication = aeron.addPublication(channel, streamId);
Example of async API:
final long registrationId = aeron.asyncAddExclusivePublication(uri, streamId);
// Poll until the media driver has registered the publication.
while (aeron.getExclusivePublication(registrationId) == null)
{
    aeron.context().idleStrategy().idle();
}
publication = aeron.getExclusivePublication(registrationId);
Note: This looks like you can only add a single async publication at a time? You need to wait for the registrationId to become valid vs say passing in your own registrationId.
Subscriptions
Subscriptions must not be shared across multiple threads.
Subscriptions allow you to get messages in your duty cycle by passing a FragmentHandler impl to poll(), as well as the max number of fragments you are willing to process.
Example of adding a subscription:
final Subscription subscription = aeron.addSubscription(channel, streamId);
In your FragmentHandler impl you shouldn't modify the buffer - it is backed by underlying buffers which can be cleaned up by Aeron. A typical pattern is to extract the content and pass it to something which does the processing e.g. copy into a RingBuffer or drop on a Queue.
Controlled Polling
We can do controlled polling by implementing a ControlledFragmentHandler which returns an Action from its onFragment() method.
The returned Action tells Aeron how to process the received fragments.
Consider some examples where you've received multiple messages and where you're polling multiple subscriptions (A and B channel market data) or looking for a particular message (an exec report for example).
You can then more finely tune what should be done with the remaining messages.
Examples:
- ABORT - return and process an exec report before processing other messages
- BREAK - we've detected a trigger price has been hit and we don't care about any of the remaining fragments.
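A toy model of how the four actions affect a poll, as I understand them: ABORT stops without consuming the current fragment, BREAK consumes it and stops, COMMIT and CONTINUE both carry on. The Action names match ControlledFragmentHandler.Action; everything else (the class, Handler, fragments as strings) is made up for illustration.

```java
/** Toy model of controlled polling over an in-memory list of fragments. */
final class ControlledPollSketch
{
    enum Action
    {
        ABORT, BREAK, COMMIT, CONTINUE
    }

    /** Hypothetical stand-in for ControlledFragmentHandler. */
    interface Handler
    {
        Action onFragment(String fragment);
    }

    /**
     * Polls up to limit fragments starting at index start.
     *
     * @return the index of the first fragment the next poll will deliver.
     */
    static int poll(final String[] fragments, final int start, final int limit, final Handler handler)
    {
        int next = start;
        int handled = 0;

        while (next < fragments.length && handled < limit)
        {
            final Action action = handler.onFragment(fragments[next]);
            if (action == Action.ABORT)
            {
                break; // position not advanced: this fragment is redelivered next poll
            }

            next++;
            handled++;

            if (action == Action.BREAK)
            {
                break; // fragment consumed, stop polling now
            }

            // COMMIT and CONTINUE both carry on. In Aeron, COMMIT also publishes the
            // subscriber position immediately, which this toy model does not track.
        }

        return next;
    }
}
```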
Log Buffers and Images
Log buffer - memory mapped file made up of 3 terms and a metadata section at the end of the file.
Term ID - an incrementing ID as each term completes a cycle
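The term ID, the offset within the term and the stream position are related by simple arithmetic. A sketch of my understanding, assuming the term length is a power of two and that the three terms are used round-robin; the class and method names are my own.

```java
/** Sketch of how a stream position relates to term ID and term offset. */
final class TermPositionSketch
{
    static final int PARTITION_COUNT = 3; // the log buffer has 3 terms

    /** Absolute stream position of a byte at termOffset within the term termId. */
    static long computePosition(
        final int termId, final int termOffset, final int termLength, final int initialTermId)
    {
        // termLength is assumed to be a power of two, so multiplying by it is a shift.
        final int positionBitsToShift = Integer.numberOfTrailingZeros(termLength);
        final long termCount = termId - initialTermId; // int subtraction copes with wrap-around

        return (termCount << positionBitsToShift) + termOffset;
    }

    /** Which of the 3 term buffers holds the given term. */
    static int partitionIndex(final int termId, final int initialTermId)
    {
        return (termId - initialTermId) % PARTITION_COUNT;
    }
}
```

For example, with a 64KB term length and an initial term ID of 7, offset 128 in term 9 is two full terms plus 128 bytes into the stream, and it lives in the third partition.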
Ordering and Stream Reconstruction
The log buffer can also help with ordering - the header in each message is used to put the message in its correct place in the log buffer.
- rcv-hwm - the high watermark is the highest/largest/furthest point in the log buffer we've received. The largest position or sequence number.
- rcv-pos - the completed position. The highest contiguous position/sequence number i.e. without gaps. The next valid position or sequence.
The Subscription is eligible for getting messages based on increments of the rcv-pos i.e. the next expected sequence/position only.
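The difference between the two counters shows up when data arrives out of order. A toy model, tracking received bytes in a boolean array; the class is hypothetical and ignores terms, headers and loss recovery entirely.

```java
/** Toy model of rcv-hwm and rcv-pos for out-of-order arrival. */
final class ReceiverPositionsSketch
{
    private final boolean[] received;
    private int hwm; // rcv-hwm: end of the furthest data seen so far
    private int pos; // rcv-pos: end of the contiguous, gap-free prefix

    ReceiverPositionsSketch(final int capacity)
    {
        received = new boolean[capacity];
    }

    /** Places length bytes at offset, as the header of an incoming frame would dictate. */
    void onData(final int offset, final int length)
    {
        for (int i = offset; i < offset + length; i++)
        {
            received[i] = true;
        }

        hwm = Math.max(hwm, offset + length);

        // Advance the completed position across any bytes that are now contiguous.
        while (pos < received.length && received[pos])
        {
            pos++;
        }
    }

    int hwm()
    {
        return hwm;
    }

    int pos()
    {
        return pos;
    }
}
```

Receiving bytes 0-15, then 32-47, leaves the completed position at 16 while the high watermark jumps to 48; once 16-31 arrives the gap closes and both read 48.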
Positions
There are a number of positions used by Aeron. They can be categorised as:
- pub-* - Publication positions
- sub-* - Subscriber positions
- snd-* - Media driver Sender positions
- rcv-* - Media driver Receiver positions
- *-pos - Positions (sampled for Publications)
- *-lmt - Limits
- *-bpe - Back pressure events (Sender)
- *-hwm - High watermark (Receiver)
The pub-pos is updated after the data is written to the term.
The snd-pos is updated after the data is sent.
The rcv-hwm and rcv-pos are only updated after the data has been placed in the local term buffer.
The sub-pos is incremented after the message has been consumed by the Subscriber's poll().
MTU Matching
Publishers and Subscribers (or is it Media Driver Senders and Receivers?) must have matching MTUs, as a lot of the logic around efficient packing of messages is based on MTU length, as is the handling of fragmentation where messages span multiple MTUs.
Multi-Destination Cast
Allows for 1-n publication as Aeron native functionality - messages are sent to each destination one after the other, so it doesn't rely on multicast to replicate the message.
Publication Setup
A conventional (non-MDC) Publication is set up as follows:
final Publication exclusivePublication = aeron.addExclusivePublication(channel, streamId);
We can compare this to the way the Publication is set up for an MDC:
final var publicationChannel = "aeron:udp?control-mode=dynamic|control=" + host + ":" + controlChannelPort;
final var publication = aeron.addExclusivePublication(publicationChannel, 100);
We're now using aeron:udp?control-mode=dynamic|control=MDC_HOST:MDC_CONTROL_PORT
Here MDC_HOST is the host the publication is running on, and the control port must be known to subscribers, because subscribers to an MDC use the host:port of the Publisher to connect into it. We may not know how many subscribers are going to be connecting to the publisher, so the publisher just publishes to a given port on a given interface.
Subscription Setup
The way we set up a conventional (non-MDC) subscription is:
final Subscription subscription = aeron.addSubscription(channel, streamId);
Let's look at how we do this for an MDC subscription:
final var mdcSubscription = aeron.addSubscription("aeron:udp?endpoint=" + host
    + ":0|control=" + mdcHost + ":" + mdcControlPort + "|control-mode=dynamic", 100);
We are now using aeron:udp?endpoint=LOCALHOST:0|control=MDC_HOST:MDC_CONTROL_PORT|control-mode=dynamic
The control=MDC_HOST:MDC_CONTROL_PORT portion tells it how to connect to the remote MDC Publication.
The endpoint=LOCALHOST:0 portion defines which host and port the MDC Publication will actually publish the data to - in this example it's localhost on an ephemeral port.
MDC Flow Control
There are various policies which can be set to determine what should happen when subscribers start falling behind. Use the fc= field in the URI, e.g. aeron:udp?control-mode=dynamic|control=MDC_HOST:MDC_CONTROL_PORT|fc=max
- Max - only limited by the fastest subscriber, will allow for data loss in slow subscribers. This is the default.
- Min - limited by the slowest subscriber, wait for slow consumers.
- Tagged - limited by the slowest subscriber in a tagged group of subscribers.
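The difference between max and min comes down to which subscriber's position bounds how far the sender may run ahead. A heavily simplified sketch, assuming every receiver advertises the same window and at least one receiver is connected; the class and method names are hypothetical, and the real strategies also deal with timeouts and group membership.

```java
/** Simplified model of max vs min flow control. Not Aeron's implementation. */
final class FlowControlSketch
{
    /** Max: the sender may run up to the furthest receiver position plus its window. */
    static long maxLimit(final long[] receiverPositions, final long window)
    {
        long furthest = Long.MIN_VALUE;
        for (final long position : receiverPositions)
        {
            furthest = Math.max(furthest, position);
        }

        return furthest + window;
    }

    /** Min: the sender is held back to the slowest receiver position plus its window. */
    static long minLimit(final long[] receiverPositions, final long window)
    {
        long slowest = Long.MAX_VALUE;
        for (final long position : receiverPositions)
        {
            slowest = Math.min(slowest, position);
        }

        return slowest + window;
    }
}
```

Tagged behaves like min, but only over the receivers carrying the matching group tag.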
Flow Control Group Size
We can set a group size that indicates the number of subscribers which are required to be connected for the publication to actually be connected. The group is then also subject to the flow control policy you've set.
For example, minimum of 5 subscribers without dropping data for slow consumers:
|fc=min,g:/5
Tagged Flow Control
Let's say we want to distribute market data to a bunch of subscribers. For some of these subscribers, we can tolerate data loss. For others we can't. Tagged flow control allows us to achieve this by specifying a numerical group ID in the publication URI (|fc=tagged,g:101), and a matching group tag in the subscription URI (|control-mode=dynamic|gtag=101).
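Putting the MDC and tagged flow control pieces together, the two URIs could be assembled like this. A small sketch using only the URI parameters shown in these notes; the helper class, method names and host values are hypothetical.

```java
/** Sketch of building MDC channel URIs with tagged flow control. */
final class MdcUriSketch
{
    /** Publication side: dynamic MDC with tagged flow control for the given group. */
    static String taggedPublication(final String controlHost, final int controlPort, final long groupTag)
    {
        return "aeron:udp?control-mode=dynamic|control=" + controlHost + ":" + controlPort +
            "|fc=tagged,g:" + groupTag;
    }

    /** Subscription side: ephemeral local port, joined to the publication's tagged group. */
    static String taggedSubscription(
        final String localHost, final String controlHost, final int controlPort, final long groupTag)
    {
        return "aeron:udp?endpoint=" + localHost + ":0|control=" + controlHost + ":" + controlPort +
            "|control-mode=dynamic|gtag=" + groupTag;
    }

    public static void main(final String[] args)
    {
        System.out.println(taggedPublication("10.0.0.1", 40456, 101));
        System.out.println(taggedSubscription("10.0.0.2", "10.0.0.1", 40456, 101));
    }
}
```

Subscribers that can tolerate loss simply leave off the gtag parameter and are not counted towards the tagged group.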
Tooling and Support
Aeron Stat
Exposes data from the cnc file. Many counters exposed.
Error Stat
Prints all errors raised.
Stream Stat
A view of positions for each stream.
Backlog Stat
Details of publisher last sampled position and how many bytes a sender has left to send - these remaining bytes to be sent are indicative of backpressure.
Loss Stat
Logs all data loss events.
Log Inspector
Inspect log buffer files:
- If log buffer is connected
- Number of terms we've gone through
- State of the 3 terms in the log buffer
- Hex dump of term data
Aeron Agent
A Java agent that provides runtime log info for Transport, Archive and Cluster.
Common Errors
- Timeouts - driver timeouts, client liveness and client conductor timeouts. These are all configurable.
- Active driver on restarts - set -Daeron.dir.delete.on.start=true
- Counter buffer is full - you've tried to add more than 8192 entries, typically through a resource leak
- maxMessageLength is exceeded
Links and References
https://aeron.io/docs/aeron/overview/ - Aeron Documentation