Event Hubs orientado a IoT


Para tener un centro de eventos listo para usar en nuestro sistema de IoT (hacer click aquí) es necesario seguir la siguiente guía.

Crear Event Hub

  • En el panel izquierdo de opciones buscar uno que se llama “Service Bus” (dependiendo del idioma configurado por la cuenta) para mostrar únicamente los espacios de nombre creados.
  • En la parte inferior izquierda podrá ver un botón que dice “Nuevo”, en este seleccionar la opción que dice “Centro de eventos” o “Event Hub
  • Cuando se desplieguen las opciones escribir un nombre del centro de eventos y darle click al botón que dice, “crear centro de eventos
  • Esperamos un momento entre 5 a 10 minutos para que se termine de crear, cuando esté listo en el estado podremos observar que dice Activo, hacer click en el nombre

Configurar Event Hub

  • En el menú superior seleccionar “Centro de eventos” o “Event Hubs” y hacer click en el que se ha creado
  • Ahora se carga otro menú superior y seleccionar el que dice “Configurar”. En la sección de “directivas de acceso compartido” crear dos reglas, la primera llamada “SendRule” con el permiso de “Enviar” y la segunda llamada “ReceiveRule” con el permiso de “Escuchar”. Luego dar click en guardar
  • Hacer click en el botón de ir hacia atrás en la plataforma para ir al dashboard
  • En el panel inferior hacer click en el botón que dice “INFORMACIÓN DE CONEXIÓN” que está en el medio de las tres opciones.
  • Allí desplegará una ventana con las dos reglas que se crearon anteriormente, ahora ya se tiene el Event Hub creado y configurado listo para usar.

Enviar mensajes vía C#

  • Abrir Visual Studio 2015 Preview, crear un proyecto nuevo de tipo aplicación de consola en C# y llamar al proyecto “Sender
  • En el explorador de soluciones hacer click derecho encima del nombre de la solución y buscar la opción “Administrar paquetes NuGet para Solución
  • En el buscador escribir “Microsoft Azure Service Bus”, aceptar términos e instalar.
  • En el explorador de soluciones buscar el archivo llamado Program.cs y hacerle doble click, en este archivo de código escribir
using Microsoft.ServiceBus.Messaging;
  • Al inicio de la clase escribir estos atributos estáticos y reemplazar por la información anteriormente obtenida
static string eventHubName = "{event hub name}";
static string connectionString = "{send connection string}";
  • Ahora agregar el siguiente código después del constructor llamado Main
static async Task SendingRandomMessages()
{
  var eventHubClient = EventHubClient.CreateFromConnectionString(connectionString, eventHubName);
  while (true)
  {
     try
     {
        var message = Guid.NewGuid().ToString();
        Console.WriteLine("{0} > Sending message: {1}", DateTime.Now.ToString(), message);
        await eventHubClient.SendAsync(new EventData(Encoding.UTF8.GetBytes(message)));
        }
      catch (Exception exception)
      {
        Console.ForegroundColor = ConsoleColor.Red;
        Console.WriteLine("{0} > Exception: {1}", DateTime.Now.ToString(), exception.Message);
        Console.ResetColor();
      }

      await Task.Delay(200);
    }
}
  • En el método llamado Main() agregar las siguientes líneas de código
Console.WriteLine("Presione Ctrl-C para parar este proceso");
Console.WriteLine("Presione Enter para iniciar ahora");
Console.ReadLine();
SendingRandomMessages().Wait();
  • Por último, el código del archivo Program.cs debería quedar de la siguiente manera

Tip: Para el texto “{event hub name}” escribir el nombre del centro de eventos en minúscula y sin guinoes, ejemplo, en esta guía el Event Hub se llama eventosIoTExpert escribir eventosiotexpert

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.ServiceBus.Messaging;

namespace Sender
{
    class Program
    {
        static string eventHubName = "eventosiotexpert";
        static string connectionString = "Endpoint=sb://eventosiotexpert-ns.servicebus.windows.net/;SharedAccessKeyName=SendRule;SharedAccessKey=kPmsZKzed+4WSPP9jeRidpiKJ8yFB6LlErqBN+nQx88=";

