Sub-orchestrations in Durable Functions

In the previous post I was working on very basic concepts of Durable Functions. Since they are connected to a simple in-cloud game engine, I'll go a bit further and show you how sub-orchestrations help in shaping a solution, so all concepts are decoupled and isolated.

Why sub-orchestration?

While it's perfectly fine to build your orchestrations using multiple activities called one-by-one(or parallelized - it's still a valid solution), sometimes you'd like to isolate different concepts emerging from a one project. Let's consider our example - we'd like to create a galaxy with N planets inside it. There're two approaches possible:

  • perform all operations inside one orchestration so each action is an activity
  • decouple those actions so one orchestration could call another(and we can call them separately)

Let's consider following example:

/
[FunctionName("ProvisionNewDevices")]
public static async Task ProvisionNewDevices(
    [OrchestrationTrigger] DurableOrchestrationContext ctx)
{
    string[] deviceIds = await ctx.CallActivityAsync<string[]>("GetNewDeviceIds");

    // Run multiple device provisioning flows in parallel
    var provisioningTasks = new List<Task>();
    foreach (string deviceId in deviceIds)
    {
        Task provisionTask = ctx.CallSubOrchestratorAsync("DeviceProvisioningOrchestration", deviceId);
        provisioningTasks.Add(provisionTask);
    }

    await Task.WhenAll(provisioningTasks);

    // ...
}

Here you can see one of the biggest advantages of such approach - you can run multiple flows simultaneously and just await until each is finished. With several activities it's still doable, however I'd rather consider it an antipattern.

Making a working solution

My current orchestration looks like this:

/
[FunctionName("Galaxy_Create_Start")]
public static async Task<HttpResponseMessage> StartOrchestration(
	[HttpTrigger(AuthorizationLevel.Function, "post", Route = "orchestration/start")] HttpRequestMessage req,
	[OrchestrationClient] DurableOrchestrationClient starter,
	TraceWriter log)
{
	// Function input comes from the request content.
	var instanceId = await starter.StartNewAsync("Galaxy_Create", null);

	var payload = await req.Content.ReadAsStringAsync();
	log.Info($"Started orchestration with ID = '{instanceId}'.");
	log.Info($"The payload is: {payload}");

	return starter.CreateCheckStatusResponse(req, instanceId);
}

[FunctionName("Galaxy_Create")]
public static async Task<string> RunImpl([OrchestrationTrigger] DurableOrchestrationContext context)
{
	var result = await Task.WhenAll(context.CallActivityAsync<string>("Utility_Coords"),
		context.CallActivityAsync<string>("Utility_Galaxy_Name"));
	var galaxyContext = new CreateGalaxyContext(result[1], result[0]);

	await context.CallActivityAsync("Galaxy_Create_Impl", galaxyContext);
	await context.CallSubOrchestratorAsync("Planet_Create", galaxyContext);

	return "Galaxy created!";
}

[FunctionName("Galaxy_Create_Impl")]
public static async Task CreateGalaxy(
	[ActivityTrigger] CreateGalaxyContext context,
	[Table("galaxies")] IAsyncCollector<GalaxyDataEntity> galaxies)
{
		await galaxies.AddAsync(new GalaxyDataEntity(context.Name, context.Coords));                
}

As you can see, there's a special call, which schedules another orchestration within this one:

/
await context.CallSubOrchestratorAsync("Planet_Create", galaxyContext);

Let's look at this new orchestration:

/
[FunctionName("Planet_Create")]
public static async Task<string> RunImpl([OrchestrationTrigger] DurableOrchestrationContext context)
{
	var activities = new List<Task>();
	var number = await context.CallActivityAsync<int>("Utility_Number");
	var galaxyContext = JsonConvert.DeserializeObject<JArray>(context.GetInputAsJson().ToString()).First;

	for (var i = 0; i < number; i++)
	{
		var result = await Task.WhenAll(context.CallActivityAsync<string>("Utility_Coords"),
			context.CallActivityAsync<string>("Utility_Planet_Name"),
			context.CallActivityAsync<string>("Utility_Planet_Type"));

		var planetContext = new CreatePlanetContext(galaxyContext.ToObject<CreateGalaxyContext>(), result[0],
			result[1], (PlanetType) Enum.Parse(typeof(PlanetType), result[2]));
		activities.Add(context.CallActivityAsync<int>("Planet_Create_Impl", planetContext));
	}

	await Task.WhenAll(activities);

	return "Planet created!";
}

[FunctionName("Planet_Create_Impl")]
public static async Task CreatePlanet(
	[ActivityTrigger] CreatePlanetContext context,
	[Table("planet")] IAsyncCollector<PlanetDataEntity> planets)
{
	await planets.AddAsync(new PlanetDataEntity(context.GalaxyContext.Name, context.Name, context.Coords, context.Type));
}

