In today's data-driven applications, the ability to stream logs and data in real-time is crucial for monitoring, analytics, and responsive user experiences. This guide will walk you through building a robust streaming API using .NET Core and RabbitMQ that can handle high-throughput data streams efficiently.
Why .NET Core and RabbitMQ?
.NET Core provides excellent performance, cross-platform compatibility, and built-in support for asynchronous programming - perfect for streaming scenarios. RabbitMQ offers reliable message queuing with advanced routing capabilities, making it ideal for distributing streaming data to multiple consumers.
Architecture Overview
Our streaming API will consist of:
- Data Producer: Receives logs/data and publishes to RabbitMQ
- RabbitMQ Broker: Handles message routing and queuing
- Streaming API: Consumes from RabbitMQ and streams to clients via WebSockets/Server-Sent Events
- Client Applications: Subscribe to real-time data streams
Prerequisites
- .NET 6.0 SDK or later (the platform is branded simply ".NET" from version 5 onward)
- RabbitMQ server
- Visual Studio or VS Code
- Basic knowledge of C# and message queuing concepts
Step 1: Setting Up the Project
First, create a new .NET Core Web API project and install the required NuGet packages:
dotnet new webapi -n StreamingAPI
cd StreamingAPI
dotnet add package RabbitMQ.Client
dotnet add package Microsoft.AspNetCore.SignalR
dotnet add package Newtonsoft.Json
Step 2: Configure RabbitMQ Connection
Create a RabbitMQ service for managing connections:
// Services/IRabbitMQService.cs
/// <summary>
/// Abstraction over RabbitMQ publish/consume operations so controllers and
/// background services can depend on an interface rather than a live broker.
/// </summary>
public interface IRabbitMQService
{
    /// <summary>Serializes <paramref name="message"/> and publishes it to the named queue.</summary>
    void PublishMessage(string queueName, object message);

    /// <summary>Begins consuming the named queue, invoking the callback with each raw message body (UTF-8 JSON).</summary>
    void StartConsuming(string queueName, Action<string> onMessageReceived);

    /// <summary>Stops all active consumers.</summary>
    void StopConsuming();
}
// Services/RabbitMQService.cs
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using Newtonsoft.Json;
using System.Text;
/// <summary>
/// Thin wrapper around a single RabbitMQ connection and channel, used for
/// both publishing and consuming. Registered as a singleton, so the
/// connection lives for the lifetime of the application.
/// NOTE(review): IModel is not thread-safe; under heavy concurrent load,
/// give publishers and consumers their own channels — TODO confirm usage.
/// </summary>
public class RabbitMQService : IRabbitMQService, IDisposable
{
    private readonly IConnection _connection;
    private readonly IModel _channel;
    private readonly string _hostname;
    private readonly string _username;
    private readonly string _password;

    /// <summary>
    /// Reads connection settings from the "RabbitMQ" configuration section,
    /// falling back to localhost/guest/guest for local development.
    /// </summary>
    public RabbitMQService(IConfiguration configuration)
    {
        _hostname = configuration["RabbitMQ:Hostname"] ?? "localhost";
        _username = configuration["RabbitMQ:Username"] ?? "guest";
        _password = configuration["RabbitMQ:Password"] ?? "guest";
        var factory = new ConnectionFactory()
        {
            HostName = _hostname,
            UserName = _username,
            Password = _password
        };
        _connection = factory.CreateConnection();
        _channel = _connection.CreateModel();
    }