        static void Main(string[] args)
        {
            Console.WriteLine("Press Ctrl-C to stop the sender process");
            Console.WriteLine("Press Enter to start now");
            Console.ReadLine();
            SendingRandomMessages().Wait();
        }

        static async Task SendingRandomMessages()
        {
            var eventHubClient = EventHubClient.CreateFromConnectionString(connectionString, eventHubName);
            while (true)
            {
                try
                {
                    var message = Guid.NewGuid().ToString();
                    Console.WriteLine("{0} > Sending message: {1}", DateTime.Now.ToString(), message);
                    await eventHubClient.SendAsync(new EventData(Encoding.UTF8.GetBytes(message)));
                }
                catch (Exception exception)
                {
                    Console.ForegroundColor = ConsoleColor.Red;
                    Console.WriteLine("{0} > Exception: {1}", DateTime.Now.ToString(), exception.Message);
                    Console.ResetColor();
                }

                await Task.Delay(200);
            }
        }
    }
}

Recibir mensajes vía C#

  • Bien, ahora es necesario crear un programa en C# para poder recibir los eventos enviados anteriormente, para esto es necesario volver a Microsoft Azure y crear un nuevo contenedor de elementos, en el botón “Nuevo” buscar “SERVICIO DE DATOS”, “ALMACENAMIENTO”, “CREACIÓN RÁPIDA” e ingresar un nombre para la URL que deseamos, finalmente hacer click en el botón de “CREAR CUENTA DE ALMACENAMIENTO”.
  • Cuando haya finalizado la creación de la cuenta de almacenamiento, ingresar a esta haciendo click encima del nombre, cuando haya cargado en la parte inferior podrá ver un botón con la opción de “ADMINISTRAR CLAVES DE ACCESO”, hacer click en este
  • Copiar la llave primaria
  • En Visual Studio crear una aplicación de consola con C# y llamarla “Receiver
  • Hacer click derecho en la solución y hacer click en la opción “Administrar paquetes NuGet”.
  • Buscar “Microsoft Azure Service Bus Event Hub – EventProcessorHost”, aceptar términos e instalar.
  • Al inicio de todo el código en el archivo Program.cs escribir el siguiente código:
using Microsoft.ServiceBus.Messaging;
using System.Diagnostics;
using System.Threading.Tasks;
  • Ahora, en el mismo archivo buscar las palabras class Program en una línea anterior escribir el siguiente código:
class SimpleEventProcessor : IEventProcessor
    {
        Stopwatch checkpointStopWatch;

        async Task IEventProcessor.CloseAsync(PartitionContext context, CloseReason reason)
        {
            Console.WriteLine(string.Format("Processor Shuting Down.  Partition '{0}', Reason: '{1}'.", context.Lease.PartitionId, reason.ToString()));
            if (reason == CloseReason.Shutdown)
            {
                await context.CheckpointAsync();
            }
        }

        Task IEventProcessor.OpenAsync(PartitionContext context)
        {
            Console.WriteLine(string.Format("SimpleEventProcessor initialize.  Partition: '{0}', Offset: '{1}'", context.Lease.PartitionId, context.Lease.Offset));
            this.checkpointStopWatch = new Stopwatch();
            this.checkpointStopWatch.Start();
            return Task.FromResult<object>(null);
        }

        async Task IEventProcessor.ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
        {
            foreach (EventData eventData in messages)
            {
                string data = Encoding.UTF8.GetString(eventData.GetBytes());

                Console.WriteLine(string.Format("Message received.  Partition: '{0}', Data: '{1}'",
                    context.Lease.PartitionId, data));
            }

            //Call checkpoint every 5 minutes, so that worker can resume processing from the 5 minutes back if it restarts.
            if (this.checkpointStopWatch.Elapsed > TimeSpan.FromMinutes(5))
            {
                await context.CheckpointAsync();
                this.checkpointStopWatch.Restart();
            }
        }
    }
  • Ahora en static void Main(string[] args) escribir el siguiente código:
