Azure Functions, WebJobs and Data Lake - writing a custom extension

I've been struggling to find some time to write something interesting regarding Azure Functions recently and... finally! In the upcoming blog posts I'll present you the way to easily create a custom extension, which can be used to automatically bind a function parameter to a Data Lake. No more boilerplate code, no need to handle the whole process on your own - a clean and easy way to extend your functions with even more syntactic sugar. Can't way to actually present some ideas so let's dive into the solution.

Extension model

As you may know, Azure Functions are actually built atop of WebJobs SDK. In this SDK the concept of bindings to different Azure components was initially introduced - after that, people responsible for Functions extended it and added even more triggers and other elements, so you can easily integrate with e.g. Table Storage using only attributes.

This whole extension model allows you to write a custom binding(either as a trigger or an output) and use it in your code, so part of the work can be done automatically. Note, that currently there's no easy way to run your custom extension within a Function - nonetheless we'll try to bypass those theoretical limits and prepare a solution, which you'll be able to use in a real scenario.

To make the long story short - to create an extension you need following things:

  • actual attribute for binding a parameter
  • custom binding provider deriving from IBindingProvider
  • config provider derived from IExtensionConfigProvider

This is only a high-level picture of what we're about to build, but it should give you an idea what will be the shape of the extension we'll make. Let's try to write some code.

What do I need?

Preferably VS2017 with a class and console project. We'll be using WebJobs SDK so references to Microsoft.Azure.WebJobs and Microsoft.Azure.WebJobs.Extensions will be needed.

Attribute

This is the easiest part of our project. All you need is to create a following class deriving from a base Attribute:

/
using System;

namespace WebJobs.DataLake
{
    [AttributeUsage(AttributeTargets.Parameter)]
    public sealed class DataLakeAttribute : Attribute
    {
        public DataLakeAttribute(string clientId, string clientSecret)
        {
            ClientId = clientId;
            ClientSecret = clientSecret;
        }

        public string ClientId { get; private set; }
        public string ClientSecret { get; private set; }
    }
}

This attribute will be needed and used when determining parameters needed to connect to a Data Lake. In fact we're done here - there's nothing what is needed more to make it work.

Binding provider

Binding provider is a bit more tricky, it requires much more work also to finish it. In fact it requires more than only one component:

  • IBindingProvider which encapsulates the actual logic
  • IBinding which tells how the actual binding happens
  • IValueProvider which is used to link a binding to a parameter instance

I think a bit of clarification is needed here. Let's consider following example:

/
public static void CustomBinding([TimerTrigger("*/15 * * * * *")] TimerInfo timerInfo, [DataLake("clientId", "clientSecret")] DataLakeProvider dataLake)
{
}

This function is triggered using a TimerTrigger with some interval between each call. It also binds a DataLakeProvider parameter in addition to a passed connection info. To bind information from DataLake attribute to DataLakeProvider you have to go through the whole flow IBindingProvider -> IBinding -> IValueProvider. I won't get into details how to implement each component, let's assume that currently it looks like this:

/
internal class DataLakeAttributeBindingProvider : IBindingProvider
{
	public Task<IBinding> TryCreateAsync(BindingProviderContext context)
	{
		if (context == null)
		{
			throw new ArgumentNullException(nameof(context));
		}

		var parameter = context.Parameter;
		var attribute = parameter.GetCustomAttribute<DataLakeAttribute>(inherit: false);
		if (attribute == null)
		{
			return Task.FromResult<IBinding>(null);
		}

		if (!ValueBinder.MatchParameterType(context.Parameter, new[] { typeof(DataLakeProvider)}))
		{
			throw new InvalidOperationException(string.Format(CultureInfo.CurrentCulture,
				"Can't bind DataLakeAttribute to type '{0}'.", parameter.ParameterType));
		}

		return Task.FromResult<IBinding>(new DataLakeBinding(parameter));
	}

	private class DataLakeBinding : IBinding
	{
		private readonly ParameterInfo _parameter;

		public DataLakeBinding(ParameterInfo parameter)
		{
			_parameter = parameter;
		}

		public Task<IValueProvider> BindAsync(object value, ValueBindingContext context)
		{
			throw new NotImplementedException();
		}

		public Task<IValueProvider> BindAsync(BindingContext context)
		{
			if (context == null)
			{
				throw new ArgumentNullException(nameof(context));
			}

			var attribute = _parameter.GetCustomAttribute<DataLakeAttribute>(inherit: false);
			var valueProviderType = typeof(DataLakeValueProvider);
			var valueProvider = (IValueProvider)Activator.CreateInstance(
				valueProviderType, _parameter, attribute);

			return Task.FromResult(valueProvider);
		}

		public ParameterDescriptor ToParameterDescriptor()
		{
			return new ParameterDescriptor
			{
				Name = _parameter.Name
			};
		}

		public bool FromAttribute => true;
	}

	private class DataLakeValueProvider : IValueProvider
	{
		private readonly ParameterInfo _parameter;
		private readonly DataLakeAttribute _resolvedAttribute;

		public DataLakeValueProvider(ParameterInfo parameter,
			DataLakeAttribute resolvedAttribute)
		{
			_parameter = parameter;
			_resolvedAttribute = resolvedAttribute;
		}

		public Task<object> GetValueAsync()
		{
			var value = new DataLakeProvider();

			return Task.FromResult<object>(value);
		}

		public string ToInvokeString()
		{
			return string.Empty;
		}

		public Type Type => _parameter.ParameterType;
	}
}

It's not fully implemented, yet it gives some basic functionality and allows some initial testing.

Running a solution

