Pub/Sub communication
Use of Neos communication API
graph TD
CTA --> |"1 - publish event"|DAPR_A("Sidecar DAPR A")
DAPR_A:::dapr --> |"2 - write message"|MQ("message broker component (ex: RabbitMQ)")
MQ --> DAPR_B("Sidecar DAPR B")
MQ --> DAPR_C("Sidecar DAPR C")
DAPR_B:::dapr --> CTB
DAPR_C:::dapr --> CTC
subgraph Cluster B
CTB["subscription"]
end
subgraph Cluster C
CTC["subscription"]
end
subgraph Cluster A
CTA["Business code"]
end
classDef tm fill:#30EDFF;
classDef dapr fill:#0067C0,color:#fff
classDef api fill:#347804,color:#fff
The message broker
component is a component supported by DAPR supported components. In the examples we will use RabbitMQ.
Configuration
It is first necessary to set up the DAPR communication.
If you use docker-compose then create a components
folder and in this folder create the neos-pubsub.yaml
file with the following contents.
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: neos-pubsub
namespace: my-namespace
spec:
type: pubsub.rabbitmq
version: v1
metadata:
- name: host
value: "amqp://rabbitmq:5672"
- name: durable
value: "false"
- name: deletedWhenUnused
value: "false"
- name: autoAck
value: "false"
- name: reconnectWait
value: "0"
- name: concurrency
value: parallel
The service in the docker-compose.yml to start the DAPR side-car :
mycluster-backend-dapr:
image: "daprio/daprd:1.8.4"
container_name: mycluster-backend-dapr
command: [
"./daprd",
"--app-id", "mycluster",
"--app-port", "7000",
"--log-level", "debug",
"--resources-path", "./components",
"--config", "./configuration/neos-config.yaml"
]
volumes:
- "./components/:/components"
- "./configuration/:/configuration"
- "../docker/backend/MyCluster.AspNetCore:/app"
network_mode: "service:mycluster-backend"
depends_on:
- mycluster-backend
If you use Kubernetes the following yaml file must be apply with the command kubectl apply -f neos-pubsub.yaml
.
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: neos-pubsub
namespace: my-namespace
spec:
type: pubsub.rabbitmq
version: v1
metadata:
- name: host
value: "amqp://rabbitmq:5672"
- name: durable
value: "false"
- name: deletedWhenUnused
value: "false"
- name: autoAck
value: "false"
- name: reconnectWait
value: "0"
- name: concurrency
value: parallel
It is possible to modify the spec
part to use another component than rabbitmq
or to modify the rabbitmq
settings. See DAPR RabbitMQ for more information.
Warning
The name neos-pubsub
and the namespace neos
must not be changed.
Use of Neos Pub/Sub
Publish
The GroupeIsa.Neos.ClusterCommunication.IEventPublication service provides the method PublishEventAsync
to publish a message.
The code below publish an event named InterClusterMessage
. The event data is a simple string that represents a message to be displayed.
public async Task ExecuteAsync(string message)
{
if (_applicationInfo.IsServiceCommunicationEnabled)
{
await _eventPublication.PublishEventAsync("InterClusterMessage", message);
}
else
{
throw new BusinessException(Properties.Resources.CommunicationInterClusterNotAvailable);
}
}
Time to live
By default, every published event is cleared if no subscriber handles it before 72h.
The time to live duration can be configured by passing a PubSubOptions instance to the PublishEventAsync method.
In the example bellow, the event InterClusterMessage
has a custom time to live of 30s
.
If the event is not handled successfully by a subscriber before it expires, it will be removed from the events queue.
public async Task ExecuteAsync(string message)
{
if (_applicationInfo.IsServiceCommunicationEnabled)
{
PubSubOptions options = new() { TimeToLiveInSeconds = 30 };
await _eventPublication.PublishEventAsync("InterClusterMessage", message, options);
}
else
{
throw new BusinessException(Properties.Resources.CommunicationInterClusterNotAvailable);
}
}
Subscribe
To subscribe to a published message, a server method must be created in the target cluster.
Click on "Advanced options" button in the toolbar.
Then select the "Subscription" tab.
Turn the "Subscribe to en event" switch on.
Enter the name of the event in "Subscribed event name".
This server method will be executed for all subscribed clusters when the InterClusterMessage
is published.
Below is the code for the server method, the IBroadcastMessage
send a message by signalR to all connections.
/// <summary>
/// Represents MessageReception method.
/// </summary>
public class MessageReception : IMessageReception
{
private readonly IBroadcastMessage _broadcastMessage;
/// <summary>
/// Initializes a new instance of the <see cref="MessageReception"/> class.
/// </summary>
/// <param name="broadcastMessage">The service to broadcast a message.</param>
public MessageReception(IBroadcastMessage broadcastMessage)
{
_broadcastMessage = broadcastMessage;
}
/// <inheritdoc/>
public async Task ExecuteAsync(string message)
{
await _broadcastMessage.ExecuteAsync($"InterCluster message: {message}", Domain.Enums.NotificationTarget.All);
}
}
Retries and Dead letter topic
There are times when applications might not be able to handle messages for a variety of reasons. For example, there could be transient issues retrieving data needed to process a message or the app business logic fails returning an error. Dead letter topics are used to forward messages that cannot be delivered to a subscribing app. This eases the pressure on app by freeing them from dealing with these failed messages, allowing developers to write code that reads from the dead letter topic and either fixes the message and resends this, or abandons it completely.
Dead letter topics are typically used in along with a retry resiliency policy and a dead letter subscription that handles the required logic for dealing with the messages forwarded from the dead letter topic.
When a dead letter topic is set, any message that failed to be delivered to an app for a configured topic is put on the dead letter topic to be forwarded to a subscription that handles these messages. This could be the same app or a completely different one. (source)
In Neos, a default dead letter topic DefaultDeadLetter
is set for every subscription.
Thus, you can subscribe to this topic in any cluster server method if you want to handle messages that failed to be delivered.
Before the message is published on the dead letter topic, a Dapr retry resiliency policy is applied.
By default, the global retry policy on a Neos environment is the following :
policies:
retries:
# Neos pubsub retry policy (5 retries with 10s delay between each retry)
neosPubsubRetries:
policy: constant
duration: 10s
maxRetries: 5
targets: # https://docs.dapr.io/developing-applications/building-blocks/pubsub/pubsub-deadletter/#retries-and-dead-letter-topics
components:
neos-pubsub:
inbound:
retry: neosPubsubRetries
outbound:
retry: neosPubsubRetries
Note
To customize global resiliency policy in your environment, please see this article for development or this article for production.
Note
In addition, you can also customize resiliency policies by cluster. Please see this article for development or this article for production.
Warning
If you subscribe to the DefaultDeadLetter
topic and a messages fails to be delivered, the event will be deleted.
Under the hood
GroupeIsa.Neos.ClusterCommunication.IEventPublication
use the building DAPR Pub/Sub
Note
If the default resiliency policies doesn't fit your needs, you can override them in the cluster configuration file, see this article for global policies and this article for cluster specific policies.