The great thing is that we can pass a full context to the sub-orchestration, so it can use data, which was obtained by the previous activities.

Summary

Sub-orchestrations are a great addition to the Durable Functions SDK, especially that they're such a simple concept. I strongly encourage you to try it all by yourself, so you can feel how powerful the concept is.

Reactive Durable Functions

Durable Functions itself are a big topic and I'll come back to the them soon. In the previous post I created a simple Function App, which inserts a row into Table Storage. All activities were orchestrated and it was really easy to schedule more work. Today I want to present you how easy you can transform such active architecture into a passive one using Event Grid. This will be a fairly easy episode so let's start!

Function App

There's no need to change anything in the Function App since for now we'll use a HttpTrigger. Unfortunately we cannot host it locally(because of Event Grid) so there's a need to publish it to the cloud.

Event Grid

You can easily deploy Event Grid from the marketplace. For now there's nothing special regarding its installation, so I won't go into details.

Event Grid chosen from Marketplace - still in preview though

Combining it all together

Once we have components deployed we can configure Event Grid so it'll pass events to the chosen endpoint. In our case it'll be our Galaxy_Create_Start function(which we used to start orchestration). We'll need one thing - our function URL. To get it go to the Function App you deployed, find a function and click on Get Function URL.

Once you have it, we can go to Event Grid and create a new subscriber.

What we need now is a new subscription. This feature allow you to orchestrate events flow, so each subscriber can be subscribed to a particular event type. With this configured we can centralize the way, how e.g. multiple services built with Azure Functions integrate with event producers.

My subscriber configured and subscribed to nebula.galaxy event type

When you configure a new subscriber it will be added to the list of all supported subscribers. We've done all what we needed to integrate Event Grid with Azure Functions, let's test it now.

Working example

There're two important things what we need to test our solution - Event Grid endpoint and access key. The former ss available on the main screen - please copy it so you'll know where to post your messages. Access keys can be found under Settings section in Event Grid main menu.

Since we don't have any real producer yet, we'll try to simulate one. For this purposed I used Postman, however all is up to you. The only thing we have to do is to post a HTTP request to the mentioned enpoint. Here you have an example:

/
POST /api/events HTTP/1.1
Host: your.eventgrid.azure.net
aeg-sas-key: YOUR_KEY
Content-Type: application/json
Cache-Control: no-cache

[
    {
        "id": "2",
        "eventType": "nebula.galaxy",
        "subject": "nebula/galaxy/create",
        "eventTime": "2017-11-08T13:25:00+01:00",
        "data":{
        }
    }
]

As you can see the payload has a specific schema, which will be validated on the Event Grid side. In fact it should self-explanatory. What is important here is the fact, that this payload is being passed to the function. If you change the main function a little bit:

/
[FunctionName("Galaxy_Create_Start")]
public static async Task<HttpResponseMessage> StartOrchestration(
	[HttpTrigger(AuthorizationLevel.Function, "post", Route = "orchestration/start")] HttpRequestMessage req,
	[OrchestrationClient] DurableOrchestrationClient starter,
	TraceWriter log)
{
	// Function input comes from the request content.
	string instanceId = await starter.StartNewAsync("Galaxy_Create", null);

	var payload = await req.Content.ReadAsStringAsync();
	log.Info($"Started orchestration with ID = '{instanceId}'.");
	log.Info($"The payload is: {payload}");

	return starter.CreateCheckStatusResponse(req, instanceId);
}

You'll see following result when you go to the function and check the console:

/
2017-11-08T12:36:54.565 Function started (Id=ecb2655e-912e-435b-b916-f21b65729716)
2017-11-08T12:36:55.144 Started orchestration with ID = 'f7d2ad0001204ff0a381d61b448ef8b7'.
2017-11-08T12:36:55.144 The payload is: [{
  "id": "3",
  "eventType": "nebula.galaxy",
  "subject": "nebula/galaxy/create",
  "eventTime": "2017-11-08T12:25:00+00:00",
  "data": {},
  "topic": "/SUBSCRIPTIONS/____________/RESOURCEGROUPS/NEBULA-EUW-DEV-RG2/PROVIDERS/MICROSOFT.EVENTGRID/TOPICS/NEBULA-EUW-DEV-EVENTGRID"
}]
2017-11-08T12:36:55.173 Function completed (Success, Id=ecb2655e-912e-435b-b916-f21b65729716, Duration=596ms)

Of course you can easily deserialize it and incorporate into your flow.

Summary

As you can see implementing Event Grid as a gateway to the underlying architecture is a piece of cake. In fact we didn't need any change in our code - the whole integration perfomed seamlessly. In the next episode I'll try to present you how to integrate a producer(Event Hub), so we don't have to post messages directly to our Event Grid.