Table of Contents

State store

Overview

The state store enables non-persistent states to be stored in a distributed architecture.The distributed cache must be fast, scalable (several replicas) and built for high avaibility.

The state store stores data in key/value format, with the key indexed.

Consistency

The state store offers a choice of two consistency:

  • Strong consistency
    Each node in the cluster responds with the latest data, even if the system has to block the query until all replicas have been updated. If you query a "consistent system" for a currently updated item, you won't get a response until all replicas have been properly updated. However, you will always receive the most up-to-date data.

  • Availability
    Each node returns an immediate response, even if this response does not include the most recent data. If you query an "available system" for an item that's being updated, you'll get the best possible answer the service can provide at the time.

Concurrential access

The state store offers a choice between two types of optimistic concurrential access:

  • Last write wins
    There is no concurrential access check, the key/value is updated without checking that there has been a competitive modification.

  • First write win
    Use of ETAG to manage competitive access, if writing requires reloading the current data and modifying it.

Transactions

Neos does not manage transactions involving the writing of multiple keys/values.

Components

The neos run -dp command uses the Redis component to implement the state store. Neos also recommends using Redis in production.

Architecture

graph TD
  CTA("Cluster Neos A") --> |"Update state by key"|STORE("Store (e.g. REDIS)"):::store
  STORE --> |"Get state by key"|CTB("Cluster Neos B")
  STORE --> |"Query State"|CTB
  CTC("Cluster Neos C") --> |"Update state by key"|STORE
  CTD("Cluster Neos D") --> |"Get state by key"|STORE
  classDef store fill:#C00306,color:#fff;

Getting started

In development mode, the state store is only available with the -dp option at startup, using the neos run -dp command.

To check that the necessary prerequisites (dapr, wsl, redis) have been installed, run the neos setup command first.

If you encounter problems with the neos setup command, you can manually install dapr and redis by following the documentation Install prerequisites manually.

For example, under Windows, the neos-state-redis.yaml file is created in C:\Users\[windows_user]\.neos\dapr\components.

Use of Neos State Store API

Neos expose the interface GroupeIsa.Neos.ClusterCommunication.DistributedStore.IStateStore to manage the state store.

To illustrate the use of the state store, we'll cache countries from an OpenData API call. The key will be the country's ISO code.

Add entry to the state store

To add an entry to the state store, you should use the method IStateStore.SaveStateAsync

  • 1) Inject IStateStore
private readonly IHttpClientFactory _httpClientFactory;
private readonly Lazy<IStateStore> _countryStateStore;
private const string _keyPrefix = "Country/";

/// <summary>
/// Initializes a new instance of the <see cref="UpdateExternalCountryCache"/> class.
/// </summary>
/// <param name="httpClientFactory">The httpClientFactory.</param>
/// <param name="countryStateStore">The country state store.</param>
public UpdateExternalCountryCache(IHttpClientFactory httpClientFactory, Lazy<IStateStore> countryStateStore)
{
    _httpClientFactory = httpClientFactory;
    _countryStateStore = countryStateStore;
}
Warning

IStateStore is only implemented for DAPR, if you use it outside the DAPR context, you need to use lazy dependency injection.

Note

You can index by a key identifying a country, in which case you'll have as many value keys as countries. In this case, the best practice is to generate a key with a prefix identifying the type of data (e.g. country), which allows you to use the state store for other data.
This choice is preferable if you wish to obtain information on a country quickly, as the state store does not allow you to obtain the list of keys, making it difficult to update or delete all the countries.
You can also index with a "Country" key whose value is all countries (json table of countries).
This choice is preferable when you want to manipulate all countries. > The Neos TechnicalDemos cluster presents these two implementations.

  • 2) Examples of how to add or update a key

    Update with concurrential access LastWrite

Example: Update a country in the case of a country cache.

await _stateStore.SaveStateAsync("FR", country);

Example: Update cache for all countries

await _countryStateStore.SaveStateAsync("Countries", countries);
Note

The default concurrential access mode is LastWrite and consistency is Strong.

Update with concurrential access FirstWrite

 bool succeded = await _countryStateStore.TrySaveStateAsync($"{_keyPrefix}/FR", country , etag ?? string.Empty, StateStoreConsistency.Availability, StateStoreConcurrency.FirstWrite);
Note

To create a new key/value, etag must be set to string.Empty; to update, etag must first be retrieved using the IStateStore.GetStateAsync method.

  • 3) Get a key/value from the blind by its key
(Country country, string etag) =  await _countryStateStore.GetStateAsync($"{_keyPrefix}/FR");
  • 4) Get the store keys/values by query
Warning

This feature is not available for all State Store components. Redis supports this feature provided the RedisSearch and RedisJson modules are installed.

Example: obtain the set of keys/values for European Union countries.

 StateStoreQueryResult<ExternalCountry> externalItems = await _countryStateStore.QueryStateAsync<ExternalCountry>(
    new()
    {
        Filter = new("europeanUnion", true),
        Page = new(50)
    },
    new string[] { "europeanUnionIdx" });
Note

Default pagination is 10 records per page. The response's PaginationToken property is used to obtain the following page.

