Tips&Tricks - When was my data replicated in RA-GRS?

Today is Friday, so it's time for something quick and easy. When working with Azure Storage, you might wonder from time to time when your data was last replicated. Answering that gives some insight into how Azure Storage works internally and what the drawbacks of this component are. Let's find the last synchronization date and consider for a moment what it really means for us.

Synchronization date and time

Before we start - to be able to find the synchronization timestamp at all, geo-replication has to actually happen. This means that for a storage account in LRS mode this feature just won't work. You may ask why - the answer is fairly simple. LRS replicates data only within a single datacenter and, more importantly, it does so synchronously. There's no replication between regions, so there's no such thing as a synchronization point: either the data is saved and replicated, or the whole operation fails.

Presumably the synchronization timestamp should be available in the other three replication models - ZRS, GRS and RA-GRS - but surprisingly... it's not. This feature works only for RA-GRS accounts for one simple reason - this is the only mode that allows you to read data from the secondary location, and the timestamp is exposed through that secondary endpoint. Of course it has some limits (for example, you cannot trigger a failover to another region yourself), but at least you'll be able to read the replicated data.
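To make "reading from the secondary location" a bit more concrete, here is a minimal sketch of how the secondary endpoint address is formed in the public Azure cloud: the account name gets a "-secondary" suffix. The helper name and the "myaccount" value are made up for illustration, and sovereign clouds use different DNS suffixes.

```csharp
using System;

public static class SecondaryEndpoint
{
    // Hypothetical helper: builds the read-only secondary endpoint for an
    // RA-GRS account. "service" is "blob", "table" or "queue".
    // Assumes the public Azure cloud DNS suffix (core.windows.net).
    public static Uri ForService(string accountName, string service)
    {
        if (string.IsNullOrWhiteSpace(accountName))
        {
            throw new ArgumentException("Account name is required.", nameof(accountName));
        }

        if (string.IsNullOrWhiteSpace(service))
        {
            throw new ArgumentException("Service name is required.", nameof(service));
        }

        return new Uri($"https://{accountName}-secondary.{service}.core.windows.net");
    }
}
```

For an account called "myaccount", SecondaryEndpoint.ForService("myaccount", "table") points at the host myaccount-secondary.table.core.windows.net.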

You can easily read the last synchronization date by going to the Azure Portal and opening e.g. Tables:

Initial status of Table Storage with RA-GRS mode

 

Is my data replicated?

This is a serious question - if geo-replication happens asynchronously, will all my data reach the secondary location intact? Well, it depends - there's an obvious lag between the primary and the secondary storage, and the answer depends directly on when exactly a disaster happens. What's important here is that Azure Storage out of the box doesn't guarantee that each and every record or blob will be replicated in time. Of course it doesn't mean that half a day of data will be lost - we're talking about a few minutes - but still, for some systems losing a single record means being totally unreliable.
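The practical meaning of the last synchronization time can be sketched in a few lines: writes made before it are already on the secondary, while anything newer may or may not have arrived yet. The class and method names below are hypothetical helpers, not part of any Azure SDK, and the timestamps are assumed to come from your own records and from the value shown in the portal.

```csharp
using System;

public static class GeoReplicationCheck
{
    // True only when the write happened strictly before the last sync time,
    // i.e. it is certain to be readable from the secondary.
    // A missing sync time means we cannot make any promise at all.
    public static bool IsGuaranteedOnSecondary(
        DateTimeOffset writtenAt, DateTimeOffset? lastSyncTime)
    {
        return lastSyncTime.HasValue && writtenAt < lastSyncTime.Value;
    }

    // How far the secondary may be behind "now". Writes from this window
    // are the ones at risk if the primary region were lost right now.
    public static TimeSpan ExposureWindow(DateTimeOffset now, DateTimeOffset lastSyncTime)
    {
        TimeSpan gap = now - lastSyncTime;
        return gap < TimeSpan.Zero ? TimeSpan.Zero : gap;
    }
}
```

A system that cannot afford to lose a single record could use a check like this to decide which recent writes still need to be confirmed or copied elsewhere.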

What can you do to improve your guarantees and the consistency of geo-replication? I strongly advise reading this article regarding possible outages in Azure Storage and their consequences. What you can do for sure is implement your own backup policy and support the built-in replication mechanism by performing synchronous writes to an additional storage. Depending on your needs and expectations, using a different storage (like CosmosDB, which introduced Table Storage on steroids) could also be a viable solution.

Azure Functions, WebJobs and Data Lake - writing a custom extension

I've been struggling to find some time to write something interesting regarding Azure Functions recently and... finally! In the upcoming blog posts I'll show you how to easily create a custom extension, which can be used to automatically bind a function parameter to a Data Lake. No more boilerplate code, no need to handle the whole process on your own - a clean and easy way to extend your functions with even more syntactic sugar. I can't wait to present some ideas, so let's dive into the solution.

Extension model

As you may know, Azure Functions are built on top of the WebJobs SDK. This SDK is where the concept of bindings to different Azure components was initially introduced - after that, the people responsible for Functions extended it and added even more triggers and other elements, so you can easily integrate with e.g. Table Storage using only attributes.

This whole extension model allows you to write a custom binding (either as a trigger or an output) and use it in your code, so part of the work can be done automatically. Note that currently there's no easy way to run your custom extension within a Function - nonetheless, we'll try to bypass those theoretical limits and prepare a solution which you'll be able to use in a real scenario.

To make a long story short - to create an extension you need the following things:

  • an attribute for binding a parameter
  • a custom binding provider implementing IBindingProvider
  • a config provider implementing IExtensionConfigProvider

This is only a high-level picture of what we're about to build, but it should give you an idea of the shape of the extension we'll make. Let's try to write some code.

What do I need?

Preferably VS2017 with a class library project and a console project. We'll be using the WebJobs SDK, so references to Microsoft.Azure.WebJobs and Microsoft.Azure.WebJobs.Extensions will be needed.

Attribute

This is the easiest part of our project. All you need to do is create the following class deriving from the base Attribute:

using System;

namespace WebJobs.DataLake
{
    [AttributeUsage(AttributeTargets.Parameter)]
    public sealed class DataLakeAttribute : Attribute
    {
        public DataLakeAttribute(string clientId, string clientSecret)
        {
            ClientId = clientId;
            ClientSecret = clientSecret;
        }

        // Set once in the constructor and read-only afterwards
        public string ClientId { get; }
        public string ClientSecret { get; }
    }
}

This attribute will be used to determine the parameters needed to connect to a Data Lake. In fact we're done here - nothing more is needed to make it work.
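To see how such an attribute is consumed, here is a minimal, self-contained sketch that reads it back from a method parameter using plain reflection, without the WebJobs SDK. The attribute is repeated so the snippet compiles on its own; AttributeLookupDemo and the empty DataLakeProvider are placeholders made up for this illustration.

```csharp
using System;
using System.Reflection;

[AttributeUsage(AttributeTargets.Parameter)]
public sealed class DataLakeAttribute : Attribute
{
    public DataLakeAttribute(string clientId, string clientSecret)
    {
        ClientId = clientId;
        ClientSecret = clientSecret;
    }

    public string ClientId { get; }
    public string ClientSecret { get; }
}

// Placeholder for the type the parameter will eventually be bound to.
public sealed class DataLakeProvider
{
}

public static class AttributeLookupDemo
{
    // A method whose parameter is decorated with our attribute.
    public static void CustomBinding(
        [DataLake("clientId", "clientSecret")] DataLakeProvider dataLake)
    {
    }

    // Finds the first parameter of CustomBinding and returns the attribute
    // instance attached to it (or null when there is none).
    public static DataLakeAttribute ReadAttribute()
    {
        ParameterInfo parameter = typeof(AttributeLookupDemo)
            .GetMethod(nameof(CustomBinding))
            .GetParameters()[0];

        return parameter.GetCustomAttribute<DataLakeAttribute>(inherit: false);
    }
}
```

A binding provider does essentially the same lookup on the parameter it is handed, and then decides whether it can bind it.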

Binding provider

The binding provider is a bit more tricky and takes much more work to finish. In fact, it requires more than one component:

  • IBindingProvider which encapsulates the actual logic
  • IBinding which tells how the actual binding happens
  • IValueProvider which is used to link a binding to a parameter instance

I think a bit of clarification is needed here. Let's consider the following example:

public static void CustomBinding([TimerTrigger("*/15 * * * * *")] TimerInfo timerInfo, [DataLake("clientId", "clientSecret")] DataLakeProvider dataLake)
{
}

This function is triggered by a TimerTrigger with some interval between each call. It also binds a DataLakeProvider parameter, using the connection info passed to the attribute. To get the information from the DataLake attribute to the DataLakeProvider you have to go through the whole flow: IBindingProvider -> IBinding -> IValueProvider. I won't go into detail on how to implement each component; let's assume that currently it looks like this:

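using System;
using System.Globalization;
using System.Reflection;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs.Extensions.Bindings;
using Microsoft.Azure.WebJobs.Host.Bindings;
using Microsoft.Azure.WebJobs.Host.Protocols;
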
internal class DataLakeAttributeBindingProvider : IBindingProvider
{
	public Task<IBinding> TryCreateAsync(BindingProviderContext context)
	{
		if (context == null)
		{
			throw new ArgumentNullException(nameof(context));
		}

		var parameter = context.Parameter;
		var attribute = parameter.GetCustomAttribute<DataLakeAttribute>(inherit: false);
		if (attribute == null)
		{
			return Task.FromResult<IBinding>(null);
		}

		if (!ValueBinder.MatchParameterType(context.Parameter, new[] { typeof(DataLakeProvider)}))
		{
			throw new InvalidOperationException(string.Format(CultureInfo.CurrentCulture,
				"Can't bind DataLakeAttribute to type '{0}'.", parameter.ParameterType));
		}

		return Task.FromResult<IBinding>(new DataLakeBinding(parameter));
	}

	private class DataLakeBinding : IBinding
	{
		private readonly ParameterInfo _parameter;

		public DataLakeBinding(ParameterInfo parameter)
		{
			_parameter = parameter;
		}

		public Task<IValueProvider> BindAsync(object value, ValueBindingContext context)
		{
			throw new NotImplementedException();
		}

		public Task<IValueProvider> BindAsync(BindingContext context)
		{
			if (context == null)
			{
				throw new ArgumentNullException(nameof(context));
			}

			var attribute = _parameter.GetCustomAttribute<DataLakeAttribute>(inherit: false);

			// The type is known at compile time, so it can be constructed
			// directly instead of going through Activator.CreateInstance.
			IValueProvider valueProvider = new DataLakeValueProvider(_parameter, attribute);

			return Task.FromResult(valueProvider);
		}

		public ParameterDescriptor ToParameterDescriptor()
		{
			return new ParameterDescriptor
			{
				Name = _parameter.Name
			};
		}

		public bool FromAttribute => true;
	}

	private class DataLakeValueProvider : IValueProvider
	{
		private readonly ParameterInfo _parameter;
		private readonly DataLakeAttribute _resolvedAttribute;

		public DataLakeValueProvider(ParameterInfo parameter,
			DataLakeAttribute resolvedAttribute)
		{
			_parameter = parameter;
			_resolvedAttribute = resolvedAttribute;
		}

		public Task<object> GetValueAsync()
		{
			var value = new DataLakeProvider();

			return Task.FromResult<object>(value);
		}

		public string ToInvokeString()
		{
			return string.Empty;
		}

		public Type Type => _parameter.ParameterType;
	}
}

It's not fully implemented yet, but it provides some basic functionality and allows some initial testing.

Running the solution

To run the solution you need a console application. Make sure you've added references to Microsoft.Azure.WebJobs and Microsoft.Azure.WebJobs.Extensions, and paste in the following code:

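using System.Collections.Generic;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Timers;
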
internal class Program
{
	private static void Main()
	{
		var config = new JobHostConfiguration();
		config.UseTimers();
		config.UseDataLake();
		config.TypeLocator = new TypeLocator(typeof(Function));

		var host = new JobHost(config);

		host.Call(typeof(Function).GetMethod("CustomBinding"),
			new Dictionary<string, object>
			{
				{"timerInfo", new TimerInfo(new CronSchedule("*/15 * * * * *"), new ScheduleStatus())}
			});

		host.RunAndBlock();
	}
}

Some of those types won't be available to you yet. First, you have to implement a dummy TypeLocator:

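using System;
using System.Collections.Generic;
using Microsoft.Azure.WebJobs;

internal class TypeLocator : ITypeLocator
{
	// Types whose functions the JobHost is allowed to index
	private readonly Type[] _types;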

	public TypeLocator(params Type[] types)
	{
		_types = types;
	}

	public IReadOnlyList<Type> GetTypes()
	{
		return _types;
	}
}

It's used to choose which functions will be indexed by a JobHost instance, so you have to add the class containing your function to it. The next thing is the UseDataLake() extension method, which enables the host to actually perform the binding:

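using System;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Host;
using Microsoft.Azure.WebJobs.Host.Config;
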
public static class DataLakeHostConfigurationExtension
{
	public static void UseDataLake(this JobHostConfiguration config)
	{
		if (config == null)
		{
			throw new ArgumentNullException(nameof(config));
		}

		// Register our extension configuration provider
		config.RegisterExtensionConfigProvider(new DataLakeExtensionConfig());
	}

	private class DataLakeExtensionConfig : IExtensionConfigProvider
	{
		public void Initialize(ExtensionConfigContext context)
		{
			if (context == null)
			{
				throw new ArgumentNullException(nameof(context));
			}

			// Register our extension binding providers
			context.Config.RegisterBindingExtensions(new DataLakeAttributeBindingProvider());
		}
	}
}

Note that we're registering an extension for an attribute binding only. If we'd like to enable triggering a function based on a custom trigger, we'd have to add another binder. The last thing is the actual function we'd like to trigger:

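using Microsoft.Azure.WebJobs;

public static class Function
{
	// Runs every 15 seconds; the second parameter is supplied by our custom binding
	public static void CustomBinding(
		[TimerTrigger("*/15 * * * * *")] TimerInfo timerInfo,
		[DataLake("clientId", "clientSecret")] DataLakeProvider dataLake)
	{
	}
}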

For the moment, DataLakeProvider can be just an empty class. The important thing is that when you hit F5, you should see the function being triggered.
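For completeness, such a placeholder could look like the sketch below. The namespace is assumed to match the one used for the attribute. The class is public because it appears in the signature of a public function, and it keeps a parameterless constructor because the value provider creates it with new DataLakeProvider().

```csharp
namespace WebJobs.DataLake
{
    // Empty placeholder; the real Data Lake connection logic comes later.
    public class DataLakeProvider
    {
    }
}
```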

Summary

As you can see, writing an extension for WebJobs is pretty easy and gives you almost unlimited possibilities when it comes to adding custom functionality to your solution. In the next post I'll show you how to properly implement the Data Lake connection, how to extend our API so we can do something useful with it, and how to actually use it in a function. Stay tuned!