string eventHubConnectionString = "{event hub connection string}";
string eventHubName = "{event hub name}";
string storageAccountName = "{storage account name}";
string storageAccountKey = "{storage account key}";
string storageConnectionString = string.Format("DefaultEndpointsProtocol=https;AccountName={0};AccountKey={1}",
              storageAccountName, storageAccountKey);

string eventProcessorHostName = Guid.NewGuid().ToString();
EventProcessorHost eventProcessorHost = new EventProcessorHost(eventProcessorHostName, eventHubName,
              EventHubConsumerGroup.DefaultGroupName, eventHubConnectionString, storageConnectionString);
eventProcessorHost.RegisterEventProcessorAsync<SimpleEventProcessor>().Wait();

Console.WriteLine("Receiving. Press enter key to stop worker.");
Console.ReadLine();

Por último, así quedaría el código del archivo Program.cs

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.ServiceBus.Messaging;
using System.Diagnostics;

namespace Receiver
{
    class SimpleEventProcessor : IEventProcessor
    {
        Stopwatch checkpointStopWatch;

        async Task IEventProcessor.CloseAsync(PartitionContext context, CloseReason reason)
        {
            Console.WriteLine(string.Format("Processor Shuting Down.  Partition '{0}', Reason: '{1}'.",
                context.Lease.PartitionId, reason.ToString()));
            if (reason == CloseReason.Shutdown)
            {
                await context.CheckpointAsync();
            }
        }

        Task IEventProcessor.OpenAsync(PartitionContext context)
        {
            Console.WriteLine(string.Format("SimpleEventProcessor initialize.  Partition: '{0}', Offset: '{1}'",
                context.Lease.PartitionId, context.Lease.Offset));
            this.checkpointStopWatch = new Stopwatch();
            this.checkpointStopWatch.Start();
            return Task.FromResult<object>(null);
        }

        async Task IEventProcessor.ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
        {
            foreach (EventData eventData in messages)
            {
                string data = Encoding.UTF8.GetString(eventData.GetBytes());

                Console.WriteLine(string.Format("Message received.  Partition: '{0}', Data: '{1}'",
                    context.Lease.PartitionId, data));
            }

            //Call checkpoint every 5 minutes, so that worker can resume processing from the 5 minutes back if it restarts.
            if (this.checkpointStopWatch.Elapsed > TimeSpan.FromMinutes(5))
            {
                await context.CheckpointAsync();
                this.checkpointStopWatch.Restart();
            }
        }
    }
    class Program
    {
        static void Main(string[] args)
        {
            string eventHubConnectionString = "Endpoint=sb://eventosiotexpert-ns.servicebus.windows.net/;SharedAccessKeyName=ReceiveRule;SharedAccessKey=UNEhepEiZE+g4ebH+EA2zEkIuHyiM48IQzZRDBLZwLQ=";
            string eventHubName = "eventosiotexpert";
            string storageAccountName = "storageiotexpert";
            string storageAccountKey = "6pkAiXuZVSEVlV070r7M3MDOc/Hq3ywVnVN82emYRf/kkH2kO5CAgS3TQf4VOXDi4kzZtkLKMKjIRI3nu8zT4Q==";
            string storageConnectionString = string.Format("DefaultEndpointsProtocol=https;AccountName={0};AccountKey={1}",
                        storageAccountName, storageAccountKey);

            string eventProcessorHostName = Guid.NewGuid().ToString();
            EventProcessorHost eventProcessorHost = new EventProcessorHost(eventProcessorHostName,
                eventHubName, EventHubConsumerGroup.DefaultGroupName, eventHubConnectionString, storageConnectionString);
            eventProcessorHost.RegisterEventProcessorAsync<SimpleEventProcessor>().Wait();

            Console.WriteLine("Receiving. Press enter key to stop worker.");
            Console.ReadLine();
        }
    }
}

Basado en: http://azure.microsoft.com/en-us/documentation/articles/service-bus-event-hubs-csharp-ephcs-getstarted/