Mocking IoT Telemetry Data with Azure

Hello World!

This should prove to be an interesting series of posts, as I am working on a new project that is genuinely unique. The idea is to use incoming data from Arduinos, Raspberry Pis, Galileos, Edisons, and other assorted IoT devices connected to oil and gas pipelines to determine whether a leak is currently in progress, and also to predict whether a leak is likely to occur in the future based on current and trending conditions.

My part in the project is all back-end analytics; I have very little to do with the actual telemetry and hardware. The telemetry will be posted using Azure Event Hubs, so my portion of the project begins with mocking that real-time data at a large enough geo-dispersed scale that I can develop a system to handle it, and then switching my configuration to consume from the production event hubs. Since I am no longer a consultant working on projects with trade secrets, and everything these days is about elevating skills in the community, I have posted everything on GitHub so you can download and peruse it at your leisure. Please note that this is a work in progress, so the GitHub source may not necessarily work when you look. I’ll try to enforce a standard of commenting “working – comment” on pushes to the repository. The git repository is located here: https://github.com/drcrook1/OilGas

So, the 30,000-foot view

The components of the project are: C# to deal with the Azure components, and F# to do the real heavy lifting. We are using Azure Event Hubs, because Event Hubs kicks ass. The component that hosts the worker role is currently Cloud Services, but because an Azure Websites WebJob has the capability to scale vertically without a redeployment, I recommend going that route. I have implemented a cloud service here; just realize that its vertical scaling requires a redeployment and is not instantaneous.

To generate the mock data, there are a few projects in the solution that you will want to focus on: OilGas.DataGeneration, OilGas.DataGenerator, OilGas.DataGenerator.CloudService, OilGas.EventHubProvider and OilGas.TelemetryCore.

Let’s talk at a high level about what each project is for and what it does.

OilGas.TelemetryCore is nothing more than a definition of all the telemetry data that may be posted out to event hubs. It is shared across almost all projects in the solution. This is a C# project.

OilGas.DataGenerator.CloudService is the hosting project. It contains the definition of how many machines of what size will be used in the deployment. This is the Azure-specific project that you can right-click and publish out to Azure.

OilGas.DataGenerator is a C# worker role. It simply wraps my F# implementation of posting data. It is C# purely because the F# worker role template just doesn’t work. I have submitted a bug to the Azure team, and a fix is due to be included in the next release.

OilGas.DataGeneration is poorly named; it should be OilGas.DataGenerator.Core. This is the bread and butter of how posting data works. It is an F# application that leverages the parallel sequence library to take advantage of all cores and make sure I can peg the box in both horizontally and vertically scaled scenarios.

OilGas.EventHubProvider was my attempt at a reusable event hub library. It could probably have just been shoved in with OilGas.DataGeneration, but whatever, it’s here. It simply provides a simpler interface for dealing with Event Hubs.

So I’m going to focus on what I believe to be the most interesting components of the solution for this article, and let you check out github for the rest.

As for the Cloud Service, go read the documentation on MSDN; it’s straightforward and simple, so I’m not going to talk about it, and the same goes for the worker role. The core of this is the EventHub Provider and the DataGeneration project (or Generator.Core if I get around to renaming it). Let’s start with the DataGeneration project, as that will allow me to break the problem down the way I think about it.

DataGeneration

The first file of this project is Core.fs. If you are unfamiliar with railway oriented programming (ROP), I have an article written on it located here (personal plug): http://indiedevspot.azurewebsites.net/2015/01/19/two-track-coding-rop-for-dummies-part-1/. In Core.fs you will find a function “pipeWithAdditionals”; if you can figure that out, comment here or contribute. I ran out of time, but it is something on the backlog to figure out. Core.fs is nothing more than ensuring I have ROP available to my code base.

The real complexity lies in this section of code. The rest should be self-explanatory (I hope; leave comments if it’s not).

    let Run () =
        // Create the telemetry devices, keep only the successful results,
        // and unwrap them from their ROP result type.
        let s = seq { for i in 1 .. 5 -> CreateTelemetryDevice i }
                    |> Seq.filter(fun d ->
                                            match d with
                                            | Success s -> true
                                            | Failure f -> false)
                    |> Seq.map(fun d -> d.SuccessValue)
        // Post a reading for every device in parallel, forever.
        while(true) do
            Thread.Sleep(4000)
            s |> PSeq.iter(fun d ->
                                Thread.Sleep(1000)
                                d |> GenerateSensorData |> ignore
                                )

Everything before the while(true) statement simply creates new telemetry devices and transforms the wrapped success results into devices. I use ROP, though not quite the way it’s intended here. After the while(true), I use PSeq, which comes from a NuGet package called “FSharp.Collections.ParallelSeq”. It takes a sequence, dynamically determines the number of cores you have available, and runs the iteration in parallel across those cores. Notice I have sleeps in there; just ditch those sleep statements if you want to generate data faster.
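
As a minimal sketch of what PSeq buys you (this example is not from the repo; it assumes the FSharp.Collections.ParallelSeq package is referenced):

```fsharp
// Minimal PSeq sketch; requires the FSharp.Collections.ParallelSeq NuGet package.
open FSharp.Collections.ParallelSeq

// PSeq wraps PLINQ, so the work below is spread across the available cores.
let squares =
    seq { 1 .. 8 }
    |> PSeq.map (fun n -> n * n)  // each element may be computed on a different core
    |> PSeq.toList
    |> List.sort                  // parallel evaluation does not preserve order

printfn "%A" squares
```

PSeq.iter, which Run uses, is just the side-effecting equivalent of PSeq.map.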

My next steps on this are to generate device IDs based on machine name + core number, and maybe to rebuild the devices every 10 minutes to deal with the random vertical scaling that is possible due to the way clouds work. This particular instance is built for a machine instance with 4 cores (notice the small, fixed number of devices and that I use the parallel sequence over that sequence of devices; this will work on as few as 1 core, and scales up to one core per device).
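
That device-ID change might look something like the following sketch (makeDeviceId is a hypothetical helper, not something in the repo):

```fsharp
open System

// Hypothetical helper: build a device ID from the machine name plus a
// per-core index, so IDs stay unique when scaling horizontally.
let makeDeviceId (coreIndex : int) =
    sprintf "%s-%d" Environment.MachineName coreIndex

// One candidate ID per logical core on the current box.
let deviceIds =
    [ 1 .. Environment.ProcessorCount ]
    |> List.map makeDeviceId
```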

So where is the tie-in to OilGas.EventHubProvider?

That is located in the actual generation function:

    let createProvider() =
        let config = CreateEHConfiguration
                            <!> succeed @"https://rtoieventhub-ns.servicebus.windows.net"
                            <*> succeed @"Endpoint=sb://rtoieventhub-ns.servicebus.windows.net/;SharedAccessKeyName=master;SharedAccessKey=iAZa4YpxH4IeEatOC1MpImgVLby45NGZq+6TRcqrRAU="
                            <*> succeed @"oilgasmockinput"
        match config with
        | Failure f -> Failure f
        | Success s -> 
            new EventHubProvider(s.ns, s.description, s.connstring) |> succeed

    let generateFlowEvent(provider:EventHubProvider, device:TelemetryDevice) =
        let eventData = new PipeFlowTelemetryEvent(
                            device.id.ToString() + "F" + DateTime.UtcNow.ToString(),
                            device.id, 3.0f, DateTime.UtcNow, device.longitude, device.latitude)
        provider.SendEvent(eventData).Wait()
        device

    let GenerateSensorData (device : TelemetryDevice) =
        let p = createProvider()
        match p with
        | Success s -> generateFlowEvent(s, device) |> succeed
        | Failure f -> Failure f

The call to createProvider() reaches into the EventHubProvider library to create the provider, and provider.SendEvent manages the actual sending of an event. SendEvent returns a task; note that generateFlowEvent calls .Wait() on it, so each device blocks on its own send while the devices themselves run in parallel, which is of course important to keep scalability high and make sure we peg all our cores in the most optimal way.

So let’s check out the Event Hub Provider’s constructor and SendEvent method.

        /// <summary>
        /// Instantiate an Event Hub along with a Client.
        /// </summary>
        /// <param name="hubNameSpace">uri namespace</param>
        /// <param name="description">description</param>
        /// <param name="connectionString">connection string from app.config</param>
        public EventHubProvider(string hubNameSpace, string description, string connectionString, bool createClient = true)
        {
            ServiceBusConnectionStringBuilder builder = new ServiceBusConnectionStringBuilder(connectionString);
            builder.TransportType = TransportType.Amqp;
            NamespaceManager manager = NamespaceManager.CreateFromConnectionString(builder.ToString());
            this.EHDescription = manager.CreateEventHubIfNotExists(description);
            this.EHClient = EventHubClient.CreateFromConnectionString(connectionString, EHDescription.Path); 
        }

This is easy; it simply wraps the standard creation of an EventHub client per the MSDN documentation. Piece of cake!

        /// <summary>
        /// Sends a message.  Note that if this fails, it will throw an exception.  For performance reasons, it is not wrapped in a try/catch here.
        /// </summary>
        /// <param name="message">the telemetry message to serialize and send</param>
        /// <returns>the pending send task</returns>
        public Task SendEvent(ITelemetryMessage message)
        {
            var serializedString = JsonConvert.SerializeObject(message);
            EventData data = new EventData(Encoding.UTF8.GetBytes(serializedString))
            {
                PartitionKey = message.ID
            };
            // Set user properties if needed 
            data.Properties.Add("Type", "PipeFlow" + DateTime.UtcNow.ToLongTimeString());
            return this.EHClient.SendAsync(data);
        }

Sending an event is exactly the same deal; it simply wraps an Event Hub send.

So where is the magic here? Well, it has to do with the F# PSeq operation, plus a few things on the backlog. The largest to-do item is to take advantage of Event Hub partitions. When you start sending insane quantities of data, you should start taking advantage of the 16 default partitions (up to the maximum of 32 partitions available to you). When you do it this way, analyzing the data from each partition is a bit easier to plan for: one compute box per posting partition to analyze with, for a maximum 32-box cluster, which is pretty damn sizeable.
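
One simple way to plan for that (a sketch, not repo code; partitionFor is a hypothetical helper) is to map each device deterministically onto a partition, and then either hand the result to EventHubClient.CreatePartitionedSender or use it as the EventData.PartitionKey that the provider already sets:

```fsharp
// Hypothetical helper: deterministically map a device onto one of the hub's
// partitions, so each downstream compute box can own exactly one partition.
let partitionFor (partitionCount : int) (deviceId : int) =
    string (deviceId % partitionCount)

// With the default 16 partitions, devices 5 and 21 land on the same partition.
let p1 = partitionFor 16 5
let p2 = partitionFor 16 21
```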

Summary

So here you have it: a basic solution for mocking telemetry input from IoT devices to Azure Event Hubs that you can scale to god knows how high. It takes advantage of all cores, so you can go with G-series boxes and also scale horizontally. The biggest changes still to do are the following items:

  1. Remove thread sleeps.
  2. Number device IDs by machine ID + thread ID.
  3. Randomize data based on realistic data trends and errors.
  4. Take advantage of Event Hub Partitions.
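
Item 3 might start out as something like this sketch (mockFlowReading is hypothetical, not in the repo): a base flow rate plus a slow trend plus random jitter.

```fsharp
open System

let rng = Random()

// Hypothetical mock reading: base flow, a slow sinusoidal drift over "hours",
// and a small amount of random jitter on top.
let mockFlowReading (baseFlow : float) (hour : float) =
    let trend = 0.5 * sin (hour / 24.0)
    let noise = (rng.NextDouble() - 0.5) * 0.2  // +/- 0.1 jitter
    baseFlow + trend + noise
```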
