Pub/Sub communication

Use of Neos communication API

graph TD
    CTA --> |"1 - publish event"|DAPR_A("Sidecar DAPR A")
    DAPR_A:::dapr --> |"2 - write message"|MQ("message broker component (ex: RabbitMQ)")
    MQ --> DAPR_B("Sidecar DAPR B")
    MQ --> DAPR_C("Sidecar DAPR C")
    DAPR_B:::dapr --> CTB
    DAPR_C:::dapr --> CTC

    subgraph Cluster B
        CTB["subscription"]
    end

    subgraph Cluster C
        CTC["subscription"]
    end

    subgraph Cluster A
        CTA["Business code"]
    end

    classDef tm fill:#30EDFF;
    classDef dapr fill:#0067C0,color:#fff
    classDef api fill:#347804,color:#fff

The message broker can be any pub/sub component listed in the DAPR supported components. The examples below use RabbitMQ.

Configuration

DAPR communication must be set up first.
If you use docker-compose, create a components folder and, inside it, a neos-pubsub.yaml file with the following contents.

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: neos-pubsub
  namespace: my-namespace
spec:
  type: pubsub.rabbitmq
  version: v1
  metadata:
  - name: host
    value: "amqp://rabbitmq:5672"
  - name: durable
    value: "false"
  - name: deletedWhenUnused
    value: "false"
  - name: autoAck
    value: "false"
  - name: reconnectWait
    value: "0"
  - name: concurrency
    value: parallel

The following service in docker-compose.yml starts the DAPR sidecar:

mycluster-backend-dapr:
    image: "daprio/daprd:1.8.4"
    container_name: mycluster-backend-dapr
    command: [
      "./daprd",
      "--app-id", "mycluster",
      "--app-port", "7000",
      "--log-level", "debug",
      "--resources-path", "./components",
      "--config", "./configuration/neos-config.yaml"
    ]
    volumes:
      - "./components/:/components"
      - "./configuration/:/configuration"
      - "../docker/backend/MyCluster.AspNetCore:/app"
    network_mode: "service:mycluster-backend"
    depends_on:
      - mycluster-backend
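
The component file points at amqp://rabbitmq:5672, so the compose project needs a broker that is reachable under the host name rabbitmq. A minimal sketch of such a service is shown here; the image tag, container name and published ports are assumptions rather than part of the Neos setup, so adapt them to your environment.

```yaml
# Hypothetical broker service for local development (assumed, not taken from Neos).
rabbitmq:
    image: "rabbitmq:3-management"   # official image with the management UI enabled
    container_name: rabbitmq
    ports:
      - "5672:5672"     # AMQP port, matches the host value amqp://rabbitmq:5672
      - "15672:15672"   # management UI, handy for inspecting queues
```

Because the sidecar shares the network of mycluster-backend, the name rabbitmq has to resolve from that service, which is the case when both services are on the same compose network.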

If you use Kubernetes, apply the following YAML file with the command kubectl apply -f neos-pubsub.yaml.

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: neos-pubsub
  namespace: my-namespace
spec:
  type: pubsub.rabbitmq
  version: v1
  metadata:
  - name: host
    value: "amqp://rabbitmq:5672"
  - name: durable
    value: "false"
  - name: deletedWhenUnused
    value: "false"
  - name: autoAck
    value: "false"
  - name: reconnectWait
    value: "0"
  - name: concurrency
    value: parallel

You can modify the spec section to use a component other than RabbitMQ or to change the RabbitMQ settings. See DAPR RabbitMQ for more information.
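
As an illustration of swapping the broker, the sketch below replaces only the spec section with the DAPR Redis Streams pub/sub component. The Redis host name and the empty password are assumptions for a local setup, and whether every Neos feature behaves identically on another broker is not confirmed here.

```yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: neos-pubsub          # unchanged: only the spec section differs
  namespace: my-namespace
spec:
  type: pubsub.redis         # DAPR Redis Streams pub/sub component
  version: v1
  metadata:
  - name: redisHost
    value: "redis:6379"      # assumed host name and port
  - name: redisPassword
    value: ""                # assumed: no password in a local setup
```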

Warning

The name neos-pubsub and the namespace neos must not be changed.

Use of Neos Pub/Sub

Publish

The GroupeIsa.Neos.ClusterCommunication.IEventPublication service provides the method PublishEventAsync to publish a message.

The code below publishes an event named InterClusterMessage. The event data is a simple string that represents a message to be displayed.

public async Task ExecuteAsync(string message)
{
    if (_applicationInfo.IsServiceCommunicationEnabled)
    {
        await _eventPublication.PublishEventAsync("InterClusterMessage", message);
    }
    else
    {
        throw new BusinessException(Properties.Resources.CommunicationInterClusterNotAvailable);
    }
}

Time to live

By default, every published event is discarded if no subscriber handles it within 72 hours.

The time to live duration can be configured by passing a PubSubOptions instance to the PublishEventAsync method.

In the example below, the event InterClusterMessage has a custom time to live of 30 seconds. If the event is not handled successfully by a subscriber before it expires, it is removed from the events queue.

public async Task ExecuteAsync(string message)
{
    if (_applicationInfo.IsServiceCommunicationEnabled)
    {
        PubSubOptions options = new() { TimeToLiveInSeconds = 30 };

        await _eventPublication.PublishEventAsync("InterClusterMessage", message, options);
    }
    else
    {
        throw new BusinessException(Properties.Resources.CommunicationInterClusterNotAvailable);
    }
}

Subscribe

To subscribe to a published event, create a server method in the target cluster.
Click the "Advanced options" button in the toolbar.
Then select the "Subscription" tab.
Turn the "Subscribe to an event" switch on.
Enter the name of the event in "Subscribed event name".

This server method is executed in every subscribed cluster when the InterClusterMessage event is published.

Below is the code for the server method. The IBroadcastMessage service sends a message through SignalR to all connections.

/// <summary>
/// Represents MessageReception method.
/// </summary>
public class MessageReception : IMessageReception
{
    private readonly IBroadcastMessage _broadcastMessage;

    /// <summary>
    /// Initializes a new instance of the <see cref="MessageReception"/> class.
    /// </summary>
    /// <param name="broadcastMessage">The service to broadcast a message.</param>
    public MessageReception(IBroadcastMessage broadcastMessage)
    {
        _broadcastMessage = broadcastMessage;
    }

    /// <inheritdoc/>
    public async Task ExecuteAsync(string message)
    {
        await _broadcastMessage.ExecuteAsync($"InterCluster message: {message}", Domain.Enums.NotificationTarget.All);
    }
}

Retries and Dead letter topic

There are times when applications might not be able to handle messages for a variety of reasons. For example, there could be transient issues retrieving data needed to process a message, or the app business logic could fail and return an error. Dead letter topics are used to forward messages that cannot be delivered to a subscribing app. This eases the pressure on apps by freeing them from dealing with these failed messages, allowing developers to write code that reads from the dead letter topic and either fixes the message and resends it, or abandons it completely.

Dead letter topics are typically used along with a retry resiliency policy and a dead letter subscription that handles the required logic for dealing with the messages forwarded from the dead letter topic.

When a dead letter topic is set, any message that failed to be delivered to an app for a configured topic is put on the dead letter topic to be forwarded to a subscription that handles these messages. This could be the same app or a completely different one. (source)

In Neos, a default dead letter topic DefaultDeadLetter is set for every subscription. Thus, you can subscribe to this topic in any cluster server method if you want to handle messages that failed to be delivered.
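
For readers who know plain DAPR, the behaviour described above resembles a declarative subscription that sets deadLetterTopic. The fragment below is an illustration only: Neos registers its subscriptions itself, the subscription name and route are hypothetical, and it assumes that the Neos event name is used as the topic name.

```yaml
# Illustration only: not a file to apply in a Neos environment.
apiVersion: dapr.io/v1alpha1
kind: Subscription
metadata:
  name: inter-cluster-message          # hypothetical name
spec:
  pubsubname: neos-pubsub
  topic: InterClusterMessage           # assumed: event name used as topic name
  route: /message-reception            # hypothetical endpoint of the server method
  deadLetterTopic: DefaultDeadLetter   # undelivered messages are forwarded here
```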

Before the message is published on the dead letter topic, a Dapr retry resiliency policy is applied.

By default, the global retry policy on a Neos environment is the following:

policies:
  retries:
    # Neos pubsub retry policy (5 retries with 10s delay between each retry)
    neosPubsubRetries:
      policy: constant
      duration: 10s
      maxRetries: 5
targets: # https://docs.dapr.io/developing-applications/building-blocks/pubsub/pubsub-deadletter/#retries-and-dead-letter-topics
  components:
    neos-pubsub:
      inbound:
        retry: neosPubsubRetries
      outbound:
        retry: neosPubsubRetries

Note

To customize global resiliency policy in your environment, please see this article for development or this article for production.

Note

In addition, you can also customize resiliency policies by cluster. Please see this article for development or this article for production.
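
A customized policy could look like the sketch below, which replaces the constant delay with DAPR's exponential back-off. The values are arbitrary examples, and reusing the policy name neosPubsubRetries is an assumption; the file in which this fragment belongs is described in the articles referenced in the notes.

```yaml
policies:
  retries:
    # Assumed: same policy name as the default, redefined with exponential back-off.
    neosPubsubRetries:
      policy: exponential
      maxInterval: 30s   # upper bound for the delay between two retries
      maxRetries: 3      # give up after 3 retries
targets:
  components:
    neos-pubsub:
      inbound:
        retry: neosPubsubRetries
      outbound:
        retry: neosPubsubRetries
```

With a policy like this, early retries happen quickly and later ones are spaced out, which suits transient failures that tend to resolve within a few seconds.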

Warning

If you subscribe to the DefaultDeadLetter topic and a message fails to be delivered, the event is deleted.

Under the hood

GroupeIsa.Neos.ClusterCommunication.IEventPublication uses the DAPR Pub/Sub building block.

Note

If the default resiliency policies don't fit your needs, you can override them in the cluster configuration file. See this article for global policies and this article for cluster-specific policies.