Warning

For the Redis component, only indexed properties can be used for the filter; the necessary index (e.g. europeanUnionIdx) must be specified in the QueryStateAsync method.

Warning

An index is mandatory even if the filter is left blank.

  • 5) Delete a key/value

Example: delete a country in the case of a country cache.

await _countryStateStore.DeleteStateAsync($"{_keyPrefix}/FR");

Example: delete the cache for all countries.

await _countryStateStore.DeleteStateAsync("Countries");
  • 6) Bulk update

Create a key/value set :

ExternalCountries countries = await GetExternalCountriesAsync();
await _countryStateStore.SaveBulkStateAsync<ExternalCountry>(countries.Records.Select(c => new BulkStateStoreItem<ExternalCountry>($"{_keyPrefix}/c.Fields.Iso2", c.Fields, string.Empty)).ToList());

In this example, we obtain data from an OpenData API and then cache it in bulk mode using the SaveBulkStateAsync method.

Delete a key/value set :

var cachedCountries = await _countryStateStore.QueryStateAsync<ExternalCountry>(new StateStoreQuery(Page = new(500)}));
await _countryStateStore.DeleteBulkStateAsync(
    cachedCountries.Values
    .Where(c => c.Key.StartsWith(_keyPrefix))
    .Select(c => new BulkStateStoreItem(c.Key, c.ETag)).ToList());

In this example, we obtain all the keys/values, then delete the country keys/values.

Redis component : create indexes

An index is required to query the Redis cache. These indexes can be defined in the cluster configuration file using the command neos run -dp.

# State store Redis indexes
RedisQueryIndexes:
  - Name: europeanUnionIdx
    Indexes: 
      - Key: europeanUnion
        type: NUMERIC      

An index must be defined with a unique name. An index is made up of one or more keys, the key name must be defined in CamelCase (the C# object in PascalCase is serialized in Json CamelCase).

Indexes use the syntax JsonPath. For more information on creating indexes in Redis, see Create Index with Json.

Index type values are available at FT.CREATE.

Json Type Index Type
string TEXT
number NUMERIC
boolean NOT SUPPORTED
Warning

The boolean type is not supported.
This index must be set for all clusters querying the Redis cache and using this index.

Under the hood

GroupeIsa.Neos.ClusterCommunication.DistributedStore.IStateStore use the building block DAPR StateStore.

Manually install prerequisites

Windows

  • Install DAPR (mode self-hosted)
winget install Dapr.CLI -V 1.10.0
  • Enable WSL2 and install a distribution

Install WSL2. Recommended distributions are Ubuntu or Debian.

  • Install Redis To install Redis follow the instructions at Redis install

Linux (Ubuntu, Debian)

  • Install DAPR (mode self-hosted)
wget -q https://raw.githubusercontent.com/dapr/cli/master/install/install.sh -O - | /bin/bash -s 1.10.0
  • Install Redis To install Redis follow the instructions at Redis install

Install Redis modules

Run under WSL2 for Windows or bash for Linux.

  • RedisJson
apt-get update
apt-get install git
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh
source "$HOME/.cargo/env"
apt-get install build-essential
apt-get install -y clang
git clone https://github.com/RedisJSON/RedisJSON.git
cd RedisJSON
cargo build --release
mkdir /etc/redis/modules
cp ./target/release/librejson.so /etc/redis/modules/
echo "loadmodule /etc/redis/modules/librejson.so" >> /etc/redis/redis.conf

With SystemD, run the following command to restart:

sudo systemctl restart redis

or the following command :

sudo service redis-server restart
  • Redisearch
git clone --recursive https://github.com/RediSearch/RediSearch.git
cd RediSearch
make setup
make build
cp ./bin/linux-x64-release/search/redisearch.so /etc/redis/modules/
echo "loadmodule /etc/redis/modules/redisearch.so" >> /etc/redis/redis.conf

With SystemD, run the following command to restart:

sudo systemctl restart redis

or the following command :

sudo service redis-server restart

Uninstall Redis

To uninstall Redis, use the following command :

sudo apt-get purge --auto-remove redis-server

Viewing the Redis cache

Use redis-cli

Under Linux or WSL for Windows, launch the Redi client with the redis-cli command.

  • To view keys: Command KEY

    Example for viewing all keys :

    KEY *
    
  • To view the json value of a key : Command JSON.GET

    JSON.GET Country/FR
    
  • Delete a key: Command DEL

    DEL Country/FR
    
  • View index list Command FT._LIST

    FT._LIST
    
  • Delete an index Command FT.DROPINDEX

    FT.DROPINDEX europeanUnionIdx
    
  • Delete all keys command FLUSHDB

    FLUSHDB
    
Warning

If the Redis instance is also used for pub/sub, all services using it must be restarted after this command.

Extension VS Code

You can use Visual Studio Code Extension like vscode-redis-client to visualize keys, delete keys, etc ...

To view the Redis cache of a Kubernetes pod, you can perform a PORT-FORWARD :

kubectl port-forward [name-of-redis-pod] 6379 6380

And connect to the redis server 127.0.0.1:6380