Design and implementation of Cosmos DB
Change Feed-centric architecture

Kazuyuki Miyake

Tatsuro Shibamura

Agenda

1. Change Feed-centric architecture Design & Strategy

2. Change Feed-centric architecture Deep Dive

  • Tatsuro Shibamura (Microsoft MVP for Azure)
  • @shibayan

1. Cosmos DB Change Feed-centric architecture
Design & Strategy

Massive data processing needs and challenges

Balancing massive data writing and complex queries

  • No performance degradation under massive write loads
  • Support for different types of queries
  • Pay-as-you-go cost model

Limitations of traditional architectures

Trying to handle everything in one big data store...

  • Write-optimized data stores are weak at complex queries
  • Query-optimized data stores are weak at massively concurrent writes

-> As a result, systems end up relying on over-provisioned data stores

CQRS + Materialized Views

  1. Separate the write side from the read side so that each can be optimized for its own workload
  2. Deploy query-optimized Materialized Views for the read side
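To make the write/read split concrete, here is a minimal in-memory sketch in C#. The `OrderWritten` and `CustomerOrderSummary` types and the dictionary standing in for a view container are hypothetical, not part of the original deck: the write side stores one document per order, and a projection folds each change into a per-customer summary that the read side can fetch with a single key lookup.

```csharp
#nullable enable
using System.Collections.Generic;

// Hypothetical write-side document: one per order, cheap to insert.
public record OrderWritten(string OrderId, string CustomerId, decimal Amount);

// Hypothetical read-side document: one per customer, cheap to query.
public record CustomerOrderSummary(string CustomerId, int OrderCount, decimal TotalAmount);

public class CustomerSummaryProjection
{
    // In-memory stand-in for the Materialized View container.
    private readonly Dictionary<string, CustomerOrderSummary> _view = new();

    // Folds one change from the write side into the read model.
    public void Apply(OrderWritten change)
    {
        var current = _view.TryGetValue(change.CustomerId, out var existing)
            ? existing
            : new CustomerOrderSummary(change.CustomerId, 0, 0m);

        _view[change.CustomerId] = current with
        {
            OrderCount = current.OrderCount + 1,
            TotalAmount = current.TotalAmount + change.Amount
        };
    }

    // Query side: a single key lookup instead of an aggregate over all orders.
    public CustomerOrderSummary? Get(string customerId) =>
        _view.TryGetValue(customerId, out var summary) ? summary : null;
}
```

In this architecture, `Apply` is what a Change Feed consumer would call for every changed document. As written, the fold double counts an order that is delivered twice, so the idempotency guidance later in this deck applies to it as well.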

Cosmos DB Change Feed + Azure Functions

  • No need to implement your own synchronization mechanism for CQRS
  • Materialized Views are synchronized in near real time

Scalable Change Feed-centric architecture

  • New services can be added one after another as additional consumers of the Change Feed

Change Feed-centric case study - JFE Engineering


2. Cosmos DB Change Feed-centric architecture
Deep Dive

Two Change Feed usage patterns

1. Push model -> Data Transformation, Stream Processing

2. Pull model -> Batch Processing

Data Transformation, Stream Processing

  • Used to process streaming data with low latency
  • CosmosDBTrigger in Azure Functions is the best fit
  • Suited to write-fast destinations such as SQL Database and Redis Cache
    • Also used to write back to Cosmos DB (to build a materialized view)

Sample code - Push model

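using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;
using Microsoft.Azure.Documents;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;
// Both SDKs define PartitionKey; UpsertItemStreamAsync expects the v3 SDK type
using PartitionKey = Microsoft.Azure.Cosmos.PartitionKey;

public class Function1
{
    public Function1(CosmosClient cosmosClient)
    {
        // Container that holds the materialized view
        _container = cosmosClient.GetContainer("SampleDB", "MaterializedView");
    }

    private readonly Container _container;

    [FunctionName("Function1")]
    public async Task Run([CosmosDBTrigger(
                               databaseName: "SampleDB",
                               collectionName: "TodoItems",
                               LeaseCollectionName = "leases")]
                           IReadOnlyList<Document> input, ILogger log)
    {
        var tasks = new Task<ResponseMessage>[input.Count];

        for (int i = 0; i < input.Count; i++)
        {
            // Re-key the document with a different partition key and write it back
            // (a real implementation would apply a more advanced transformation here)
            var partitionKey = new PartitionKey(input[i].GetPropertyValue<string>("anotherKey"));

            tasks[i] = _container.UpsertItemStreamAsync(new MemoryStream(input[i].ToByteArray()), partitionKey);
        }

        // Write all documents in the batch concurrently
        var responses = await Task.WhenAll(tasks);

        // Stream APIs report failures as status codes instead of throwing,
        // so surface them explicitly to fail the execution
        foreach (var response in responses)
        {
            response.EnsureSuccessStatusCode();
        }
    }
}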

Batch Processing

  • Use when you need to process a large amount of data at once
  • A TimerTrigger in Azure Functions is a practical way to implement it
  • Used for archiving to Blob Storage / Data Lake Storage Gen 2
    • General-purpose v2 (GPv2) storage accounts and Data Lake Storage Gen 2 charge per write transaction, so writing each change as it arrives increases costs
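A minimal sketch of the batching idea, assuming changes are archived as JSON Lines (one JSON document per line) and that each timer run writes one blob whose name is derived from the run time; both the format and the `changefeed/yyyy/MM/dd/HHmmss.jsonl` naming scheme are illustrative assumptions. Everything read during one run becomes a single payload, so the number of write operations scales with the number of runs rather than with the number of changed documents.

```csharp
using System;
using System.Collections.Generic;
using System.Text;
using System.Text.Json;

public static class ChangeFeedArchiver
{
    // Serializes every change read in one run into a single JSON Lines payload.
    public static string ToJsonLines<T>(IEnumerable<T> changes)
    {
        var builder = new StringBuilder();
        foreach (var change in changes)
        {
            // One JSON document per line
            builder.Append(JsonSerializer.Serialize(change));
            builder.Append('\n');
        }
        return builder.ToString();
    }

    // Hypothetical naming scheme: one blob per run, foldered by UTC date.
    // Invariant culture keeps the Gregorian calendar and '/' regardless of host locale.
    public static string BlobNameFor(DateTimeOffset runTime)
    {
        var utc = runTime.UtcDateTime;
        return FormattableString.Invariant(
            $"changefeed/{utc:yyyy}/{utc:MM}/{utc:dd}/{utc:HHmmss}.jsonl");
    }
}
```

In the pull-model function, the items from each change feed page would be accumulated and passed to `ToJsonLines` once, followed by a single upload to the blob named by `BlobNameFor`.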

Data Lake Storage Gen 2 Transaction prices

Sample code - Pull model

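using System.Net;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;

public class Function2
{
    public Function2(CosmosClient cosmosClient)
    {
        _container = cosmosClient.GetContainer("SampleDB", "TodoItems");
    }

    private readonly Container _container;

    [FunctionName("Function2")]
    public async Task Run([TimerTrigger("0 */5 * * * *")] TimerInfo myTimer, ILogger log)
    {
        // Resume from the saved position, or start from now on the first run
        var continuationToken = await LoadContinuationTokenAsync();

        var changeFeedStartFrom = continuationToken != null ? ChangeFeedStartFrom.ContinuationToken(continuationToken) : ChangeFeedStartFrom.Now();

        var changeFeedIterator = _container.GetChangeFeedIterator<TodoItem>(changeFeedStartFrom, ChangeFeedMode.Incremental);

        // HasMoreResults stays true for the change feed, so stop on 304 Not Modified
        while (changeFeedIterator.HasMoreResults)
        {
            var response = await changeFeedIterator.ReadNextAsync();

            if (response.StatusCode == HttpStatusCode.NotModified)
            {
                // No new changes: keep the latest position and end this run
                continuationToken = response.ContinuationToken;
                break;
            }

            // TODO: Implementation (process the items in response)

            // Advance the position only after this page has been processed
            continuationToken = response.ContinuationToken;
        }

        // Persist the position so the next run continues where this one stopped
        await SaveContinuationTokenAsync(continuationToken);
    }

    // TODO: Application-defined persistence for the continuation token
    private Task<string> LoadContinuationTokenAsync() => throw new System.NotImplementedException();

    private Task SaveContinuationTokenAsync(string continuationToken) => throw new System.NotImplementedException();
}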

For a reliable Change Feed-centric architecture

Improving resiliency

Idempotency and eventual consistency

Avoid inconsistent states

Improving resiliency - Retry policy

  • CosmosDBTrigger moves on to the next batch of changes even when a function execution fails.
  • Without a retry policy, a failed batch is never processed again, so those changes are effectively lost.
  • Use FixedDelayRetry or ExponentialBackoffRetry with an unlimited (-1) maximum number of retries.
    • The Change Feed does not advance until the execution succeeds, so no data is lost.
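`ExponentialBackoffRetry` takes a minimum and a maximum interval in addition to the retry count. The sketch below is a generic capped exponential backoff written only to show the shape of such a schedule; the doubling formula is an assumption for illustration, not the Functions runtime's actual implementation. Combined with an unlimited retry count, a batch that keeps failing settles at one retry per `maximum` interval instead of being skipped.

```csharp
using System;

public static class BackoffSketch
{
    // Generic capped exponential backoff: minimum * 2^(attempt - 1), never above maximum.
    // attempt is 1-based, so the first retry waits exactly the minimum interval.
    public static TimeSpan DelayFor(int attempt, TimeSpan minimum, TimeSpan maximum)
    {
        if (attempt < 1) throw new ArgumentOutOfRangeException(nameof(attempt));

        // Bound the exponent so very large attempt numbers cannot overflow the arithmetic.
        int exponent = Math.Min(attempt - 1, 30);
        double ticks = minimum.Ticks * Math.Pow(2, exponent);

        // Once the doubled delay reaches the cap, every later retry waits the maximum.
        return ticks >= maximum.Ticks ? maximum : TimeSpan.FromTicks((long)ticks);
    }
}
```

With a 4-second minimum and a 15-minute maximum, this schedule waits 4 s, 8 s, 16 s, and so on, and from the ninth attempt onward it stays at 15 minutes.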

Sample code - Retry policy

public class Function1
{
    // Retry indefinitely (-1) with a 10-second delay between attempts
    [FixedDelayRetry(-1, "00:00:10")]
    [FunctionName("Function1")]
    public async Task Run([CosmosDBTrigger(
                              databaseName: "SampleDB",
                              collectionName: "TodoItems",
                              LeaseCollectionName = "leases")]
                          IReadOnlyList<Document> input)
    {
        // TODO: Implementation
    }
}

Focus on idempotency and eventual consistency

  • Write idempotent code whenever possible
    • For storage that supports overwrites and deletes (Cosmos DB / SQL Database / etc.)
  • When idempotency is difficult to guarantee, aim for eventual consistency
    • Design for "at least once" delivery
    • For append-only storage (Blob Storage / Data Lake Storage Gen 2)
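One common way to get idempotency on overwritable storage is to key every write by the document id and ignore deliveries that are not newer than what is already stored. A minimal in-memory sketch, assuming each change carries a monotonically increasing, application-managed `Version` (a hypothetical field, as is the `VersionedChange` type): replaying a batch after a retry leaves the store unchanged.

```csharp
#nullable enable
using System.Collections.Generic;

// Hypothetical change shape: document id plus an application-managed, increasing version.
public record VersionedChange(string Id, long Version, string Payload);

public class IdempotentSink
{
    // In-memory stand-in for an overwritable store such as a Cosmos DB container or SQL table.
    private readonly Dictionary<string, VersionedChange> _store = new();

    // Upsert keyed by Id. Duplicate or stale deliveries are ignored, so applying
    // the same batch twice (for example after a retry) yields the same final state.
    public bool Apply(VersionedChange change)
    {
        if (_store.TryGetValue(change.Id, out var existing) && existing.Version >= change.Version)
        {
            return false; // already applied, or older than what is stored
        }

        _store[change.Id] = change;
        return true;
    }

    public string? PayloadOf(string id) =>
        _store.TryGetValue(id, out var stored) ? stored.Payload : null;
}
```

Append-only destinations cannot reject a duplicate this way, which is why the deck falls back to "at least once" delivery for them and leaves deduplication to the readers.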

Avoid inconsistent states - Graceful shutdown

  • The Azure Functions host restarts when a new version is deployed or the platform is updated
    • If the host restarts while a function is executing, the state may become inconsistent
  • Implement graceful shutdown to avoid inconsistent states
    • Combine it with a retry policy to increase resiliency
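The graceful-shutdown idea can be sketched without any Azure dependencies: stage the results of a batch, check the `CancellationToken` between items, and commit only once the whole batch is done. This is a hypothetical in-memory illustration (`StagedBatchWriter` and the `ToUpperInvariant` transformation are stand-ins); in a real function the token would be the one Azure Functions passes in, and the commit step would be the write to the destination store. Rethrowing the `OperationCanceledException` makes the execution fail instead of reporting partially applied work as success.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

public class StagedBatchWriter
{
    // Results become visible only after a whole batch has been committed.
    public List<string> Committed { get; } = new();

    public void Process(IReadOnlyList<string> batch, CancellationToken cancellationToken)
    {
        var staged = new List<string>();
        try
        {
            foreach (var item in batch)
            {
                // Stop between items as soon as the host signals shutdown.
                cancellationToken.ThrowIfCancellationRequested();
                staged.Add(item.ToUpperInvariant()); // stand-in for the real transformation
            }

            cancellationToken.ThrowIfCancellationRequested();
            Committed.AddRange(staged); // commit point: all or nothing
        }
        catch (OperationCanceledException)
        {
            staged.Clear(); // rollback: nothing from this batch was committed
            throw;          // fail the execution so the batch is not treated as done
        }
    }
}
```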

Sample code - Graceful shutdown

public class Function1
{
    // Retry indefinitely (-1) with a 10-second delay between attempts
    [FixedDelayRetry(-1, "00:00:10")]
    [FunctionName("Function1")]
    public async Task Run([CosmosDBTrigger(
                              databaseName: "SampleDB",
                              collectionName: "TodoItems",
                              LeaseCollectionName = "leases")]
                          IReadOnlyList<Document> input,
                          CancellationToken cancellationToken)
    {
        try
        {
            // Pass the cancellation token to every cancellable call (Task.Delay stands in for real work)
            await Task.Delay(TimeSpan.FromSeconds(5), cancellationToken);
        }
        catch (OperationCanceledException)
        {
            // TODO: Implement rollback
            throw;
        }
    }
}

References