    /// <summary>
    /// Serializes <paramref name="message"/> as JSON and publishes it to
    /// <paramref name="queueName"/> via the default exchange. The queue is
    /// declared durable and the message marked persistent so both survive a
    /// broker restart.
    /// </summary>
    public void PublishMessage(string queueName, object message)
    {
        // Idempotent: re-declaring an existing queue with identical flags is a no-op.
        _channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false);
        var json = JsonConvert.SerializeObject(message);
        var body = Encoding.UTF8.GetBytes(json);
        var properties = _channel.CreateBasicProperties();
        properties.Persistent = true;
        // Empty exchange name = default exchange; routing key = queue name.
        _channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: properties, body: body);
    }

    /// <summary>
    /// Starts consuming <paramref name="queueName"/> with manual acks. The
    /// message is acked only after <paramref name="onMessageReceived"/>
    /// returns; if the callback throws, the message is rejected WITHOUT
    /// requeue so a poison message cannot loop through redelivery forever.
    /// (The original acked unconditionally after the callback, so a throwing
    /// handler left the delivery un-acked and stuck.)
    /// </summary>
    public void StartConsuming(string queueName, Action<string> onMessageReceived)
    {
        _channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false);
        var consumer = new EventingBasicConsumer(_channel);
        consumer.Received += (model, ea) =>
        {
            var body = ea.Body.ToArray();
            var message = Encoding.UTF8.GetString(body);
            try
            {
                onMessageReceived(message);
                _channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
            }
            catch
            {
                // Dead-end the message rather than redeliver it in a tight loop.
                // Add a dead-letter exchange if failed messages must be kept.
                _channel.BasicNack(deliveryTag: ea.DeliveryTag, multiple: false, requeue: false);
            }
        };
        _channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
    }

    /// <summary>
    /// Stops all consumers by closing the shared channel.
    /// NOTE(review): because publish and consume share one channel, this also
    /// disables PublishMessage — only call during shutdown.
    /// </summary>
    public void StopConsuming()
    {
        // Guard against double-close when StopAsync and Dispose both run.
        if (_channel != null && _channel.IsOpen)
        {
            _channel.Close();
        }
    }

    /// <summary>Releases the channel and then the underlying connection.</summary>
    public void Dispose()
    {
        _channel?.Dispose();
        _connection?.Dispose();
    }
}
Step 3: Create Data Models
Define models for your streaming data:
// Models/LogEntry.cs
/// <summary>
/// One application log record: published to the "logs" queue by
/// LogsController and streamed to SignalR clients by the background service.
/// </summary>
public class LogEntry
{
    /// <summary>Unique identifier; a fresh GUID string per instance.</summary>
    public string Id { get; set; } = Guid.NewGuid().ToString();

    /// <summary>Creation time in UTC.</summary>
    public DateTime Timestamp { get; set; } = DateTime.UtcNow;

    /// <summary>Severity label; defaults to "INFO".</summary>
    public string Level { get; set; } = "INFO";

    /// <summary>Human-readable log text.</summary>
    public string Message { get; set; } = string.Empty;

    /// <summary>Component or system that produced the entry.</summary>
    public string Source { get; set; } = string.Empty;

    /// <summary>Arbitrary structured context attached to the entry.</summary>
    public Dictionary<string, object> Properties { get; set; } = new Dictionary<string, object>();
}
// Models/StreamData.cs
/// <summary>
/// Envelope for an arbitrary payload on the "data" stream; Type acts as a
/// client-facing discriminator for whatever Data carries.
/// </summary>
public class StreamData
{
    /// <summary>Discriminator shown to clients (e.g. a metric or event name).</summary>
    public string Type { get; set; } = string.Empty;

    /// <summary>The payload itself; any JSON-serializable object.</summary>
    public object Data { get; set; } = new object();

    /// <summary>Creation time in UTC.</summary>
    public DateTime Timestamp { get; set; } = DateTime.UtcNow;
}
Step 4: Implement SignalR Hub for Real-time Streaming
Create a SignalR hub to stream data to connected clients:
// Hubs/StreamingHub.cs
using Microsoft.AspNetCore.SignalR;
/// <summary>
/// SignalR hub clients connect to for real-time streams. Clients join the
/// "logs" and/or "data" groups; StreamingBackgroundService broadcasts
/// RabbitMQ messages to those groups.
/// </summary>
public class StreamingHub : Hub
{
    /// <summary>
    /// Subscribes the calling connection to <paramref name="groupName"/>
    /// and then confirms via the "JoinedGroup" client callback.
    /// </summary>
    public async Task JoinGroup(string groupName)
    {
        await Groups.AddToGroupAsync(Context.ConnectionId, groupName);
        await Clients.Caller.SendAsync("JoinedGroup", groupName);
    }

