En esta clase aprenderemos cómo mejorar el rendimiento y escalabilidad de Orleans Streams, incluyendo filtrado de eventos, procesamiento paralelo y múltiples proveedores de Streams.
Objetivos de la Clase
- Comprender cómo escalar y optimizar Streams en Orleans.
- Implementar filtros de eventos en Streams Orleans.
- Configurar múltiples proveedores de Streams para diferentes tipos de datos.
- Probar la transmisión optimizada de eventos en Orleans.
¿Cómo Escalar Orleans Streams?
Orleans Streams es una solución de mensajería distribuida que permite a los Granos comunicarse sin necesidad de referencias directas.
Para escalar Streams en entornos de alta carga, podemos:
- Usar filtros para evitar el envío innecesario de eventos.
- Procesar eventos en paralelo para aumentar el rendimiento.
- Configurar múltiples proveedores de Streams según el tipo de datos.
Implementar Filtrado de Eventos en Streams
Por defecto, Orleans envía todos los eventos a todos los suscriptores, lo que puede ser ineficiente.
Podemos filtrar eventos para que solo ciertos Granos reciban los eventos relevantes.
1. Crear una Interfaz de Stream con Filtros
Crea IFiltradoStreamGrain.cs
en Granos:
using System.Threading.Tasks;
using Orleans;
public interface IFiltradoStreamGrain : IGrainWithGuidKey
{
Task PublicarEvento(int valor);
}
2. Implementar el Publicador con Filtrado
Crea FiltradoStreamGrain.cs
en Granos:
using System;
using System.Threading.Tasks;
using Orleans;
using Orleans.Streams;
public class FiltradoStreamGrain : Grain, IFiltradoStreamGrain
{
private IAsyncStream<int> _stream;
public override Task OnActivateAsync()
{
var streamProvider = GetStreamProvider("event-stream");
_stream = streamProvider.GetStream<int>(this.GetPrimaryKey(), "filtro");
return Task.CompletedTask;
}
public async Task PublicarEvento(int valor)
{
await _stream.OnNextAsync(valor);
}
}
3. Implementar un Suscriptor con Filtro
Crea FiltradoSubscriberGrain.cs
en Granos:
using System;
using System.Threading.Tasks;
using Orleans;
using Orleans.Streams;
public class FiltradoSubscriberGrain : Grain, IFiltradoStreamGrain
{
public override async Task OnActivateAsync()
{
var streamProvider = GetStreamProvider("event-stream");
var stream = streamProvider.GetStream<int>(this.GetPrimaryKey(), "filtro");
await stream.SubscribeAsync((valor, token) =>
{
if (valor > 10) // Filtra eventos menores o iguales a 10
{
Console.WriteLine($"[Suscriptor] Evento recibido: {valor}");
}
return Task.CompletedTask;
});
}
}
Explicación del Código
- El Publicador envía todos los eventos.
- El Suscriptor filtra eventos, procesando solo los valores mayores a 10.
Configurar Múltiples Proveedores de Streams
Si una aplicación maneja diferentes tipos de eventos, es recomendable usar múltiples proveedores de Streams.
Edita Program.cs
en OrleansDemo:
builder.UseOrleans(siloBuilder =>
{
siloBuilder.UseLocalhostClustering();
siloBuilder.AddMemoryStreams("event-stream");
siloBuilder.AddMemoryStreams("notificaciones-stream");
});
1. Crear un Publicador con Diferentes Streams
using System;
using System.Threading.Tasks;
using Orleans;
using Orleans.Streams;
public class MultiStreamGrain : Grain
{
private IAsyncStream<string> _streamNotificaciones;
private IAsyncStream<int> _streamDatos;
public override Task OnActivateAsync()
{
var streamProvider = GetStreamProvider("event-stream");
_streamDatos = streamProvider.GetStream<int>(this.GetPrimaryKey(), "datos");
var notificacionesProvider = GetStreamProvider("notificaciones-stream");
_streamNotificaciones = notificacionesProvider.GetStream<string>(this.GetPrimaryKey(), "notificaciones");
return Task.CompletedTask;
}
public async Task PublicarNotificacion(string mensaje)
{
await _streamNotificaciones.OnNextAsync(mensaje);
}
public async Task PublicarDato(int valor)
{
await _streamDatos.OnNextAsync(valor);
}
}
Probar Orleans Streams Optimizado
1. Ejecutar el Silo Orleans
dotnet run
2. Probar con un Cliente Orleans
Edita Program.cs
en OrleansClient y agrega:
var cliente = new ClientBuilder()
.UseLocalhostClustering()
.Build();
await cliente.Connect();
var streamGrain = cliente.GetGrain<IFiltradoStreamGrain>(Guid.NewGuid());
await streamGrain.PublicarEvento(5);
await streamGrain.PublicarEvento(15); // Solo este será procesado por el suscriptor
Salida esperada
[Suscriptor] Evento recibido: 15
Si todo está configurado correctamente, los Streams Orleans procesarán eventos de forma optimizada.
Cuestionario de Autoevaluación
- ¿Qué es un Stream en Orleans y cómo mejora la comunicación entre Granos?
- ¿Cómo se pueden filtrar eventos en Streams Orleans?
- ¿Por qué usar múltiples proveedores de Streams en una aplicación?
- ¿Qué sucede si un Stream no tiene suscriptores al momento de publicar un evento?
- ¿Cómo Orleans maneja la concurrencia en Streams cuando hay múltiples suscriptores?
Resumen de la Clase
- Orleans Streams permite procesar eventos en tiempo real entre Granos.
- El filtrado de eventos mejora la eficiencia al evitar el procesamiento innecesario.
- Múltiples proveedores de Streams permiten gestionar diferentes tipos de eventos de forma independiente.
- Probamos un sistema donde solo los eventos relevantes se procesan, optimizando el rendimiento.
Próxima Clase: Integración con ASP.NET Core en Producción
En la siguiente clase, aprenderemos cómo integrar Orleans con ASP.NET Core en un entorno de producción, optimizando rendimiento y seguridad.