To run a solution you need a console application. Make sure you've added Microsoft.Azure.WebJobs and Microsoft.Azure.WebJobs.Extensions and paste the following code:

/
internal class Program
{
	private static void Main()
	{
		var config = new JobHostConfiguration();
		config.UseTimers();
		config.UseDataLake();
		config.TypeLocator = new TypeLocator(typeof(Function));

		var host = new JobHost(config);

		host.Call(typeof(Function).GetMethod("CustomBinding"),
			new Dictionary<string, object>
			{
				{"timerInfo", new TimerInfo(new CronSchedule("*/15 * * * * *"), new ScheduleStatus())}
			});

		host.RunAndBlock();
	}
}

Some of those types will be unavailable to you. Firstly you have to implement a dummy TypeLocator:

/
internal class TypeLocator : ITypeLocator
{
	private Type[] _types;

	public TypeLocator(params Type[] types)
	{
		_types = types;
	}

	public IReadOnlyList<Type> GetTypes()
	{
		return _types;
	}
}

It's used to choose which function will be indexed by a JobHost instance so it's required to add your function to it. The next thing is a UseDataLake() extension method, which tells the host(or actually enables it) to actually perform binding:

/
public static class DataLakeHostConfigurationExtension
{
	public static void UseDataLake(this JobHostConfiguration config)
	{
		if (config == null)
		{
			throw new ArgumentNullException(nameof(config));
		}

		// Register our extension configuration provider
		config.RegisterExtensionConfigProvider(new DataLakeExtensionConfig());
	}

	private class DataLakeExtensionConfig : IExtensionConfigProvider
	{
		public void Initialize(ExtensionConfigContext context)
		{
			if (context == null)
			{
				throw new ArgumentNullException(nameof(context));
			}

			// Register our extension binding providers
			context.Config.RegisterBindingExtensions(new DataLakeAttributeBindingProvider());
		}
	}
}

Note that we're registering an extension for an attribute binding only. If we'd like to enable triggering a function based on a custom trigger, we'd have to add another binder. The last thing is the actual function we'd like to trigger:

/
public static class Function
{
	public static void CustomBinding([TimerTrigger("*/15 * * * * *")] TimerInfo timerInfo, [DataLake("clientId", "clientSecret")] DataLakeProvider dataLake)
	{
	} 
}

DataLakeProvider for this moment can be just an empty class. The important thing is that when you hit F5 you should be able to access a function triggered:

Summary

As you can see writing an exception for WebJobs is pretty easy and gives you almost unlimited possibilities when it comes to adding custom functionalities to your solution. In the next post I'll show you how to properly implement Data Lake connection, extend our API so we can do something in the function and how to be able to actually use it in a function. Stay tuned!

Achieving consistency in Azure Table Storage #2

In the previous post I presented some easy ways of achieving consistency in Table Storage by storing data within one partition. This works in the majority of scenarios, there're some cases however, when you have to divide records(because of the design, to preserve scalability or to logically separate different concerns) and still ensure, that you can perform operations within a transaction. You may be a bit concerned - all in all we just talked, that storing data within a single(at least from the transaction point of view) partition is required to actually be able to perform EGTs. Well - as always there's a solution to go beyond some limits and achieve what we're aiming for. Let's go!

Eventually consistent transactions

Just a note before we start - this pattern won't guarantee, that a transaction is isolated. This means that a client will be able to read data while a transaction is being processed. Unfortunately there's no easy way to completely lock tables while an inter-partition operation is being performed.

Let's back to our eventual consistency. What does it mean? The answer is pretty simple - once a transaction is finished, our data can be considered consistent. All right - but this is something new. What's the difference between transaction performed as EGT? 

In EGT your are performing maximally 100 operations without a possibility to see an ongoing process. In other words - you always see the result of a transaction. With eventual consistency you can divide the process into steps:

  • get an entity from Partition 1
  • inserty an entity into Partition 2
  • delete an entity from Partition 1

Of course you can have more than only 3 steps. The crux here is the clear division between each step. If we consider other operations performed during a transaction:

  • get an entity from Partition 1
  • get an entity from Partition 2
  • inserty an entity into Partition 2
  • get an entity from Partition 1
  • delete an entity from Partition 1

The whole view should be clearer. With eventual consistency those bolded steps stand for operations, which clearly are victims of read phenomenas. Always consider possible drawbacks of solutions like this and if needed, use other database which isolates transactions.

Achieving eventual consistency

To achieve eventual consistency we have to introduce a few other components to our architecture. We'll need at least two things:

  • queue which holds actions, which should be performed in a transaction
  • worker roles, which reads messages from a queue and perform the actual transactions

Now let's talk about each new component in details.

Queue

By using a queue we're able to easily orchestrate operations, which should be performed by worker roles. The easiest example is creating a project, which will archive records stored in Table Storage. Thanks to a queue we can post a message saying 'Archive a record', which can be read by other components and processed. Finally workers can post their messages saying, that an action has been finished. 

Worker role

When we're saying about workers we think about simple services, which perform some part of a flow. In eventual consistency pattern they're responsible for handling a transaction logic. If we come back to the example from the previous point, a worker would be responsible for moving an entity from the one table to another and then deleting it. The important note here is idempotence - you have to ensure, that you won't add more than one instance of an entity in the case of restarting the flow. The same goes when deleting things - you should delete only if an entity exists.

Considerations

You can apply this pattern not only to perform operations between different partitions - it also works when you're working with other components like blobs. It has some obvious drawbacks like lack of isolation or external segment, which have to be handled in your code. On the other hand it's a perfectly valid approach, especially in table-other_storage scenario.