Messaging in .NET Core with RabbitMQ - Publisher Confirm
- Authors: Mehdi Hadeli (@mehdihadeli)
Introduction
Messaging is an essential part of modern distributed systems, and RabbitMQ is one of the most popular message brokers used by developers around the world. In this blog post, we'll explore how to implement publisher confirms with RabbitMQ and .NET Core, examine several strategies for using them, and discuss the advantages and drawbacks of each approach.
Publisher confirms are an important feature of RabbitMQ that allows publishers to confirm that messages have been successfully received by the broker, ensuring reliable delivery of messages in distributed systems. We'll cover the basics of setting up a RabbitMQ client in .NET Core, configuring publisher confirms, handling confirmations, and handling errors that may occur during messaging. By the end of this post, you'll have a solid understanding of how to implement publisher confirms with RabbitMQ and .NET Core, and how to build robust and reliable messaging systems.
In network communication, ensuring that a message is delivered successfully can be a challenge. Networks can fail in unpredictable ways, and detecting failures can take time. This can lead to a situation where a client assumes that a message has been successfully delivered to a server when in reality it has been lost or significantly delayed.
Traditionally, the only way to guarantee that a message is not lost is to use transactions. This involves making the channel transactional and then publishing each message or set of messages with a commit. However, this approach is not always practical. Transactions are heavyweight and can significantly decrease throughput, making them unsuitable for high-performance systems.
To address this challenge, RabbitMQ introduced a confirmation mechanism, publisher confirms, as an extension to the AMQP 0-9-1 protocol. Using this mechanism, a client can confirm that a message has been successfully delivered to the server (the message broker) and processed.
In this article, we're going to use publisher confirmations to make sure published messages have safely reached the broker. We will use different approaches for handling publisher confirmations, and we will explain each of these approaches with some sample code separately.
Preparing Channel For Publisher Confirm
To enable publisher confirms in RabbitMQ, call the ConfirmSelect() method on the channel you want to enable confirms for. This puts the channel into confirm mode, so the broker confirms all subsequent messages published on that channel. Note that this method needs to be called only once per channel, not for every message published on it.
// Create a connection to RabbitMQ
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
{
// Create a channel on the connection
using (var channel = connection.CreateModel())
{
// Enable publisher confirms on the channel
channel.ConfirmSelect();
//...
}
}
Once a channel is in confirm mode, both the broker and the client count messages. Counting starts at 1 when we call channel.ConfirmSelect() to prepare the channel for confirmations.
In publisher confirms, the Delivery Tag and Publisher Sequence Number are important concepts to understand:
- The delivery tag is used on the broker level. It is the sequence number that identifies the confirmed or rejected message and acts as a unique identifier for that message on the channel. The delivery tag on the server is based on the order in which messages are received from the publisher on that channel, and it stays in sync with the client's publish sequence number so that a confirmation can be correlated between the client and the server.
- The publisher sequence number is used on the client level. The channel assigns it to each message when the message is published with the channel.BasicPublish method, and it is incremented for each subsequent message. The channel.NextPublishSeqNo property returns the publisher sequence number that will be assigned to the next message published on the channel. The publisher sequence number is especially useful for correlating the delivery tag in a confirmation with the original message that was published. This helps the client keep track of the status of each message and take appropriate action based on the confirmation received from the broker.
When the client publishes a message, it records the message's publisher sequence number locally; the number itself is not sent on the wire. Once the broker receives the message, it assigns a delivery tag based on the message's position in the sequence of messages published on that channel. The delivery tag is therefore equal to the publisher's sequence number, which lets the client keep track of the status of each message published on the channel.
The broker confirms messages as it handles them by sending a basic.ack on the same channel, with a delivery tag field containing the sequence number of the confirmed message. This confirms that the broker has successfully received the message and taken responsibility for it.
In exceptional cases where the broker is unable to handle a message successfully, it sends a basic.nack instead of a basic.ack. This indicates that the broker was unable to process the message and refuses responsibility for it. The timeout we pass to channel.WaitForConfirmsOrDie(timeout) is a separate, client-side mechanism: if no confirmation arrives within that period, the broker does not send a basic.nack; the client simply gives up waiting and throws an exception. In both cases the client should assume that the message may have been lost or not processed and take appropriate action, such as republishing the message or logging the failure.
The broker can also set the multiple field in the basic.ack or basic.nack message to indicate whether it's confirming a single message or multiple messages. If the multiple field is set to true, then all messages with a sequence number lower than or equal to the one in the delivery tag field have been confirmed.
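The effect of the multiple field can be sketched with a small tracker that has no dependency on the RabbitMQ client. The PendingConfirms class and its Track and Settle methods are hypothetical names introduced only for this illustration; the sketch assumes the caller records each sequence number before publishing and feeds every received delivery tag into Settle.

```csharp
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper (not part of RabbitMQ.Client): tracks sequence numbers
// that are still waiting for a basic.ack or basic.nack from the broker.
public sealed class PendingConfirms
{
    private readonly SortedSet<ulong> _pending = new SortedSet<ulong>();

    public int Count => _pending.Count;

    // Call with the sequence number read just before publishing.
    public void Track(ulong sequenceNumber) => _pending.Add(sequenceNumber);

    // Applies one confirmation frame and returns the sequence numbers it settles.
    // multiple == true settles every pending number <= deliveryTag;
    // multiple == false settles only deliveryTag itself.
    public IReadOnlyList<ulong> Settle(ulong deliveryTag, bool multiple)
    {
        List<ulong> settled = multiple
            ? _pending.Where(tag => tag <= deliveryTag).ToList()
            : _pending.Where(tag => tag == deliveryTag).ToList();

        foreach (ulong tag in settled)
        {
            _pending.Remove(tag);
        }

        return settled;
    }
}
```

With sequence numbers 5, 6, 7, and 8 pending, Settle(8, multiple: false) settles only 8, while a following Settle(7, multiple: true) settles 5, 6, and 7 in one step.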
When can messages be confirmed by the broker after publication?
The answer to this question depends on whether the message is routable or unroutable.
For unroutable messages, the broker issues a confirmation once the exchange verifies that the message won't route to any queue. If the message was published with the mandatory flag set, the broker sends a basic.return message to the client before sending the basic.ack confirmation. If the message is not marked as mandatory, the broker simply discards it, but it still sends the basic.ack, so an ack on its own does not prove that the message reached a queue.
For routable messages, the confirmation process is a bit more complex. The broker issues a basic.ack confirmation once the message has been accepted by all the queues it was routed to. For persistent messages routed to durable queues, this means the message has been persisted to disk.
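The rules above can be condensed into a small decision function. This is only a sketch of the rule, not broker code: ConfirmRules and FramesFor are hypothetical names, and the function assumes the broker hits no internal error (in which case it would send basic.nack instead of basic.ack).

```csharp
using System.Collections.Generic;

// Hypothetical illustration of which frames a publisher should expect back
// for one message published on a channel in confirm mode.
public static class ConfirmRules
{
    public static IReadOnlyList<string> FramesFor(bool routable, bool mandatory)
    {
        var frames = new List<string>();

        // An unroutable message is returned only when it was published as mandatory.
        if (!routable && mandatory)
        {
            frames.Add("basic.return");
        }

        // The confirmation is sent in every case, after any basic.return.
        frames.Add("basic.ack");
        return frames;
    }
}
```

The practical consequence is that a publisher that cares about unroutable messages needs both the mandatory flag and a handler for returned messages, because the ack arrives either way.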
Publisher Confirm Synchronously
RabbitMQ offers two approaches for handling publisher confirmations: synchronous and asynchronous. In this post, we'll focus on the synchronous approach. With this approach, the client publishes a message and waits synchronously for its confirmation using the channel.WaitForConfirmsOrDie(timeout) method. However, there are two cases to consider:
- Publisher Confirm Single Message: In this approach, the client publishes a message with channel.BasicPublish() and immediately waits for the confirmation from the broker synchronously by blocking the current thread.
- Publisher Confirm Batch Messages: In this approach, the client publishes multiple messages in succession with channel.BasicPublish(), or publishes them as a batch with channel.CreateBasicPublishBatch(), and then waits for the confirmations of all those messages.
Publisher Confirm Single Message Synchronously
When publishing messages with the RabbitMQ .NET client, you can publish them either in batches or individually. Publishing messages individually provides more control over the publishing process and allows you to confirm each message individually. In this individual acknowledgment mode, the broker acknowledges each message separately, and the client waits for that acknowledgment before publishing the next message. This mode is suitable for low-throughput scenarios where the number of messages being published is small.
Here is an example of how to publish messages individually using WaitForConfirmsOrDie(timeout) and BasicPublish():
public class SyncSinglePublisherConfirm : IPublisher
{
private readonly ILogger<SyncSinglePublisherConfirm> _logger;
private readonly RabbitMqOptions _rabbitmqOptions;
public SyncSinglePublisherConfirm(
IOptions<RabbitMqOptions> rabbitmqOptions,
ILogger<SyncSinglePublisherConfirm> logger
)
{
_logger = logger;
_rabbitmqOptions = rabbitmqOptions.Value;
}
public int TimeOut { get; set; } = 60;
public async Task PublishAsync(EnvelopMessage message)
{
await PublishAsync(new List<EnvelopMessage> { message });
}
public async Task PublishAsync(IEnumerable<EnvelopMessage> messages)
{
Queue<EnvelopMessage> unsuccessfulPublishedMessages = new Queue<EnvelopMessage>();
var factory = new ConnectionFactory
{
HostName = _rabbitmqOptions.Host,
UserName = _rabbitmqOptions.User,
Password = _rabbitmqOptions.Password
};
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
// with calling `ConfirmSelect` on the channel `NextPublishSeqNo` will be set to '1'
channel.ConfirmSelect();
_logger.LogInformation(
$"Start SequenceNumber for 'ConfirmSelect' is: {channel.NextPublishSeqNo}"
);
channel.BasicAcks += (sender, ea) =>
{
_logger.LogInformation(
$"Message with delivery tag '{
ea.DeliveryTag
}' ack-ed, multiple is {
ea.Multiple
}."
);
};
channel.BasicNacks += (sender, ea) =>
{
_logger.LogInformation(
$"Message with delivery tag '{
ea.DeliveryTag
}' nack-ed, multiple is {
ea.Multiple
}."
);
};
var startTime = Stopwatch.GetTimestamp();
var messageList = messages.ToList();
foreach (var envelopMessage in messageList)
{
channel.QueueDeclare(
queue: envelopMessage.Message.GetType().Name.Underscore(),
durable: true,
exclusive: false,
autoDelete: false,
arguments: null
);
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
properties.Headers = envelopMessage.Metadata;
properties.ContentType = "application/json";
properties.Type = TypeMapper.GetTypeName(envelopMessage.Message.GetType());
properties.MessageId = envelopMessage.Message.MessageId.ToString();
var currentSequenceNumber = channel.NextPublishSeqNo;
try
{
// After publishing publish message sequence number will be incremented
channel.BasicPublish(
exchange: string.Empty,
routingKey: envelopMessage.Message.GetType().Name.Underscore(),
basicProperties: properties,
body: Encoding.UTF8.GetBytes(
JsonConvert.SerializeObject(envelopMessage.Message)
)
);
var nextSequenceNumberAfterPublish = channel.NextPublishSeqNo;
_logger.LogInformation(
$"message with messageId: {
envelopMessage.Message.MessageId
} published, and current SequenceNumber is: {
currentSequenceNumber
}, next SequenceNumber after publishing is: {
nextSequenceNumberAfterPublish
}."
);
// single confirmation after each publish
channel.WaitForConfirmsOrDie(timeout: TimeSpan.FromSeconds(5));
_logger.LogInformation(
$"message with messageId: {
envelopMessage.Message.MessageId
}, and SequenceNumber is: {
currentSequenceNumber
} confirmed."
);
}
catch (Exception ex)
{
var nextSequenceNumberAfterPublish = channel.NextPublishSeqNo;
// Log the failure as an error and keep the exception details
_logger.LogError(
ex,
$"message with messageId: {
envelopMessage.Message.MessageId
} failed, and current SequenceNumber is: {
currentSequenceNumber
}, next SequenceNumber after publishing is: {
nextSequenceNumberAfterPublish
}."
);
// Keep the failed message so it can be republished later
unsuccessfulPublishedMessages.Enqueue(envelopMessage);
}
}
if (unsuccessfulPublishedMessages.Any())
{
await PublishAsync(unsuccessfulPublishedMessages);
}
_logger.LogInformation("All published messages are confirmed");
var endTime = Stopwatch.GetTimestamp();
_logger.LogInformation(
$"Published {
messageList.Count
} messages and handled confirms synchronously in {
Stopwatch.GetElapsedTime(startTime, endTime).TotalMilliseconds
} ms"
);
}
}
Publisher confirms are not enabled by default, so we first need to enable them at the channel level by calling ConfirmSelect(). This method tells the broker to send a confirmation for every message published on the channel. It must be called on every channel on which you expect to use publisher confirms, and it should be called just once per channel, not for every message published.
using var channel = connection.CreateModel();
channel.ConfirmSelect();
To keep track of the client's publish sequence numbers, the channel internally uses NextPublishSeqNo, _pendingDeliveryTags, and _deliveryTagsCountdown during BasicPublish.
Once a channel is in confirm mode after calling channel.ConfirmSelect(), both the broker and the client count messages (counting starts at 1 on the first confirm.select). In practice, calling the ConfirmSelect() method sets NextPublishSeqNo on the channel to 1.
For publishing messages, we loop through a set of messages and publish them one by one:
foreach (var envelopMessage in messageList)
{
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
properties.Headers = envelopMessage.Metadata;
var currentSequenceNumber = channel.NextPublishSeqNo;
try
{
// After publishing publish message sequence number will be incremented
channel.BasicPublish(
exchange: string.Empty,
routingKey: envelopMessage.Message.GetType().Name.Underscore(),
basicProperties: properties,
body: Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(envelopMessage.Message)));
var nextSequenceNumberAfterPublish = channel.NextPublishSeqNo;
_logger.LogInformation(
$"message with messageId: {
envelopMessage.Message.MessageId
} published, and current SequenceNumber is: {
currentSequenceNumber
}, next SequenceNumber after publishing is: {
nextSequenceNumberAfterPublish
}.");
// single confirmation after each publish
channel.WaitForConfirmsOrDie(timeout: TimeSpan.FromSeconds(5));
_logger.LogInformation(
$"message with messageId: {
envelopMessage.Message.MessageId
}, and SequenceNumber is: {
currentSequenceNumber
} confirmed."
);
}
catch (Exception ex)
{
var nextSequenceNumberAfterPublish = channel.NextPublishSeqNo;
_logger.LogInformation(
$"message with messageId: {envelopMessage.Message.MessageId} failed, and current SequenceNumber is: {
currentSequenceNumber
}, next SequenceNumber after publishing is: {
nextSequenceNumberAfterPublish
}.");
unsuccessfulPublishedMessages.Enqueue(envelopMessage);
}
}
Before we publish our first message, the current publish sequence number is 1, because preparing the channel for publisher confirms by calling [ConfirmSelect](https://github.com/rabbitmq/rabbitmq-dotnet-client/blob/6.x/projects/RabbitMQ.Client/client/impl/ModelBase.cs#L1235) sets NextPublishSeqNo to 1.
var currentSequenceNumber = channel.NextPublishSeqNo;
Next, we call BasicPublish() for our first message with the first publish sequence number (NextPublishSeqNo = 1). Several things now happen internally within BasicPublish, which we describe here to make the workflow clear:
- When we call BasicPublish, our current publish sequence number, NextPublishSeqNo, is 1. The channel adds this sequence number to _pendingDeliveryTags, the list of tags waiting for confirmation. NextPublishSeqNo is then incremented to 2, ready for the next publish, and _deliveryTagsCountdown is increased by 1. This flow repeats for each subsequent BasicPublish call (the sequence numbers increase regardless of whether the message is later confirmed successfully or not).
- The message is published to the broker, and we wait for a confirmation, which is an ACK for a successful confirmation or a NACK for a failed one. The confirmation arrives through the HandleBasicAck or HandleBasicNack callback in the ModelBase class of the RabbitMQ client, and this callback also raises the BasicAcks or BasicNacks event on the channel. The confirmation arrives asynchronously, so if we want to ensure that the broker has confirmed our message before our code continues, we must block the thread by calling WaitForConfirmsOrDie(timeout) and wait for the broker's confirmation.
- Besides raising the channel's BasicAcks and BasicNacks events, the confirmation callbacks update the pending state. If the _pendingDeliveryTags collection is not empty, the basic.ack or basic.nack from the broker is processed according to its multiple parameter: multiple is a boolean value, and if it is true, all messages with a lower or equal sequence number are ack-ed or nack-ed; if it is false, only one message is ack-ed or nack-ed. The affected items are removed from _pendingDeliveryTags. Then _deliveryTagsCountdown is decreased by 1 for each settled message; WaitForConfirmsOrDie uses this countdown to block the thread until it reaches zero. If any message is nack-ed, _onlyAcksReceived is set to false (this flag is also used by WaitForConfirmsOrDie).
- When the multiple field is set to true, all _pendingDeliveryTags up to and including the delivery tag supplied in the acknowledgement are acknowledged, removed from the _pendingDeliveryTags collection, and _deliveryTagsCountdown is decreased by the number of acknowledged messages. For instance, if delivery tags 5, 6, 7, and 8 are still unconfirmed on the channel, all of them are acknowledged when an acknowledgement frame with delivery tag 8 and multiple set to true arrives on the channel. If multiple were set to false instead, only the message with delivery tag 8 would be acknowledged, and the messages with pending delivery tags 5, 6, and 7 would remain unacknowledged.
Now, if we call WaitForConfirmsOrDie after BasicPublish(), it blocks the current thread and waits, for up to the given timeout, until _deliveryTagsCountdown reaches zero. If the countdown reaches zero and _onlyAcksReceived is true, all messages are confirmed, the blocked thread is released, and the method returns to the application. If _onlyAcksReceived is false or the timeout is reached, the method throws an IOException.
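To make the blocking behavior concrete, here is a simplified in-memory model of the bookkeeping described above. It is a sketch under stated assumptions, not the RabbitMQ.Client implementation: SimulatedConfirmChannel and its members are hypothetical names, a sorted set stands in for _pendingDeliveryTags, and a monitor wait stands in for the countdown.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;

// Hypothetical, simplified model of a channel in confirm mode.
public sealed class SimulatedConfirmChannel
{
    private readonly object _sync = new object();
    private readonly SortedSet<ulong> _pendingTags = new SortedSet<ulong>();
    private bool _onlyAcksReceived = true;

    // Starts at 1, mirroring the state right after confirm.select.
    public ulong NextPublishSeqNo { get; private set; } = 1;

    // "Publishes" a message: records its sequence number as pending and advances the counter.
    public ulong Publish()
    {
        lock (_sync)
        {
            ulong seqNo = NextPublishSeqNo++;
            _pendingTags.Add(seqNo);
            return seqNo;
        }
    }

    public void HandleAck(ulong deliveryTag, bool multiple) => Settle(deliveryTag, multiple, isNack: false);

    public void HandleNack(ulong deliveryTag, bool multiple) => Settle(deliveryTag, multiple, isNack: true);

    private void Settle(ulong deliveryTag, bool multiple, bool isNack)
    {
        lock (_sync)
        {
            if (multiple) _pendingTags.RemoveWhere(tag => tag <= deliveryTag);
            else _pendingTags.Remove(deliveryTag);

            if (isNack) _onlyAcksReceived = false;
            Monitor.PulseAll(_sync); // wake any thread blocked in WaitForConfirmsOrDie
        }
    }

    // Blocks until nothing is pending; throws on timeout or if any nack was seen.
    public void WaitForConfirmsOrDie(TimeSpan timeout)
    {
        DateTime deadline = DateTime.UtcNow + timeout;
        lock (_sync)
        {
            while (_pendingTags.Count > 0)
            {
                TimeSpan remaining = deadline - DateTime.UtcNow;
                if (remaining <= TimeSpan.Zero)
                    throw new IOException("Timed out waiting for confirms");
                Monitor.Wait(_sync, remaining);
            }

            bool onlyAcks = _onlyAcksReceived;
            _onlyAcksReceived = true; // reset for the next round of publishes
            if (!onlyAcks) throw new IOException("Nacks received");
        }
    }
}
```

In this model, publishing two messages and then handling an ack for tag 2 with multiple set to true lets WaitForConfirmsOrDie return immediately, while a nack or an unanswered publish makes it throw an IOException.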
We wait for the broker's confirmation of the first message. If the message is ack-ed, we know it was delivered successfully to the broker; the WaitForConfirmsOrDie method releases the blocked thread, and we continue publishing the other messages. But if the message is not confirmed within the timeout, or if it is nack-ed (meaning the broker could not take care of it for some reason), WaitForConfirmsOrDie throws an IOException. Handling the exception usually consists of logging an error message and/or retrying to send the message. In our code, we catch the exception, log it, and add the message to the unsuccessfulPublishedMessages collection so that it can be processed and republished.
Here we wait for confirmation immediately after each publish, so there is only ever one outstanding message and the multiple parameter on HandleBasicAck or HandleBasicNack will be false. Because we wait after every publish, we never give the broker the chance to confirm several messages at once. With batch confirmation, the broker may set the multiple parameter on HandleBasicAck or HandleBasicNack to true. We discuss batch confirmation in the next section.
channel.BasicPublish(
exchange: string.Empty,
routingKey: envelopMessage.Message.GetType().Name.Underscore(),
basicProperties: properties,
body: Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(envelopMessage.Message)));
var nextSequenceNumberAfterPublish = channel.NextPublishSeqNo; //2
// single confirmation after each publish
channel.WaitForConfirmsOrDie(timeout: TimeSpan.FromSeconds(5));
After this publish, our next publish sequence number is 2.
The process is repeated for all messages, one by one. Each message is published, and we immediately wait for an acknowledgment (ACK or NACK) from the broker to ensure that the message was delivered successfully. If a message fails to be delivered to the broker due to an exception or timeout, we catch the exception, log it, and add the message to our unsuccessfulPublishedMessages queue so that it can be processed and republished.
When we publish messages individually, we have more control over the delivery of each message, which can be useful when message ordering is important, but it is not very efficient, and it is better to use batch confirmation or an async message confirmation approach.
Publisher Confirm with Batch Messages Synchronously
Using individual acknowledgments can be a performance problem in high-throughput cases where publishers need to send a lot of messages. Batch confirmation is an option for handling this. In this approach, instead of publishing each message one by one and waiting for its confirmation, we publish a batch of messages, wait for the whole batch to be confirmed by the broker, and only then publish the next batch.
Batching also reduces the number of blocking round trips to the broker compared to confirming messages individually, because we wait once per batch rather than once per message. In addition, we can hand a whole batch of messages to the client at once by using channel.CreateBasicPublishBatch and the batch's Publish method instead of calling channel.BasicPublish for each message.
Waiting for confirmation of a batch of messages instead of waiting for confirmation of each message individually is more efficient.
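The saving can be put in numbers with a little arithmetic. The sketch below counts how many times the publisher blocks while waiting for confirms; ConfirmMath and BlockingWaits are hypothetical names used only for this illustration, and the count says nothing about how long each wait takes.

```csharp
using System;

// Hypothetical helper: counts blocking waits, not elapsed time.
public static class ConfirmMath
{
    // Number of WaitForConfirmsOrDie-style waits needed when waiting once
    // per batch of batchSize messages (ceiling of messageCount / batchSize).
    public static int BlockingWaits(int messageCount, int batchSize)
    {
        if (messageCount < 0) throw new ArgumentOutOfRangeException(nameof(messageCount));
        if (batchSize < 1) throw new ArgumentOutOfRangeException(nameof(batchSize));

        return (messageCount + batchSize - 1) / batchSize;
    }
}
```

For 1,000 messages, confirming individually (a batch size of 1) means 1,000 blocking waits, while a batch size of 100 means 10. A final partial batch still costs one wait, so 1,050 messages with a batch size of 100 need 11.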
Here I implemented batch confirmation with a batch size of 100 using BasicPublish:
public class SyncBatchPublisherConfirm : IPublisher
{
private readonly ILogger<SyncBatchPublisherConfirm> _logger;
private readonly RabbitMqOptions _rabbitmqOptions;
private readonly ConcurrentDictionary<ulong, EnvelopMessage> _messagesDeliveryTagsDictionary =
new();
public SyncBatchPublisherConfirm(
IOptions<RabbitMqOptions> rabbitmqOptions,
ILogger<SyncBatchPublisherConfirm> logger
)
{
_logger = logger;
_rabbitmqOptions = rabbitmqOptions.Value;
}
public int TimeOut { get; set; } = 60;
public int BatchSize { get; set; } = 100;
public async Task PublishAsync(EnvelopMessage message)
{
await PublishAsync(new List<EnvelopMessage> { message });
}
public async Task PublishAsync(IEnumerable<EnvelopMessage> envelopMessages)
{
Queue<EnvelopMessage> unsuccessfulPublishedMessages = new Queue<EnvelopMessage>();
var factory = new ConnectionFactory
{
HostName = _rabbitmqOptions.Host,
UserName = _rabbitmqOptions.User,
Password = _rabbitmqOptions.Password
};
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
// with calling `ConfirmSelect` on the channel `NextPublishSeqNo` will be set to '1'
channel.ConfirmSelect();
_logger.LogInformation(
$"Start SequenceNumber for 'ConfirmSelect' is: {channel.NextPublishSeqNo}"
);
var startTime = Stopwatch.GetTimestamp();
channel.BasicAcks += (_, ea) =>
{
var envelop = GetMappedMessage(ea.DeliveryTag);
_logger.LogInformation(
$"Message with delivery tag '{ea.DeliveryTag}' and messageId: {envelop?.Message.MessageId} ack-ed, multiple is {ea.Multiple}."
);
RemovedConfirmedMessage(ea.DeliveryTag, ea.Multiple);
};
channel.BasicNacks += (_, ea) =>
{
var envelop = GetMappedMessage(ea.DeliveryTag);
_logger.LogInformation(
$"Message with delivery tag '{ea.DeliveryTag}' and messageId: {envelop?.Message.MessageId} nack-ed, multiple is {ea.Multiple}."
);
if (envelop is { })
unsuccessfulPublishedMessages.Enqueue(envelop);
RemovedConfirmedMessage(ea.DeliveryTag, ea.Multiple);
};
var batchChunk = 0;
var messageList = envelopMessages.ToList();
foreach (var envelopMessage in messageList)
{
channel.QueueDeclare(
queue: envelopMessage.Message.GetType().Name.Underscore(),
durable: true,
exclusive: false,
autoDelete: false,
arguments: null
);
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
properties.Headers = envelopMessage.Metadata;
properties.ContentType = "application/json";
properties.Type = TypeMapper.GetTypeName(envelopMessage.Message.GetType());
properties.MessageId = envelopMessage.Message.MessageId.ToString();
var currentSequenceNumber = channel.NextPublishSeqNo;
_messagesDeliveryTagsDictionary.TryAdd(currentSequenceNumber, envelopMessage);
// After publishing publish message sequence number will be incremented
channel.BasicPublish(
exchange: string.Empty,
routingKey: envelopMessage.Message.GetType().Name.Underscore(),
basicProperties: properties,
body: Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(envelopMessage.Message))
);
batchChunk++;
var nextSequenceNumberAfterPublish = channel.NextPublishSeqNo;
_logger.LogInformation(
$"message with messageId: {
envelopMessage.Message.MessageId
} published, and current SequenceNumber is: {
currentSequenceNumber
}, Next SequenceNumber after publishing is: {
nextSequenceNumberAfterPublish
}."
);
if (
batchChunk == BatchSize
|| (batchChunk != BatchSize && (int)currentSequenceNumber == messageList.Count)
)
{
channel.WaitForConfirmsOrDie(TimeSpan.FromSeconds(5));
batchChunk = 0;
}
}
if (unsuccessfulPublishedMessages.Any())
await PublishAsync(unsuccessfulPublishedMessages);
_logger.LogInformation("All published messages are confirmed");
var endTime = Stopwatch.GetTimestamp();
_logger.LogInformation(
$"Published {
messageList.Count
} messages and handled confirms synchronously in {
Stopwatch.GetElapsedTime(startTime, endTime).TotalMilliseconds
} ms"
);
}
private void RemovedConfirmedMessage(ulong sequenceNumber, bool multiple)
{
if (multiple)
{
var confirmed = _messagesDeliveryTagsDictionary.Where(k => k.Key <= sequenceNumber);
foreach (var entry in confirmed)
{
_messagesDeliveryTagsDictionary.TryRemove(entry.Key, out _);
}
}
else
{
_messagesDeliveryTagsDictionary.TryRemove(sequenceNumber, out _);
}
}
private EnvelopMessage? GetMappedMessage(ulong sequenceNumber)
{
_messagesDeliveryTagsDictionary.TryGetValue(sequenceNumber, out EnvelopMessage? e);
return e;
}
}
Here we have a batchChunk local variable with an initial value of 0, and we increment it with each publish until we reach our BatchSize, which is 100. After reaching the batch size (or the last message), we wait for confirmations for all messages in this batch before moving on to the next batch. Then we reset batchChunk and prepare for the next batch's confirmations:
if (batchChunk == BatchSize || (batchChunk != BatchSize && (int)currentSequenceNumber == messageList.Count))
{
channel.WaitForConfirmsOrDie(TimeSpan.FromSeconds(5));
batchChunk = 0;
}
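The flush condition above relies on comparing the current sequence number with the message count, which only holds when the channel is fresh and every message is published exactly once. An alternative sketch, assuming .NET 6 or later for Enumerable.Chunk, splits the messages into batches up front so the last partial batch is flushed naturally. BatchConfirmLoop, Run, and the publish and waitForConfirms delegates are hypothetical names; in real code the delegates would wrap channel.BasicPublish and channel.WaitForConfirmsOrDie.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper: publishes in fixed-size batches and waits once per batch.
public static class BatchConfirmLoop
{
    // Returns the number of batches (and therefore waits) that were performed.
    public static int Run<T>(
        IEnumerable<T> messages,
        int batchSize,
        Action<T> publish,
        Action waitForConfirms)
    {
        if (batchSize < 1) throw new ArgumentOutOfRangeException(nameof(batchSize));

        int batches = 0;
        foreach (T[] batch in messages.Chunk(batchSize))
        {
            foreach (T message in batch)
            {
                publish(message);
            }

            // One blocking wait covers every message published in this batch.
            waitForConfirms();
            batches++;
        }

        return batches;
    }
}
```

With 7 messages and a batch size of 3, this publishes all 7 messages and waits 3 times, the last time for a batch of one.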
To correlate a received delivery tag with the message it belongs to inside the channel.BasicAcks and channel.BasicNacks handlers, we use a mapping dictionary called _messagesDeliveryTagsDictionary. Before publishing a message, we add an entry with the published message's sequence number (which is equivalent to the server's delivery tag) as the key and the published message as the value:
var currentSequenceNumber = channel.NextPublishSeqNo;
_messagesDeliveryTagsDictionary.TryAdd(currentSequenceNumber, envelopMessage);
// After publishing publish message sequence number will be incremented
channel.BasicPublish(
exchange: string.Empty,
routingKey: envelopMessage.Message.GetType().Name.Underscore(),
basicProperties: properties,
body: Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(envelopMessage.Message))
);
Now we can access the published message based on its delivery tag inside the channel.BasicAcks and channel.BasicNacks handlers:
channel.BasicAcks += (_, ea) =>
{
var envelop = GetMappedMessage(ea.DeliveryTag);
_logger.LogInformation(
$"Message with delivery tag '{ea.DeliveryTag}' and messageId: {envelop?.Message.MessageId} ack-ed, multiple is {ea.Multiple}."
);
RemovedConfirmedMessage(ea.DeliveryTag, ea.Multiple);
};
channel.BasicNacks += (_, ea) =>
{
var envelop = GetMappedMessage(ea.DeliveryTag);
_logger.LogInformation(
$"Message with delivery tag '{ea.DeliveryTag}' and messageId: {envelop?.Message.MessageId} nack-ed, multiple is {ea.Multiple}."
);
if (envelop is { })
unsuccessfulPublishedMessages.Enqueue(envelop);
RemovedConfirmedMessage(ea.DeliveryTag, ea.Multiple);
};
And we can get the delivered message based on its delivery tag with this helper:
private EnvelopMessage? GetMappedMessage(ulong sequenceNumber)
{
_messagesDeliveryTagsDictionary.TryGetValue(sequenceNumber, out EnvelopMessage? e);
return e;
}
After receiving a confirmation, whether ack-ed or nack-ed, our channel.BasicAcks and channel.BasicNacks event handlers do their work: in channel.BasicAcks we usually log that the message was acked and delivered successfully, and in channel.BasicNacks we usually log an error and trigger some mechanism for republishing the nack-ed message. In both cases we should remove the corresponding delivery tag entry from the _messagesDeliveryTagsDictionary dictionary, because the confirmation has been received and we no longer need to track the message. To remove the entry, we call RemovedConfirmedMessage with the corresponding sequenceNumber:
private void RemovedConfirmedMessage(ulong sequenceNumber, bool multiple)
{
if (multiple)
{
var confirmed = _messagesDeliveryTagsDictionary.Where(k => k.Key <= sequenceNumber);
foreach (var entry in confirmed)
{
_messagesDeliveryTagsDictionary.TryRemove(entry.Key, out _);
}
}
else
{
_messagesDeliveryTagsDictionary.TryRemove(sequenceNumber, out _);
}
}
To handle nack-ed messages or unsuccessful confirmations inside our channel.BasicNacks handler, we created a local queue, unsuccessfulPublishedMessages, and we add nack-ed messages to this queue so they can be republished at the end of our process.
channel.BasicNacks += (_, ea) =>
{
var envelop = GetMappedMessage(ea.DeliveryTag);
_logger.LogInformation(
$"Message with delivery tag '{ea.DeliveryTag}' and messageId: {envelop?.Message.MessageId} nack-ed, multiple is {ea.Multiple}."
);
if (envelop is { })
unsuccessfulPublishedMessages.Enqueue(envelop);
RemovedConfirmedMessage(ea.DeliveryTag, ea.Multiple);
};
if (unsuccessfulPublishedMessages.Any())
await PublishAsync(unsuccessfulPublishedMessages);
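Republishing by calling PublishAsync again has no upper bound, so a broker that keeps nack-ing would be retried forever. One possible refinement, sketched here without any RabbitMQ dependency, is to cap the number of attempts. BoundedRetry, Publish, publishOnce, and maxAttempts are hypothetical names; publishOnce is assumed to publish the given messages and return the ones that were nack-ed or timed out.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: retries failed messages a limited number of times.
public static class BoundedRetry
{
    // Returns the messages that are still unconfirmed after maxAttempts rounds,
    // so the caller can log them or store them elsewhere.
    public static IReadOnlyList<T> Publish<T>(
        IReadOnlyList<T> messages,
        Func<IReadOnlyList<T>, IReadOnlyList<T>> publishOnce,
        int maxAttempts)
    {
        if (maxAttempts < 1) throw new ArgumentOutOfRangeException(nameof(maxAttempts));

        IReadOnlyList<T> remaining = messages;
        for (int attempt = 1; attempt <= maxAttempts && remaining.Count > 0; attempt++)
        {
            // Each round only republishes what failed in the previous round.
            remaining = publishOnce(remaining);
        }

        return remaining;
    }
}
```

The loop stops early as soon as a round reports no failures, and whatever is left after the last attempt is handed back to the caller instead of being retried indefinitely.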
Now let's implement this batch publishing with PublishBatch instead of BasicPublish.
When each message is published with its own BasicPublish call, the client pays the per-call overhead for every message, so overall performance decreases as the number of messages grows. The connection and channel are the same in both cases; what changes is how much work is done per message. When we publish messages as a batch, we hand them to the client together, which reduces the per-message overhead and improves overall performance.
Note that a publish batch is not a transaction: the broker still confirms the messages in the batch by their delivery tags, which is why we still wait for confirmations after publishing the batch.
public class SyncBatchPublisherConfirm : IPublisher
{
private readonly ILogger<SyncBatchPublisherConfirm> _logger;
private readonly RabbitMqOptions _rabbitmqOptions;
private readonly ConcurrentDictionary<ulong, EnvelopMessage> _messagesDeliveryTagsDictionary =
new();
public SyncBatchPublisherConfirm(
IOptions<RabbitMqOptions> rabbitmqOptions,
ILogger<SyncBatchPublisherConfirm> logger
)
{
_logger = logger;
_rabbitmqOptions = rabbitmqOptions.Value;
}
public int TimeOut { get; set; } = 60;
public int BatchSize { get; set; } = 5;
public async Task PublishAsync(EnvelopMessage message)
{
await PublishAsync(new List<EnvelopMessage> { message });
}
public async Task PublishAsync(IEnumerable<EnvelopMessage> envelopMessages)
{
Queue<EnvelopMessage> unsuccessfulPublishedMessages = new Queue<EnvelopMessage>();
var factory = new ConnectionFactory
{
HostName = _rabbitmqOptions.Host,
UserName = _rabbitmqOptions.User,
Password = _rabbitmqOptions.Password
};
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
// with calling `ConfirmSelect` on the channel `NextPublishSeqNo` will be set to '1'
channel.ConfirmSelect();
_logger.LogInformation(
$"Start SequenceNumber for 'ConfirmSelect' is: {channel.NextPublishSeqNo}"
);
var startTime = Stopwatch.GetTimestamp();
channel.BasicAcks += (_, ea) =>
{
var envelop = GetMappedMessage(ea.DeliveryTag);
_logger.LogInformation(
$"Message with delivery tag '{ea.DeliveryTag}' and messageId: {envelop?.Message.MessageId} ack-ed, multiple is {ea.Multiple}."
);
RemovedConfirmedMessage(ea.DeliveryTag, ea.Multiple);
};
channel.BasicNacks += (_, ea) =>
{
var envelop = GetMappedMessage(ea.DeliveryTag);
_logger.LogInformation(
$"Message with delivery tag '{ea.DeliveryTag}' and messageId: {envelop?.Message.MessageId} nack-ed, multiple is {ea.Multiple}."
);
if (envelop is { })
unsuccessfulPublishedMessages.Enqueue(envelop);
RemovedConfirmedMessage(ea.DeliveryTag, ea.Multiple);
};
var messageList = envelopMessages.ToList();
Queue<EnvelopMessage> batchQueue = new Queue<EnvelopMessage>();
ulong currentSequenceNumber = channel.NextPublishSeqNo; // 1
foreach (var envelopMessage in messageList)
{
batchQueue.Enqueue(envelopMessage);
if (
batchQueue.Count == BatchSize
|| (
batchQueue.Count != BatchSize
&& ((batchQueue.Count - 1) + (int)currentSequenceNumber == messageList.Count)
)
)
{
currentSequenceNumber = PublishBatch(channel, batchQueue, currentSequenceNumber);
channel.WaitForConfirmsOrDie(TimeSpan.FromSeconds(50));
batchQueue = new Queue<EnvelopMessage>();
}
}
if (unsuccessfulPublishedMessages.Any())
await PublishAsync(unsuccessfulPublishedMessages);
_logger.LogInformation("All published messages are confirmed");
var endTime = Stopwatch.GetTimestamp();
_logger.LogInformation(
$"Published {
messageList.Count
} messages in batches and handled confirms synchronously in {
Stopwatch.GetElapsedTime(startTime, endTime).TotalMilliseconds
} ms"
);
}
private ulong PublishBatch(
IModel channel,
IEnumerable<EnvelopMessage> envelopMessages,
ulong currentSequenceNumber = 1
)
{
// Create a batch of messages
var batch = channel.CreateBasicPublishBatch();
var batchMessages = envelopMessages.ToList();
foreach (var envelope in batchMessages)
{
channel.QueueDeclare(
queue: envelope.Message.GetType().Name.Underscore(),
durable: true,
exclusive: false,
autoDelete: false,
arguments: null
);
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
properties.Headers = envelope.Metadata;
properties.ContentType = "application/json";
properties.Type = TypeMapper.GetTypeName(envelope.Message.GetType());
properties.MessageId = envelope.Message.MessageId.ToString();
var body = new ReadOnlyMemory<byte>(
Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(envelope.Message))
);
batch.Add(
exchange: string.Empty,
routingKey: envelope.Message.GetType().Name.Underscore(),
mandatory: true,
properties: properties,
body: body
);
_messagesDeliveryTagsDictionary.TryAdd(currentSequenceNumber++, envelope);
}
// Publish the whole batch with one call (this is not an AMQP transaction). Internally the client assigns `NextPublishSeqNo` to each message and adds it to its pending delivery tags collection, so the sequence number advances by the number of messages in the batch.
batch.Publish();
return channel.NextPublishSeqNo;
}
}
This is like our previous approach, but instead of publishing each message separately with BasicPublish and then waiting for confirmation of a batch of messages, we publish the messages as a batch and wait for confirmation of that batch, which is more efficient.
To publish a batch of messages, we create a batch with channel.CreateBasicPublishBatch() and add messages to it with the batch.Add method. Once the batch is ready to publish, we call the batch.Publish method to publish all of its messages.
var batch = channel.CreateBasicPublishBatch();
foreach (var envelope in batchMessages)
{
//...
var properties = channel.CreateBasicProperties();
batch.Add(
exchange: string.Empty,
routingKey: envelope.Message.GetType().Name.Underscore(),
mandatory: true,
properties: properties,
body: body
);
}
When we publish a batch of messages with batch.Publish, the client internally assigns a separate publish sequence number (NextPublishSeqNo) to each message in the batch before sending the batch to the broker.
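The PublishBatch method relies on this numbering when it fills _messagesDeliveryTagsDictionary. A minimal sketch of the idea, assuming sequence numbers are assigned consecutively in the order the messages were added to the batch; MapExpectedSequenceNumbers is a hypothetical helper, and plain strings stand in for EnvelopMessage:

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper (not part of the original code): pairs each message in a
// batch with the publish sequence number it is expected to receive, assuming the
// client numbers messages consecutively starting at the channel's NextPublishSeqNo.
static Dictionary<ulong, TMessage> MapExpectedSequenceNumbers<TMessage>(
    ulong nextPublishSeqNo,
    IReadOnlyList<TMessage> batch)
{
    var map = new Dictionary<ulong, TMessage>();
    for (var i = 0; i < batch.Count; i++)
        map[nextPublishSeqNo + (ulong)i] = batch[i];
    return map;
}

// Example: the channel's next sequence number is 6 and the batch holds three messages,
// so the messages are expected to receive the tags 6, 7 and 8.
var expected = MapExpectedSequenceNumbers(6, new[] { "a", "b", "c" });
foreach (var (tag, message) in expected)
    Console.WriteLine($"{tag} -> {message}");
```

After the batch is published, the channel's NextPublishSeqNo would be 9 in this example, which is the value PublishBatch returns for the next batch.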
Generally, waiting for a batch of messages to be confirmed improves throughput drastically over waiting for a confirmation for each individual message (up to 20-30 times), but it is still synchronous: we block the thread while publishing a batch and waiting for the broker's confirmations. Publishing messages in a batch and waiting for their confirmations together is more efficient because it reduces the number of round trips between the client and RabbitMQ and the network traffic. However, this approach can increase message publishing time if the batch size is large, because the publisher must wait for the broker to confirm all messages in the batch before publishing the next batch, so choose a batch size that suits your system.
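To experiment with different batch sizes, it can help to separate the splitting step from the publishing step. The sketch below is one possible way to do that, using Enumerable.Chunk (available since .NET 6) instead of the manual queue-and-counter logic in the listing above. Plain integers stand in for EnvelopMessage, and batchSize plays the role of the BatchSize property:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Stand-in for the messages to publish; plain ints keep the sketch self-contained.
var messages = Enumerable.Range(1, 12).ToList();
const int batchSize = 5; // plays the role of the BatchSize property

// Chunk yields arrays of at most batchSize items; the last chunk holds the remainder.
var batches = messages.Chunk(batchSize).ToList();

foreach (var batch in batches)
{
    // In the real publisher, this is where the batch would be published
    // and its confirmations awaited before moving on to the next batch.
    Console.WriteLine($"batch of {batch.Length}: {string.Join(",", batch)}");
}
```

With 12 messages and a batch size of 5, this produces two full batches and a final batch of 2, so the last partial batch needs no special-case condition.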
Publisher Confirm Asynchronously
When we use publisher confirms in batches, we have to wait until every message in the current batch is confirmed before we can publish the next batch. This approach can be useful when the number of messages being published is low, but as the number of messages increases, our throughput and performance decrease. For high-throughput scenarios, it is better to handle publisher confirms asynchronously.
To avoid waiting synchronously for confirmations of batches of messages, which hurts performance, RabbitMQ supports handling confirmations asynchronously. The publisher does not have to wait for a confirmation before publishing the next message to the broker. The broker confirms published messages asynchronously, and the client notifies our registered event handlers: we register a handler on channel.BasicNacks to be notified when a message is nacked, and a handler on channel.BasicAcks to be notified when a message is ack-ed. With this approach, the publisher can publish messages very quickly without blocking on any confirmation (we receive the confirmations in our callbacks, or event handlers), so our throughput and publish rate increase drastically.
As we mentioned before, we get the result of each message confirmation in our callbacks: channel.BasicNacks for nacked messages and channel.BasicAcks for ack-ed messages. Inside these callbacks we can run our own logic with the help of _messagesDeliveryTagsDictionary, a dictionary that maps each publish sequence number to its published message, which we explained in the Publisher Confirm with Batch Messages Synchronously section. Inside BasicAcks we can do some logging and other logic. After that, because the confirmation process for the message is complete, we remove the ack-ed message from _messagesDeliveryTagsDictionary by calling the RemovedConfirmedMessage helper method with the received delivery tag, which is the key of the entry to delete:
channel.BasicAcks += (_, ea) =>
{
var envelop = GetMappedMessage(ea.DeliveryTag);
_logger.LogInformation(
$"Message with delivery tag '{ea.DeliveryTag}' and messageId: {envelop?.Message.MessageId} ack-ed, multiple is {ea.Multiple}."
);
RemovedConfirmedMessage(ea.DeliveryTag, ea.Multiple);
};
Likewise, inside the channel.BasicNacks callback for nacked messages, we log the error and add the nacked message to the unsuccessfulPublishedMessages queue so it can be republished at the end of the publishing process. After that, we also remove the nacked message with the received delivery tag from _messagesDeliveryTagsDictionary, because its confirmation process has completed:
channel.BasicNacks += (_, ea) =>
{
var envelop = GetMappedMessage(ea.DeliveryTag);
_logger.LogInformation(
$"Message with delivery tag '{ea.DeliveryTag}' and messageId: {envelop?.Message.MessageId} nack-ed, multiple is {ea.Multiple}."
);
if (envelop is { })
unsuccessfulPublishedMessages.Enqueue(envelop);
RemovedConfirmedMessage(ea.DeliveryTag, ea.Multiple);
};
Also, when we use publisher confirms, the broker may confirm several messages at once, which improves performance because multiple messages are confirmed with a single notification. When the broker sends such a batch confirmation, ea.Multiple is true in our BasicNacks and BasicAcks callbacks. Multiple is a boolean value: if it is false, only the message with the given sequence number is ack-ed or nack-ed; if it is true, all messages with a lower or equal sequence number are ack-ed or nack-ed. We explained multiple confirmation in the Publisher Confirm with Batch Messages Synchronously section.
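The effect of Multiple on the pending-message dictionary can be sketched as follows. This is a minimal, self-contained illustration and not the post's RemovedConfirmedMessage helper: TakeConfirmed is a hypothetical name, and plain strings stand in for EnvelopMessage. It returns the removed entries, which would let a nack handler requeue every message covered by a multiple nack rather than only the one whose delivery tag matches exactly:

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper (not from the original code): removes the entries covered by
// a confirmation from the pending map and returns them to the caller.
static List<TMessage> TakeConfirmed<TMessage>(
    ConcurrentDictionary<ulong, TMessage> pending,
    ulong deliveryTag,
    bool multiple)
{
    var taken = new List<TMessage>();

    // multiple == true: every outstanding tag <= deliveryTag is covered.
    // multiple == false: only the exact tag is covered.
    var tags = multiple
        ? pending.Keys.Where(tag => tag <= deliveryTag).OrderBy(tag => tag).ToList()
        : new List<ulong> { deliveryTag };

    foreach (var tag in tags)
    {
        if (pending.TryRemove(tag, out var message))
            taken.Add(message);
    }

    return taken;
}

// Five published messages are waiting for confirmation.
var pending = new ConcurrentDictionary<ulong, string>();
for (ulong tag = 1; tag <= 5; tag++)
    pending.TryAdd(tag, $"message-{tag}");

var single = TakeConfirmed(pending, 2, multiple: false);  // only tag 2
var several = TakeConfirmed(pending, 4, multiple: true);  // tags 1, 3 and 4 (2 is already gone)

Console.WriteLine(string.Join(",", single));        // message-2
Console.WriteLine(string.Join(",", several));       // message-1,message-3,message-4
Console.WriteLine(string.Join(",", pending.Keys));  // 5
```

In a BasicNacks handler, the returned list could be enqueued into unsuccessfulPublishedMessages in full; in a BasicAcks handler, it can simply be logged or ignored.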
The complete code for handling publisher confirms asynchronously using BasicPublish:
public class AsyncPublisherConfirm : IPublisher
{
private readonly ILogger<AsyncPublisherConfirm> _logger;
private readonly RabbitMqOptions _rabbitmqOptions;
private readonly ConcurrentDictionary<ulong, EnvelopMessage> _messagesDeliveryTagsDictionary =
new();
public AsyncPublisherConfirm(
IOptions<RabbitMqOptions> rabbitmqOptions,
ILogger<AsyncPublisherConfirm> logger
)
{
_logger = logger;
_rabbitmqOptions = rabbitmqOptions.Value;
}
public int TimeOut { get; set; } = 60;
public async Task PublishAsync(EnvelopMessage message)
{
await PublishAsync(new List<EnvelopMessage> { message });
}
public async Task PublishAsync(IEnumerable<EnvelopMessage> envelopMessages)
{
Queue<EnvelopMessage> unsuccessfulPublishedMessages = new Queue<EnvelopMessage>();
var factory = new ConnectionFactory
{
HostName = _rabbitmqOptions.Host,
UserName = _rabbitmqOptions.User,
Password = _rabbitmqOptions.Password
};
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
// with calling `ConfirmSelect` on the channel `NextPublishSeqNo` will be set to '1'
channel.ConfirmSelect();
_logger.LogInformation(
$"Start SequenceNumber for 'ConfirmSelect' is: {channel.NextPublishSeqNo}"
);
var startTime = Stopwatch.GetTimestamp();
channel.BasicAcks += (_, ea) =>
{
var envelop = GetMappedMessage(ea.DeliveryTag);
_logger.LogInformation(
$"Message with delivery tag '{
ea.DeliveryTag
}' and messageId: {
envelop?.Message.MessageId
} ack-ed, multiple is {
ea.Multiple
}."
);
RemovedConfirmedMessage(ea.DeliveryTag, ea.Multiple);
};
channel.BasicNacks += (_, ea) =>
{
var envelop = GetMappedMessage(ea.DeliveryTag);
_logger.LogInformation(
$"Message with delivery tag '{
ea.DeliveryTag
}' and messageId: {
envelop?.Message.MessageId
} nack-ed, multiple is {
ea.Multiple
}."
);
if (envelop is { })
unsuccessfulPublishedMessages.Enqueue(envelop);
RemovedConfirmedMessage(ea.DeliveryTag, ea.Multiple);
};
var list = envelopMessages.ToList();
foreach (var envelopMessage in list)
{
channel.QueueDeclare(
queue: envelopMessage.Message.GetType().Name.Underscore(),
durable: true,
exclusive: false,
autoDelete: false,
arguments: null
);
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
properties.Headers = envelopMessage.Metadata;
properties.ContentType = "application/json";
properties.Type = TypeMapper.GetTypeName(envelopMessage.Message.GetType());
properties.MessageId = envelopMessage.Message.MessageId.ToString();
var currentSequenceNumber = channel.NextPublishSeqNo;
_messagesDeliveryTagsDictionary.TryAdd(currentSequenceNumber, envelopMessage);
channel.BasicPublish(
exchange: string.Empty,
routingKey: envelopMessage.Message.GetType().Name.Underscore(),
basicProperties: properties,
body: Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(envelopMessage.Message))
);
var nextSequenceNumberAfterPublish = channel.NextPublishSeqNo;
_logger.LogInformation(
$"message with messageId: {
envelopMessage.Message.MessageId
} published, and current SequenceNumber is: {
currentSequenceNumber
}, next SequenceNumber after publishing is: {
nextSequenceNumberAfterPublish
}."
);
}
await WaitUntilConditionMet(
() => Task.FromResult(_messagesDeliveryTagsDictionary.IsEmpty),
TimeOut,
$"All messages could not be confirmed in {TimeOut} seconds"
);
if (unsuccessfulPublishedMessages.Any())
await PublishAsync(unsuccessfulPublishedMessages);
_logger.LogInformation("All published messages are confirmed");
var endTime = Stopwatch.GetTimestamp();
_logger.LogInformation(
$"Published {
list.Count
} messages and handled confirms asynchronously in {
Stopwatch.GetElapsedTime(startTime, endTime).TotalMilliseconds
} ms"
);
}
}
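The listing above awaits WaitUntilConditionMet, which is not shown in this section. A minimal sketch of what such a polling helper might look like, with the signature inferred from the call site; the timeout being in seconds, the 50 ms poll interval, and the use of TimeoutException are assumptions:

```csharp
using System;
using System.Diagnostics;
using System.Threading.Tasks;

// Hypothetical stand-in for the helper used above: polls the condition until it
// returns true, and throws if the timeout elapses first.
static async Task WaitUntilConditionMet(
    Func<Task<bool>> condition,
    int timeoutSeconds,
    string timeoutMessage,
    int pollIntervalMilliseconds = 50)
{
    var stopwatch = Stopwatch.StartNew();
    while (!await condition())
    {
        if (stopwatch.Elapsed > TimeSpan.FromSeconds(timeoutSeconds))
            throw new TimeoutException(timeoutMessage);

        // Yield between checks instead of spinning on the condition.
        await Task.Delay(pollIntervalMilliseconds);
    }
}

// Usage: the condition becomes true after roughly 200 ms, well within the timeout.
var elapsed = Stopwatch.StartNew();
await WaitUntilConditionMet(
    () => Task.FromResult(elapsed.ElapsedMilliseconds >= 200),
    timeoutSeconds: 5,
    timeoutMessage: "Condition was not met in 5 seconds");
Console.WriteLine("condition met");
```

Polling keeps the sketch simple. An alternative design would complete a TaskCompletionSource from the confirmation callbacks once the dictionary becomes empty, which avoids the polling delay.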
Now let's implement this asynchronous confirmation with PublishBatch instead of BasicPublish.
Publishing each message individually with BasicPublish means the client sends a separate publish command for every message over the channel, so the per-message overhead grows with the number of messages. The connection and the channel are created only once either way. When we publish messages in a batch, we hand the whole batch to the client in one call, which reduces the per-message overhead and the number of network writes and improves our overall performance.
A batch is not an AMQP transaction: each message in it still gets its own publish sequence number, and its ack or nack arrives in the same BasicAcks and BasicNacks callbacks as before.
public class PublisherConfirmBatchAsync : IPublisher
{
private readonly ILogger<PublisherConfirmBatchAsync> _logger;
private readonly RabbitMqOptions _rabbitmqOptions;
private readonly ConcurrentDictionary<ulong, EnvelopMessage> _messagesDeliveryTagsDictionary =
new();
public PublisherConfirmBatchAsync(
IOptions<RabbitMqOptions> rabbitmqOptions,
ILogger<PublisherConfirmBatchAsync> logger
)
{
_logger = logger;
_rabbitmqOptions = rabbitmqOptions.Value;
}
public int TimeOut { get; set; } = 60;
public int BatchSize { get; set; } = 100;
public async Task PublishAsync(EnvelopMessage message)
{
await PublishAsync(new List<EnvelopMessage> { message });
}
public async Task PublishAsync(IEnumerable<EnvelopMessage> envelopMessages)
{
Queue<EnvelopMessage> unsuccessfulPublishedMessages = new Queue<EnvelopMessage>();
var factory = new ConnectionFactory
{
HostName = _rabbitmqOptions.Host,
UserName = _rabbitmqOptions.User,
Password = _rabbitmqOptions.Password
};
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
// with calling `ConfirmSelect` on the channel `NextPublishSeqNo` will be set to '1'
channel.ConfirmSelect();
_logger.LogInformation(
$"Start SequenceNumber for 'ConfirmSelect' is: {channel.NextPublishSeqNo}"
);
var startTime = Stopwatch.GetTimestamp();
channel.BasicAcks += (_, ea) =>
{
var envelop = GetMappedMessage(ea.DeliveryTag);
_logger.LogInformation(
$"Message with delivery tag '{
ea.DeliveryTag
}' and messageId: {
envelop?.Message.MessageId
} ack-ed, multiple is {
ea.Multiple
}."
);
RemovedConfirmedMessage(ea.DeliveryTag, ea.Multiple);
};
channel.BasicNacks += (_, ea) =>
{
var envelop = GetMappedMessage(ea.DeliveryTag);
_logger.LogInformation(
$"Message with delivery tag '{
ea.DeliveryTag
}' and messageId: {
envelop?.Message.MessageId
} nack-ed, multiple is {
ea.Multiple
}."
);
if (envelop is { })
unsuccessfulPublishedMessages.Enqueue(envelop);
RemovedConfirmedMessage(ea.DeliveryTag, ea.Multiple);
};
var messageList = envelopMessages.ToList();
Queue<EnvelopMessage> batchQueue = new Queue<EnvelopMessage>();
ulong currentSequenceNumber = channel.NextPublishSeqNo; // 1
foreach (var envelopMessage in messageList)
{
batchQueue.Enqueue(envelopMessage);
if (
batchQueue.Count == BatchSize
|| (
batchQueue.Count != BatchSize
&& ((batchQueue.Count - 1) + (int)currentSequenceNumber == messageList.Count)
)
)
{
currentSequenceNumber = PublishBatch(channel, batchQueue, currentSequenceNumber);
// No blocking wait here: confirmations arrive in the BasicAcks and BasicNacks callbacks.
batchQueue = new Queue<EnvelopMessage>();
}
}
await WaitUntilConditionMet(
() => Task.FromResult(_messagesDeliveryTagsDictionary.IsEmpty),
TimeOut,
"All messages could not be confirmed in 60 seconds"
);
if (unsuccessfulPublishedMessages.Any())
await PublishAsync(unsuccessfulPublishedMessages);
_logger.LogInformation("All published messages are confirmed");
var endTime = Stopwatch.GetTimestamp();
_logger.LogInformation(
$"Published {
messageList.Count
} messages in batches and handled confirms asynchronously in {
Stopwatch.GetElapsedTime(startTime, endTime).TotalMilliseconds
} ms"
);
}
private ulong PublishBatch(
IModel channel,
IEnumerable<EnvelopMessage> envelopMessages,
ulong currentSequenceNumber = 1
)
{
// Create a batch of messages
var batch = channel.CreateBasicPublishBatch();
var batchMessages = envelopMessages.ToList();
foreach (var envelope in batchMessages)
{
channel.QueueDeclare(
queue: envelope.Message.GetType().Name.Underscore(),
durable: true,
exclusive: false,
autoDelete: false,
arguments: null
);
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
properties.Headers = envelope.Metadata;
properties.ContentType = "application/json";
properties.Type = TypeMapper.GetTypeName(envelope.Message.GetType());
properties.MessageId = envelope.Message.MessageId.ToString();
var body = new ReadOnlyMemory<byte>(
Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(envelope.Message))
);
batch.Add(
exchange: string.Empty,
routingKey: envelope.Message.GetType().Name.Underscore(),
mandatory: true,
properties: properties,
body: body
);
_messagesDeliveryTagsDictionary.TryAdd(currentSequenceNumber++, envelope);
}
// Publish the whole batch with one call (this is not an AMQP transaction). Internally the client assigns `NextPublishSeqNo` to each message and adds it to its pending delivery tags collection, so the sequence number advances by the number of messages in the batch.
batch.Publish();
return channel.NextPublishSeqNo;
}
}
Source Code
The source code for all of these approaches is available in this repository on GitHub.