    /// <summary>
    /// Removes the caller from <paramref name="groupName"/> and confirms via
    /// the "LeftGroup" client callback.
    /// </summary>
    public async Task LeaveGroup(string groupName)
    {
        await Groups.RemoveFromGroupAsync(Context.ConnectionId, groupName);
        await Clients.Caller.SendAsync("LeftGroup", groupName);
    }

    /// <summary>
    /// Currently defers entirely to the base implementation; kept as an
    /// extension point for disconnect logging/cleanup.
    /// </summary>
    public override async Task OnDisconnectedAsync(Exception? exception)
    {
        await base.OnDisconnectedAsync(exception);
    }
}
Step 5: Create the Streaming Service
Implement a background service that consumes from RabbitMQ and streams to clients:
// Services/StreamingBackgroundService.cs
using Microsoft.AspNetCore.SignalR;
using Newtonsoft.Json;
/// <summary>
/// Hosted service bridging RabbitMQ and SignalR: consumes the "logs" and
/// "data" queues and broadcasts each deserialized message to the SignalR
/// group of the same name.
/// </summary>
public class StreamingBackgroundService : BackgroundService
{
    private readonly IRabbitMQService _rabbitMQService;
    private readonly IHubContext<StreamingHub> _hubContext;
    private readonly ILogger<StreamingBackgroundService> _logger;

    public StreamingBackgroundService(
        IRabbitMQService rabbitMQService,
        IHubContext<StreamingHub> hubContext,
        ILogger<StreamingBackgroundService> logger)
    {
        _rabbitMQService = rabbitMQService;
        _hubContext = hubContext;
        _logger = logger;
    }

    /// <summary>
    /// Registers both consumers, then parks until shutdown is requested.
    /// NOTE(review): the async lambdas are invoked through Action&lt;string&gt;,
    /// i.e. fire-and-forget (async void) — the broker ack can happen before
    /// SendAsync completes. Acceptable for best-effort streaming; change the
    /// callback type to Func&lt;string, Task&gt; if delivery must gate the ack.
    /// </summary>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation("Streaming Background Service started");
        _rabbitMQService.StartConsuming("logs", async (message) =>
        {
            try
            {
                var logEntry = JsonConvert.DeserializeObject<LogEntry>(message);
                if (logEntry != null)
                {
                    await _hubContext.Clients.Group("logs").SendAsync("ReceiveLogEntry", logEntry, stoppingToken);
                }
            }
            catch (Exception ex)
            {
                // Swallow per-message failures so one bad payload cannot kill the consumer.
                _logger.LogError(ex, "Error processing log message: {Message}", message);
            }
        });
        _rabbitMQService.StartConsuming("data", async (message) =>
        {
            try
            {
                var streamData = JsonConvert.DeserializeObject<StreamData>(message);
                if (streamData != null)
                {
                    await _hubContext.Clients.Group("data").SendAsync("ReceiveStreamData", streamData, stoppingToken);
                }
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Error processing data message: {Message}", message);
            }
        });
        try
        {
            // Keep the service alive; consumption happens on RabbitMQ's dispatch thread.
            await Task.Delay(Timeout.Infinite, stoppingToken);
        }
        catch (OperationCanceledException)
        {
            // Expected on host shutdown — the original let this propagate as a
            // TaskCanceledException on every stop; treat it as the normal exit path.
        }
    }

    /// <summary>Stops the RabbitMQ consumers before the host completes shutdown.</summary>
    public override async Task StopAsync(CancellationToken cancellationToken)
    {
        _logger.LogInformation("Streaming Background Service stopping");
        _rabbitMQService.StopConsuming();
        await base.StopAsync(cancellationToken);
    }
}
Step 6: Create API Controllers
Add controllers for publishing data to the streams:
// Controllers/LogsController.cs
/// <summary>
/// HTTP entry point for producers: accepts log entries and forwards them to
/// the "logs" RabbitMQ queue for real-time streaming.
/// </summary>
[ApiController]
[Route("api/[controller]")]
public class LogsController : ControllerBase
{
    private readonly IRabbitMQService _rabbitMQService;
    private readonly ILogger<LogsController> _logger;

    public LogsController(IRabbitMQService rabbitMQService, ILogger<LogsController> logger)
    {
        _rabbitMQService = rabbitMQService;
        _logger = logger;
    }

    /// <summary>
    /// Publishes a single log entry. Returns 200 with the generated log id,
    /// or 500 when the broker is unreachable.
    /// </summary>
    [HttpPost]
    public IActionResult PublishLog([FromBody] LogEntry logEntry)
    {
        try
        {
            _rabbitMQService.PublishMessage("logs", logEntry);
            return Ok(new { Message = "Log published successfully", LogId = logEntry.Id });
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error publishing log entry");
            return StatusCode(500, new { Error = "Failed to publish log entry" });
        }
    }

    /// <summary>
    /// Publishes a batch of log entries. Null or empty batches are rejected
    /// with 400 (the original happily reported "0 logs published successfully").
    /// </summary>
    [HttpPost("batch")]
    public IActionResult PublishLogBatch([FromBody] List<LogEntry> logEntries)
    {
        if (logEntries is null || logEntries.Count == 0)
        {
            return BadRequest(new { Error = "At least one log entry is required" });
        }
        try
        {
            foreach (var logEntry in logEntries)
            {
                _rabbitMQService.PublishMessage("logs", logEntry);
            }
            return Ok(new { Message = $"{logEntries.Count} logs published successfully" });
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error publishing log batch");
            return StatusCode(500, new { Error = "Failed to publish log batch" });
        }
    }
}
// Controllers/DataController.cs
/// <summary>
/// HTTP entry point for producers of arbitrary stream data: forwards each
/// payload to the "data" RabbitMQ queue for real-time fan-out.
/// </summary>
[ApiController]
[Route("api/[controller]")]
public class DataController : ControllerBase
{
    private readonly IRabbitMQService _rabbitMQService;
    private readonly ILogger<DataController> _logger;

    public DataController(IRabbitMQService rabbitMQService, ILogger<DataController> logger)
    {
        _logger = logger;
        _rabbitMQService = rabbitMQService;
    }

    /// <summary>
    /// Accepts a StreamData payload and enqueues it on the "data" queue,
    /// from which the background service broadcasts it to SignalR clients.
    /// Returns 200 on success, 500 if publishing fails.
    /// </summary>
    [HttpPost("stream")]
    public IActionResult PublishStreamData([FromBody] StreamData streamData)
    {
        try
        {
            _rabbitMQService.PublishMessage("data", streamData);
        }
        catch (Exception exception)
        {
            _logger.LogError(exception, "Error publishing stream data");
            return StatusCode(500, new { Error = "Failed to publish stream data" });
        }
        return Ok(new { Message = "Data published successfully" });
    }
}
Step 7: Configure Services in Program.cs
Wire up all the services in your Program.cs file:
// Program.cs
using StreamingAPI.Services;
using StreamingAPI.Hubs;
var builder = WebApplication.CreateBuilder(args);

// MVC controllers + SignalR hubs.
builder.Services.AddControllers();
builder.Services.AddSignalR();

// Singleton so the whole app shares one RabbitMQ connection/channel.
builder.Services.AddSingleton<IRabbitMQService, RabbitMQService>();
builder.Services.AddHostedService<StreamingBackgroundService>();

// CORS: the SignalR browser client sends credentials, and the CORS spec
// forbids combining AllowCredentials with AllowAnyOrigin — the original
// AllowAnyOrigin policy breaks cross-origin hub connections.
// SetIsOriginAllowed(_ => true) keeps the permissive dev behavior while
// staying spec-legal; restrict it to known origins before production.
builder.Services.AddCors(options =>
{
    options.AddPolicy("AllowAll", policy =>
    {
        policy.SetIsOriginAllowed(_ => true)
              .AllowAnyMethod()
              .AllowAnyHeader()
              .AllowCredentials();
    });
});

builder.Services.AddEndpointsApiExplorer();
builder.Services.AddSwaggerGen();

var app = builder.Build();

// Swagger UI only while developing.
if (app.Environment.IsDevelopment())
{
    app.UseSwagger();
    app.UseSwaggerUI();
}

// CORS must run before the endpoints that need its headers (including the
// SignalR negotiate request).
app.UseCors("AllowAll");
app.UseHttpsRedirection();
app.UseAuthorization();

app.MapControllers();
app.MapHub<StreamingHub>("/streaminghub");

app.Run();
Step 8: Configuration
Add RabbitMQ configuration to appsettings.json (the default guest/guest credentials are for local development only — create a dedicated RabbitMQ user for any shared or production environment):
{
"Logging": {
"LogLevel": {
"Default": "Information",
"Microsoft.AspNetCore": "Warning"
}
},
"AllowedHosts": "*",
"RabbitMQ": {
"Hostname": "localhost",
"Username": "guest",
"Password": "guest"
}
}
Step 9: Client Implementation Example
Here's a simple JavaScript client to consume the streams:
<!DOCTYPE html>
<html>
<head>
    <title>Streaming API Client</title>
    <script src="https://cdnjs.cloudflare.com/ajax/libs/microsoft-signalr/6.0.1/signalr.min.js"></script>
</head>
<body>
    <div id="logs"></div>
    <div id="data"></div>
    <script>
        const connection = new signalR.HubConnectionBuilder()
            .withUrl("https://localhost:7000/streaminghub")
            .build();

        // Join both groups once the connection is up.
        connection.start().then(function () {
            connection.invoke("JoinGroup", "logs");
            connection.invoke("JoinGroup", "data");
        });

        // Handle log entries.
        // SECURITY: use textContent, not innerHTML — the original interpolated
        // server-supplied message text into innerHTML, so any log message
        // containing markup could inject script into the page (XSS).
        connection.on("ReceiveLogEntry", function (logEntry) {
            const logElement = document.createElement("div");
            logElement.textContent = `${logEntry.timestamp} [${logEntry.level}] ${logEntry.message}`;
            document.getElementById("logs").appendChild(logElement);
        });

        // Handle stream data (same XSS-safe rendering).
        connection.on("ReceiveStreamData", function (streamData) {
            const dataElement = document.createElement("div");
            dataElement.textContent = `${streamData.timestamp} [${streamData.type}] ${JSON.stringify(streamData.data)}`;
            document.getElementById("data").appendChild(dataElement);
        });
    </script>
</body>
</html>
Testing the API
To test your streaming API:
- Start RabbitMQ: Ensure RabbitMQ server is running
- Run the API:
dotnet run
- Open the client: Load the HTML client in a browser
- Send test data: Use Postman or curl to POST to /api/logs or /api/data/stream
Performance Considerations
- Connections and Channels: Reuse a single long-lived RabbitMQ connection per application and open additional channels (not connections) for concurrent publishers and consumers in high-throughput scenarios
- Message Batching: Batch messages when possible to reduce overhead
- Memory Management: Implement proper disposal patterns for resources
- Error Handling: Add comprehensive error handling and retry logic
- Monitoring: Implement health checks and monitoring for both RabbitMQ and SignalR connections
Conclusion
You now have a robust streaming API built with .NET Core and RabbitMQ that can handle real-time log and data streaming. This architecture provides:
- High throughput message processing
- Real-time data delivery to multiple clients
- Scalable and maintainable code structure
- Reliable message queuing with RabbitMQ
- WebSocket-based real-time communication
This foundation can be extended with additional features like authentication, data filtering, persistence, and advanced routing based on your specific requirements.
Happy streaming! 🚀