0

Microsoft Orleans #05: Eventos y Streams

En esta clase aprenderemos cómo utilizar Orleans Streams para transmitir eventos en tiempo real entre Granos, permitiendo que múltiples Granos se suscriban a un mismo flujo de datos.

Objetivos de la Clase

  • Comprender el concepto de Streams en Orleans.
  • Diferenciar entre comunicación directa y comunicación basada en Streams.
  • Implementar un Grano Publicador que envía eventos a un Stream.
  • Implementar un Grano Suscriptor que recibe y procesa eventos.
  • Probar la transmisión de eventos en tiempo real.

¿Qué son los Orleans Streams?

Los Orleans Streams permiten la transmisión de eventos en tiempo real entre Granos sin necesidad de que estos se conozcan directamente.

Diferencias entre Comunicación Directa y Streams

Método de ComunicaciónDescripciónDesventajas
Comunicación Directa (GetGrain<T>())Un Grano llama a otro para ejecutar un método.Acoplamiento alto, el emisor debe conocer al receptor.
Streams (IAsyncStream<T>)Un Grano Publicador envía eventos a un Stream, y múltiples Granos Suscriptores pueden recibirlos.Mayor latencia comparada con llamadas directas.

Ejemplo de flujo de Orleans Streams

  1. Un Grano Sensor envía datos de temperatura a un Stream.
  2. Uno o más Granos Monitores se suscriben para recibir estos eventos.
  3. Cada vez que el Sensor envía un evento, todos los Monitores lo reciben simultáneamente.

Implementación de Streams en Orleans

Instalar Dependencias

Ejecuta en la terminal del proyecto OrleansDemo:

dotnet add package Microsoft.Orleans.Streaming.Memory

Esto instala Streams en memoria, ideales para pruebas.

Configurar el Silo para soportar Streams

Edita Program.cs en OrleansDemo y habilita los Streams:

using System;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;
using Orleans;
using Orleans.Hosting;

class Program
{
    static async Task Main(string[] args)
    {
        var host = Host.CreateDefaultBuilder()
            .UseOrleans(builder =>
            {
                builder.UseLocalhostClustering();
                builder.AddMemoryStreams("event-stream");
            })
            .Build();

        await host.RunAsync();
    }
}

Implementar un Grano Publicador

El Grano Sensor enviará eventos de temperatura a un Stream.

Definir la interfaz del Publicador

Crea ISensorGrain.cs en Granos:

using System.Threading.Tasks;
using Orleans;

public interface ISensorGrain : IGrainWithGuidKey
{
    Task IniciarMediciones();
}

Implementar el Grano Publicador

Crea SensorGrain.cs en Granos:

using System;
using System.Threading.Tasks;
using Orleans;
using Orleans.Streams;

public class SensorGrain : Grain, ISensorGrain
{
    private IAsyncStream<int> _stream;

    public override async Task OnActivateAsync()
    {
        var streamProvider = GetStreamProvider("event-stream");
        _stream = streamProvider.GetStream<int>(this.GetPrimaryKey(), "temperature");
    }

    public async Task IniciarMediciones()
    {
        var random = new Random();

        while (true)
        {
            int temperatura = random.Next(-10, 40);
            Console.WriteLine($"[Sensor] Enviando temperatura: {temperatura}°C");
            await _stream.OnNextAsync(temperatura);
            await Task.Delay(5000);
        }
    }
}

Explicación del código

  • GetStream<int>() obtiene un Stream llamado "temperature".
  • OnNextAsync(temperatura) envía eventos de temperatura.
  • Un bucle genera valores cada 5 segundos.

Implementar un Grano Suscriptor

El Grano Monitor recibirá y procesará eventos de temperatura.

Definir la interfaz del Suscriptor

Crea IMonitorGrain.cs en Granos:

using System.Threading.Tasks;
using Orleans;

public interface IMonitorGrain : IGrainWithGuidKey
{
    Task Suscribirse();
}

Implementar el Grano Suscriptor

Crea MonitorGrain.cs en Granos:

using System;
using System.Threading.Tasks;
using Orleans;
using Orleans.Streams;

public class MonitorGrain : Grain, IMonitorGrain
{
    public override async Task OnActivateAsync()
    {
        var streamProvider = GetStreamProvider("event-stream");
        var stream = streamProvider.GetStream<int>(this.GetPrimaryKey(), "temperature");

        await stream.SubscribeAsync((temperatura, token) =>
        {
            Console.WriteLine($"[Monitor] Recibida temperatura: {temperatura}°C");
            return Task.CompletedTask;
        });
    }

    public Task Suscribirse() => Task.CompletedTask;
}

Explicación del código

  • SubscribeAsync() escucha eventos de temperatura.
  • Cuando recibe un evento, imprime la temperatura en consola.

Configurar el Cliente para Activar los Streams

Edita Program.cs en OrleansClient para iniciar el Sensor y el Monitor:

using System;
using System.Threading.Tasks;
using Orleans;
using Orleans.Hosting;

class Program
{
    static async Task Main(string[] args)
    {
        var client = new ClientBuilder()
            .UseLocalhostClustering()
            .Build();

        await client.Connect();
        Console.WriteLine("Cliente conectado a Orleans.");

        var sensorGrain = client.GetGrain<ISensorGrain>(Guid.NewGuid());
        var monitorGrain = client.GetGrain<IMonitorGrain>(Guid.NewGuid());

        await monitorGrain.Suscribirse();
        Console.WriteLine("Monitor suscrito a eventos de temperatura.");

        await sensorGrain.IniciarMediciones();
    }
}

Explicación del código

  • Crea un Sensor y un Monitor.
  • El Monitor se suscribe antes de que el Sensor empiece a enviar datos.

Ejecutar el Proyecto

Paso 1: Iniciar el Silo

Ejecuta OrleansDemo.

Paso 2: Ejecutar el Cliente

Ejecuta OrleansClient.
Deberías ver algo como:

Cliente conectado a Orleans.
Monitor suscrito a eventos de temperatura.
[Sensor] Enviando temperatura: 15°C
[Monitor] Recibida temperatura: 15°C
[Sensor] Enviando temperatura: 22°C
[Monitor] Recibida temperatura: 22°C
[Sensor] Enviando temperatura: -5°C
[Monitor] Recibida temperatura: -5°C

Cuestionario de Autoevaluación

  1. ¿Qué son los Orleans Streams y para qué se usan?
  2. ¿Cuál es la diferencia entre GetGrain<T>() y GetStream<T>()?
  3. ¿Qué método permite suscribirse a un Stream?
  4. ¿Cómo se configura el Silo para habilitar Streams en memoria?
  5. ¿Qué sucede si múltiples Granos se suscriben al mismo Stream?

Resumen de la Clase

  • Orleans Streams permiten la transmisión de eventos en tiempo real.
  • Un Grano Publicador (SensorGrain) envía eventos a un Stream.
  • Un Grano Suscriptor (MonitorGrain) recibe eventos automáticamente.
  • Los Streams desacoplan los Granos, permitiendo mayor escalabilidad.
  • Probamos la transmisión de eventos en Orleans y confirmamos su funcionamiento.

Próxima Clase: Integración de Orleans con Bases de Datos

En la siguiente clase, aprenderemos cómo guardar datos de Granos en bases de datos reales como SQL Server y MongoDB.

Fernando Sonego

Deja una respuesta

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *