@breadstone/ziegel-platform-messaging ​

MIT License · TypeScript · npm

Event-driven messaging and communication infrastructure for the ziegel platform. Provides publish-subscribe patterns, event aggregation, reactive messaging, and decoupled communication for enterprise applications.

Messaging: Enterprise event-driven architecture with pub-sub patterns, event aggregation, and reactive messaging capabilities.

🚀 Overview ​

@breadstone/ziegel-platform-messaging provides:

  • Event Aggregator: Centralized event management and distribution
  • Pub-Sub Events: Publish-subscribe messaging patterns
  • Event Decorators: Decorator-based event handling
  • Subscription Management: Typed subscription tokens and management
  • Reactive Integration: RxJS integration for reactive event streams
  • Type Safety: Strongly typed events and subscriptions
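
To make the list above concrete, here is a minimal, self-contained sketch of the pattern an event aggregator typically follows: handlers are registered per event class, `publish` dispatches by the event's constructor, and `subscribe` returns a token that is later used to unsubscribe. `MiniAggregator`, `MiniToken`, and `UserSignedUp` are hypothetical names used for illustration only. This is not the package's implementation, and the real API may differ.

```typescript
// Illustrative sketch only: MiniAggregator is a hypothetical stand-in,
// not the EventAggregator shipped by the package.
type Ctor<T> = new (...args: any[]) => T;
type Handler<T> = (event: T) => void;

interface MiniToken {
  readonly eventType: Ctor<unknown>;
  readonly id: number;
}

class MiniAggregator {
  private nextId = 0;
  private readonly handlers = new Map<Ctor<unknown>, Map<number, Handler<any>>>();

  subscribe<T>(eventType: Ctor<T>, handler: Handler<T>): MiniToken {
    let forType = this.handlers.get(eventType);
    if (!forType) {
      forType = new Map<number, Handler<any>>();
      this.handlers.set(eventType, forType);
    }
    const id = this.nextId++;
    forType.set(id, handler);
    return { eventType, id };
  }

  unsubscribe(token: MiniToken): void {
    const forType = this.handlers.get(token.eventType);
    if (forType) {
      forType.delete(token.id);
    }
  }

  // Dispatches to handlers registered for the event's exact class.
  publish(event: object): void {
    const forType = this.handlers.get(event.constructor as Ctor<unknown>);
    if (forType) {
      forType.forEach((handler) => handler(event));
    }
  }
}

class UserSignedUp {
  constructor(public readonly userId: string) {}
}

const aggregator = new MiniAggregator();
const seen: string[] = [];
const token = aggregator.subscribe(UserSignedUp, (e) => seen.push(e.userId));

aggregator.publish(new UserSignedUp('user123')); // handler runs
aggregator.unsubscribe(token);
aggregator.publish(new UserSignedUp('user456')); // no handler left
```

Keying handlers by constructor is what lets the handler parameter be typed from the event class passed to `subscribe`.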

📦 Installation ​

bash
npm install @breadstone/ziegel-platform-messaging
# or
yarn add @breadstone/ziegel-platform-messaging

🧩 Features & Usage Examples ​

Event Aggregator ​

typescript
import { EventAggregator, PubSubEvent } from '@breadstone/ziegel-platform-messaging';

// Create event types
class UserLoggedIn extends PubSubEvent {
  constructor(public userId: string, public timestamp: Date) {
    super();
  }
}

class OrderCreated extends PubSubEvent {
  constructor(public orderId: string, public customerId: string, public amount: number) {
    super();
  }
}

// Use event aggregator
const eventAggregator = new EventAggregator();

// Subscribe to events
const subscription = eventAggregator.subscribe(UserLoggedIn, (event) => {
  console.log(`User ${event.userId} logged in at ${event.timestamp}`);
});

// Publish events
eventAggregator.publish(new UserLoggedIn('user123', new Date()));
eventAggregator.publish(new OrderCreated('order456', 'user123', 99.99));

Subscription Tokens ​

typescript
import { EventAggregator, SubscriptionToken } from '@breadstone/ziegel-platform-messaging';

class NotificationService {
  private tokens: SubscriptionToken[] = [];

  constructor(private eventAggregator: EventAggregator) {
    this.setupSubscriptions();
  }

  private setupSubscriptions(): void {
    // Subscribe with tokens for cleanup
    const userToken = this.eventAggregator.subscribe(UserLoggedIn, this.handleUserLogin);
    const orderToken = this.eventAggregator.subscribe(OrderCreated, this.handleOrderCreated);

    this.tokens.push(userToken, orderToken);
  }

  private handleUserLogin = (event: UserLoggedIn): void => {
    this.sendNotification('login', `Welcome back, ${event.userId}!`);
  };

  private handleOrderCreated = (event: OrderCreated): void => {
    this.sendNotification('order', `Order ${event.orderId} created for $${event.amount}`);
  };

  private sendNotification(type: string, message: string): void {
    console.log(`[${type.toUpperCase()}] ${message}`);
  }

  dispose(): void {
    // Clean up subscriptions
    this.tokens.forEach(token => this.eventAggregator.unsubscribe(token));
    this.tokens = [];
  }
}

Event Decorators ​

typescript
import { Event, EventAggregator } from '@breadstone/ziegel-platform-messaging';

class UserService {
  constructor(private eventAggregator: EventAggregator) {}

  @Event(UserLoggedIn)
  async loginUser(userId: string, password: string): Promise<User> {
    // Authenticate user
    const user = await this.authenticate(userId, password);

    // Event is automatically published when method completes
    return user;
  }

  @Event(UserLoggedOut)
  async logoutUser(userId: string): Promise<void> {
    // Logout logic
    await this.clearSession(userId);

    // Event is automatically published
  }
}

Event Subscriptions ​

typescript
import { EventAggregator, EventSubscription, IEventSubscription } from '@breadstone/ziegel-platform-messaging';

class CustomEventSubscription implements IEventSubscription {
  constructor(
    private eventType: new (...args: any[]) => any,
    private handler: (event: any) => void,
    private filters?: (event: any) => boolean
  ) {}

  handle(event: any): void {
    if (this.filters && !this.filters(event)) {
      return; // Skip if filters don't match
    }

    this.handler(event);
  }

  dispose(): void {
    // Cleanup logic
  }
}

// Use custom subscription
const eventAggregator = new EventAggregator();
const customSubscription = new CustomEventSubscription(
  UserLoggedIn,
  (event) => console.log('Filtered login:', event.userId),
  (event) => event.userId.startsWith('admin') // Only admin users
);

eventAggregator.subscribe(UserLoggedIn, customSubscription.handle.bind(customSubscription));
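
The filtering done by `CustomEventSubscription` can also be expressed as a small higher-order function that wraps a handler with a predicate. The sketch below is self-contained. `withFilter` and `LoginEvent` are hypothetical names, not package exports.

```typescript
// Hypothetical helper: wraps a handler so it only runs when the predicate passes.
function withFilter<T>(
  predicate: (event: T) => boolean,
  handler: (event: T) => void
): (event: T) => void {
  return (event: T): void => {
    if (predicate(event)) {
      handler(event);
    }
  };
}

// Stand-in event shape for the example
interface LoginEvent {
  userId: string;
}

const adminLogins: string[] = [];
const onAdminLogin = withFilter<LoginEvent>(
  (e) => e.userId.startsWith('admin'),
  (e) => { adminLogins.push(e.userId); }
);

onAdminLogin({ userId: 'admin-1' }); // passes the filter
onAdminLogin({ userId: 'guest-7' }); // skipped
```

The wrapped function has the same shape as a plain handler, so it could be passed wherever a handler callback is expected.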

Reactive Event Streams ​

typescript
import { EventAggregator, fromPubSubEvent, pubSub, pubSubEvent } from '@breadstone/ziegel-platform-messaging';
import { filter, map, debounceTime } from 'rxjs/operators';

const eventAggregator = new EventAggregator();

// Create observable streams from events
const userLogins$ = fromPubSubEvent(eventAggregator, UserLoggedIn);
const orderCreated$ = pubSub(eventAggregator, OrderCreated);

// Process login events
userLogins$.pipe(
  filter(event => event.userId.startsWith('premium')),
  debounceTime(1000)
).subscribe(event => {
  console.log('Premium user logged in:', event.userId);
});

// Process order events
orderCreated$.pipe(
  filter(event => event.amount > 100),
  map(event => ({ orderId: event.orderId, highValue: true }))
).subscribe(data => {
  console.log('High value order:', data.orderId);
});

// Combine multiple event streams
import { combineLatest } from 'rxjs';

combineLatest([userLogins$, orderCreated$]).pipe(
  filter(([login, order]) => login.userId === order.customerId)
).subscribe(([login, order]) => {
  console.log(`User ${login.userId} created order ${order.orderId} after login`);
});

📚 Package Exports ​

typescript
import {
    // Event Decorators
    Event,

    // Core Event Aggregation
    EventAggregator,
    IEventAggregator,

    // Events
    PubSubEvent,

    // Subscriptions
    SubscriptionToken,
    EventSubscription,
    IEventSubscription,

    // Reactive Extensions
    fromPubSubEvent,
    pubSub,
    pubSubEvent
} from '@breadstone/ziegel-platform-messaging';

🔧 Advanced Usage ​

Event Bus with Multiple Aggregators ​

typescript
import { EventAggregator, IEventAggregator, PubSubEvent, SubscriptionToken } from '@breadstone/ziegel-platform-messaging';

class EventBus {
  private aggregators = new Map<string, IEventAggregator>();

  getAggregator(domain: string): IEventAggregator {
    if (!this.aggregators.has(domain)) {
      this.aggregators.set(domain, new EventAggregator());
    }
    return this.aggregators.get(domain)!;
  }

  publishToDomain<T extends PubSubEvent>(domain: string, event: T): void {
    const aggregator = this.getAggregator(domain);
    aggregator.publish(event);
  }

  subscribeToDomain<T extends PubSubEvent>(
    domain: string,
    eventType: new (...args: any[]) => T,
    handler: (event: T) => void
  ): SubscriptionToken {
    const aggregator = this.getAggregator(domain);
    return aggregator.subscribe(eventType, handler);
  }
}

// Usage
const eventBus = new EventBus();

// Domain-specific events
eventBus.subscribeToDomain('user', UserLoggedIn, (event) => {
  console.log('User domain event:', event.userId);
});

eventBus.subscribeToDomain('order', OrderCreated, (event) => {
  console.log('Order domain event:', event.orderId);
});

eventBus.publishToDomain('user', new UserLoggedIn('user123', new Date()));
eventBus.publishToDomain('order', new OrderCreated('order456', 'user123', 99.99));

Event Middleware ​

typescript
import { EventAggregator, PubSubEvent } from '@breadstone/ziegel-platform-messaging';

class EventMiddleware {
  constructor(private eventAggregator: EventAggregator) {
    this.setupMiddleware();
  }

  private setupMiddleware(): void {
    // Intercept all events for logging
    const originalPublish = this.eventAggregator.publish;

    this.eventAggregator.publish = <T extends PubSubEvent>(event: T): void => {
      // Pre-processing
      console.log('Publishing event:', event.constructor.name);
      this.logEvent(event);

      // Call original publish
      originalPublish.call(this.eventAggregator, event);

      // Post-processing
      this.updateMetrics(event);
    };
  }

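  private logEvent(event: PubSubEvent): void {
    // Log to external service; a failed request is reported but never blocks publishing
    fetch('/api/events/log', {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({
        eventType: event.constructor.name,
        timestamp: new Date(),
        data: event
      })
    }).catch((error) => console.error('Failed to log event:', error));
  }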

  private updateMetrics(event: PubSubEvent): void {
    // Update event metrics
    console.log(`Event ${event.constructor.name} processed`);
  }
}
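
Reassigning `publish` on a shared aggregator changes behavior for every consumer of that instance. An alternative is to wrap the aggregator in a separate object that runs middleware and then delegates. The sketch below is self-contained and uses a hypothetical `Publisher` interface and `MiddlewarePublisher` class rather than the package's own types.

```typescript
// Hypothetical minimal interface; the real IEventAggregator may differ.
interface Publisher {
  publish(event: object): void;
}

type Middleware = (event: object, next: () => void) => void;

class MiddlewarePublisher implements Publisher {
  constructor(
    private readonly inner: Publisher,
    private readonly middleware: Middleware[]
  ) {}

  publish(event: object): void {
    // Build the chain from the last middleware back to the first,
    // ending with the wrapped publisher's own publish call.
    let next: () => void = () => this.inner.publish(event);
    for (let i = this.middleware.length - 1; i >= 0; i--) {
      const current = this.middleware[i];
      const following = next;
      next = () => current(event, following);
    }
    next();
  }
}

const trace: string[] = [];
const sink: Publisher = {
  publish: (event) => { trace.push(`published ${event.constructor.name}`); }
};

const logging: Middleware = (event, next) => {
  trace.push(`before ${event.constructor.name}`);
  next();
  trace.push(`after ${event.constructor.name}`);
};

class OrderShipped {}

new MiddlewarePublisher(sink, [logging]).publish(new OrderShipped());
```

Because the wrapper leaves the inner publisher untouched, code that holds a direct reference to it is unaffected.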

Event Sourcing Pattern ​

typescript
import { EventAggregator, PubSubEvent } from '@breadstone/ziegel-platform-messaging';

abstract class DomainEvent extends PubSubEvent {
  abstract readonly aggregateId: string;
  abstract readonly eventVersion: number;
  readonly timestamp = new Date();
}

class UserCreated extends DomainEvent {
  readonly aggregateId: string;
  readonly eventVersion = 1;

  constructor(public userId: string, public email: string, public name: string) {
    super();
    this.aggregateId = userId;
  }
}

class UserEmailChanged extends DomainEvent {
  readonly aggregateId: string;
  readonly eventVersion = 1;

  constructor(public userId: string, public oldEmail: string, public newEmail: string) {
    super();
    this.aggregateId = userId;
  }
}

class EventStore {
  private events: DomainEvent[] = [];

  constructor(private eventAggregator: EventAggregator) {
    this.setupEventHandlers();
  }

  private setupEventHandlers(): void {
    // Store all domain events
    this.eventAggregator.subscribe(DomainEvent, (event) => {
      this.events.push(event);
      console.log(`Stored event: ${event.constructor.name} for ${event.aggregateId}`);
    });
  }

  getEventsForAggregate(aggregateId: string): DomainEvent[] {
    return this.events.filter(event => event.aggregateId === aggregateId);
  }

  getAllEvents(): DomainEvent[] {
    return [...this.events];
  }
}

// Usage
const eventAggregator = new EventAggregator();
const eventStore = new EventStore(eventAggregator);

// Publish domain events
eventAggregator.publish(new UserCreated('user123', 'user@example.com', 'John Doe'));
eventAggregator.publish(new UserEmailChanged('user123', 'user@example.com', 'john@example.com'));

// Retrieve event history
const userEvents = eventStore.getEventsForAggregate('user123');
console.log('User event history:', userEvents);
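
In event sourcing, current state is derived by replaying an aggregate's events in order. A minimal sketch of that replay as a fold over the event history is shown here. It uses hypothetical plain-object events (`UserEvt`) and a hypothetical `UserState` shape rather than the classes above, so it runs without the package.

```typescript
type UserEvt =
  | { kind: 'UserCreated'; userId: string; email: string; name: string }
  | { kind: 'UserEmailChanged'; userId: string; oldEmail: string; newEmail: string };

interface UserState {
  userId: string;
  email: string;
  name: string;
}

// Applies one event to the previous state and returns the next state.
function applyEvent(state: UserState | null, evt: UserEvt): UserState | null {
  if (evt.kind === 'UserCreated') {
    return { userId: evt.userId, email: evt.email, name: evt.name };
  }
  // UserEmailChanged: ignore changes for a user that was never created
  return state ? { ...state, email: evt.newEmail } : null;
}

// Rebuilds state by folding over the events in order.
function replay(events: UserEvt[]): UserState | null {
  return events.reduce<UserState | null>(applyEvent, null);
}

const current = replay([
  { kind: 'UserCreated', userId: 'user123', email: 'user@example.com', name: 'John Doe' },
  { kind: 'UserEmailChanged', userId: 'user123', oldEmail: 'user@example.com', newEmail: 'john@example.com' }
]);
```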

🎯 Integration Examples ​

React Component Integration ​

typescript
import React, { useEffect, useState } from 'react';
import { EventAggregator, PubSubEvent } from '@breadstone/ziegel-platform-messaging';

class NotificationEvent extends PubSubEvent {
  constructor(public message: string, public type: 'info' | 'warning' | 'error') {
    super();
  }
}

const eventAggregator = new EventAggregator();

function NotificationDisplay() {
  const [notifications, setNotifications] = useState<Array<{ id: number; message: string; type: 'info' | 'warning' | 'error' }>>([]);

  useEffect(() => {
    const token = eventAggregator.subscribe(NotificationEvent, (event) => {
      const notification = {
        id: Date.now(),
        message: event.message,
        type: event.type
      };

      setNotifications(prev => [...prev, notification]);

      // Auto-remove after 5 seconds
      setTimeout(() => {
        setNotifications(prev => prev.filter(n => n.id !== notification.id));
      }, 5000);
    });

    return () => {
      eventAggregator.unsubscribe(token);
    };
  }, []);

  return (
    React.createElement('div', null,
      notifications.map(notification =>
        React.createElement('div', {
          key: notification.id,
          className: `notification ${notification.type}`
        }, notification.message)
      )
    )
  );
}

// Trigger notifications from anywhere
function showNotification(message: string, type: 'info' | 'warning' | 'error' = 'info') {
  eventAggregator.publish(new NotificationEvent(message, type));
}

Service Communication ​

typescript
import { EventAggregator } from '@breadstone/ziegel-platform-messaging';

class UserRegistrationService {
  constructor(
    private eventAggregator: EventAggregator,
    private emailService: EmailService,
    private auditService: AuditService
  ) {
    this.setupEventHandlers();
  }

  private setupEventHandlers(): void {
    this.eventAggregator.subscribe(UserCreated, this.handleUserCreated.bind(this));
  }

  async registerUser(userData: UserData): Promise<User> {
    // Create user
    const user = await this.createUser(userData);

    // Publish event
    this.eventAggregator.publish(new UserCreated(user.id, user.email, user.name));

    return user;
  }

  private async handleUserCreated(event: UserCreated): Promise<void> {
    // Send welcome email
    await this.emailService.sendWelcomeEmail(event.email, event.name);

    // Log audit event
    await this.auditService.logUserRegistration(event.userId);
  }
}

📚 API Documentation ​

For detailed API documentation, visit: API Docs

Related Packages

  • @breadstone/ziegel-platform: Core platform services
  • @breadstone/ziegel-rx: Reactive extensions
  • @breadstone/ziegel-core: Foundation utilities

License ​

MIT

Issues ​

Please report bugs and feature requests in the Issue Tracker.


Part of the ziegel Enterprise TypeScript Framework

typescript
// …
  readonly timestamp = new Date();

  constructor(
    public readonly orderId: string,
    public readonly paymentId: string,
    public readonly amount: number
  ) {}
}

// Create event bus
const eventBus = new EventBus({
  transport: 'redis',
  persistence: true,
  partitioning: {
    strategy: 'hash',
    field: 'customerId'
  }
});

// Register event handlers
@EventHandler(OrderPlaced)
class OrderNotificationHandler {
  async handle(event: OrderPlaced): Promise<void> {
    await emailService.sendOrderConfirmation(event.customerId, event.orderId);
  }
}

@EventHandler(OrderPlaced)
class InventoryHandler {
  async handle(event: OrderPlaced): Promise<void> {
    await inventoryService.reserveItems(event.orderId);
  }
}

// Register handlers
eventBus.registerHandler(new OrderNotificationHandler());
eventBus.registerHandler(new InventoryHandler());

// Publish events
await eventBus.publish(new OrderPlaced('order-123', 'customer-456', 99.99));


Request/Response Pattern

typescript
import {
  RequestBus,
  RequestHandler,
  Query,
  Command
} from '@ziegel/platform-messaging';

// Define commands and queries
class CreateUser implements Command {
  readonly type = 'CreateUser';

  constructor(
    public readonly userData: {
      email: string;
      name: string;
    }
  ) {}
}

class GetUser implements Query {
  readonly type = 'GetUser';

  constructor(public readonly userId: string) {}
}

// Define response types
interface CreateUserResponse {
  userId: string;
  success: boolean;
}

interface GetUserResponse {
  user: User | null;
  found: boolean;
}

// Create request bus
const requestBus = new RequestBus({
  timeout: 30000,
  retries: 2
});

// Register command handlers
@RequestHandler(CreateUser)
class CreateUserHandler {
  async handle(command: CreateUser): Promise<CreateUserResponse> {
    const user = await userService.create(command.userData);
    return {
      userId: user.id,
      success: true
    };
  }
}

@RequestHandler(GetUser)
class GetUserHandler {
  async handle(query: GetUser): Promise<GetUserResponse> {
    const user = await userService.findById(query.userId);
    return {
      user,
      found: user !== null
    };
  }
}

// Use request/response
const response = await requestBus.send<CreateUser, CreateUserResponse>(
  new CreateUser({ email: 'user@example.com', name: 'John Doe' })
);

if (response.success) {
  console.log('User created with ID:', response.userId);
}

Message Queues ​

typescript
import {
  MessageQueue,
  QueueWorker,
  JobProcessor
} from '@ziegel/platform-messaging';

// Define job types
interface ProcessImage {
  type: 'ProcessImage';
  imageId: string;
  transformations: string[];
}

interface SendBulkEmail {
  type: 'SendBulkEmail';
  recipients: string[];
  templateId: string;
  data: Record<string, any>;
}

// Create message queue
const imageQueue = new MessageQueue<ProcessImage>({
  name: 'image-processing',
  transport: 'redis',
  concurrency: 5,
  retry: {
    maxAttempts: 3,
    backoffStrategy: 'exponential',
    delays: [1000, 5000, 15000]
  },
  deadLetter: {
    enabled: true,
    queueName: 'image-processing-dlq'
  }
});

// Create job processor
@JobProcessor('ProcessImage')
class ImageProcessor {
  async process(job: ProcessImage): Promise<void> {
    console.log(`Processing image ${job.imageId}`);

    for (const transformation of job.transformations) {
      await imageService.applyTransformation(job.imageId, transformation);
    }

    console.log(`Image ${job.imageId} processed successfully`);
  }
}

// Create worker
const worker = new QueueWorker({
  queues: [imageQueue],
  processors: [new ImageProcessor()],
  concurrency: 3
});

// Start worker
await worker.start();

// Add jobs to queue
await imageQueue.add({
  type: 'ProcessImage',
  imageId: 'img-123',
  transformations: ['resize', 'watermark', 'compress']
});
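
The `retry` and `deadLetter` options above describe a common flow: a failed job is retried up to `maxAttempts` times and is then moved to a dead-letter queue. The in-memory sketch below illustrates only that flow. `TinyQueue` is hypothetical, runs synchronously, and ignores backoff delays, concurrency, and transports.

```typescript
interface QueuedJob<T> {
  payload: T;
  attempts: number;
}

// Hypothetical in-memory queue; not the MessageQueue class shown above.
class TinyQueue<T> {
  private readonly pending: QueuedJob<T>[] = [];
  readonly deadLetters: QueuedJob<T>[] = [];

  constructor(private readonly maxAttempts: number) {}

  add(payload: T): void {
    this.pending.push({ payload, attempts: 0 });
  }

  // Processes jobs until the queue is empty. A failed job is re-queued
  // until it reaches maxAttempts, then moved to deadLetters.
  drain(handle: (payload: T) => void): void {
    while (this.pending.length > 0) {
      const job = this.pending.shift()!;
      job.attempts++;
      try {
        handle(job.payload);
      } catch {
        if (job.attempts >= this.maxAttempts) {
          this.deadLetters.push(job);
        } else {
          this.pending.push(job);
        }
      }
    }
  }
}

const queue = new TinyQueue<{ imageId: string }>(3);
queue.add({ imageId: 'img-ok' });
queue.add({ imageId: 'img-broken' });

const processed: string[] = [];
queue.drain((job) => {
  if (job.imageId === 'img-broken') {
    throw new Error('cannot decode image');
  }
  processed.push(job.imageId);
});
// processed holds 'img-ok'; deadLetters holds the img-broken job after 3 attempts
```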

Advanced Features ​

Saga Pattern ​

typescript
import {
  Saga,
  SagaStep,
  SagaManager
} from '@ziegel/platform-messaging';

// Define saga for order processing
class OrderProcessingSaga extends Saga {
  constructor(private orderId: string) {
    super(`order-processing-${orderId}`);
  }

  @SagaStep('OrderPlaced')
  async handleOrderPlaced(event: OrderPlaced): Promise<void> {
    // Step 1: Reserve inventory
    await this.send(new ReserveInventory(event.orderId));
    this.addCompensation(new ReleaseInventory(event.orderId));
  }

  @SagaStep('InventoryReserved')
  async handleInventoryReserved(event: InventoryReserved): Promise<void> {
    // Step 2: Process payment
    await this.send(new ProcessPayment(event.orderId, event.amount));
    this.addCompensation(new RefundPayment(event.orderId));
  }

  @SagaStep('PaymentProcessed')
  async handlePaymentProcessed(event: PaymentProcessed): Promise<void> {
    // Step 3: Ship order
    await this.send(new ShipOrder(event.orderId));
    this.complete();
  }

  @SagaStep('PaymentFailed')
  async handlePaymentFailed(event: PaymentFailed): Promise<void> {
    // Compensate by releasing inventory
    await this.compensate();
  }
}

// Register saga
const sagaManager = new SagaManager({
  persistence: 'redis',
  timeout: 300000 // 5 minutes
});

sagaManager.register(OrderProcessingSaga);

// Start saga when order is placed
eventBus.subscribe('OrderPlaced', async (event: OrderPlaced) => {
  await sagaManager.start(new OrderProcessingSaga(event.orderId), event);
});
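
The idea behind `addCompensation` and `compensate` is that each completed step registers an undo action, and on failure the registered actions run in reverse order. The following is a minimal synchronous sketch of that idea with hypothetical names (`runSaga`, `SagaStepDef`). It is independent of the `Saga` base class above and leaves out persistence and timeouts.

```typescript
interface SagaStepDef {
  name: string;
  action: () => void;
  compensation: () => void;
}

// Runs steps in order. If one throws, the compensations of the steps that
// already completed run in reverse order. Returns true on full success.
function runSaga(steps: SagaStepDef[]): boolean {
  const completed: SagaStepDef[] = [];
  for (const step of steps) {
    try {
      step.action();
      completed.push(step);
    } catch {
      for (let i = completed.length - 1; i >= 0; i--) {
        completed[i].compensation();
      }
      return false;
    }
  }
  return true;
}

const sagaLog: string[] = [];
const succeeded = runSaga([
  {
    name: 'reserve-inventory',
    action: () => { sagaLog.push('inventory reserved'); },
    compensation: () => { sagaLog.push('inventory released'); }
  },
  {
    name: 'process-payment',
    action: () => { throw new Error('card declined'); },
    compensation: () => { sagaLog.push('payment refunded'); }
  },
  {
    name: 'ship-order',
    action: () => { sagaLog.push('order shipped'); },
    compensation: () => { sagaLog.push('shipment cancelled'); }
  }
]);
// The payment step fails, so only the inventory reservation is undone.
```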

Event Sourcing ​

typescript
import {
  EventStore,
  AggregateRoot,
  EventStream
} from '@ziegel/platform-messaging';

// Define aggregate
class User extends AggregateRoot {
  private _id = '';
  private _email = '';
  private _isActive = false;

  constructor(id?: string) {
    super();
    if (id) this._id = id;
  }

  // Business methods
  static create(id: string, email: string): User {
    const user = new User();
    user.raiseEvent(new UserCreated(id, email));
    return user;
  }

  activate(): void {
    if (this._isActive) {
      throw new Error('User is already active');
    }
    this.raiseEvent(new UserActivated(this._id));
  }

  deactivate(): void {
    if (!this._isActive) {
      throw new Error('User is already inactive');
    }
    this.raiseEvent(new UserDeactivated(this._id));
  }

  // Event handlers
  @EventHandler(UserCreated)
  private onUserCreated(event: UserCreated): void {
    this._id = event.userId;
    this._email = event.email;
    this._isActive = false;
  }

  @EventHandler(UserActivated)
  private onUserActivated(event: UserActivated): void {
    this._isActive = true;
  }

  @EventHandler(UserDeactivated)
  private onUserDeactivated(event: UserDeactivated): void {
    this._isActive = false;
  }

  // Getters
  get id(): string { return this._id; }
  get email(): string { return this._email; }
  get isActive(): boolean { return this._isActive; }
}

// Event store
const eventStore = new EventStore({
  transport: 'postgresql',
  snapshotting: {
    enabled: true,
    frequency: 10 // Snapshot every 10 events
  }
});

// Repository
class UserRepository {
  async save(user: User): Promise<void> {
    const events = user.getUncommittedEvents();
    await eventStore.saveEvents(user.id, events, user.version);
    user.markEventsAsCommitted();
  }

  async getById(id: string): Promise<User> {
    const events = await eventStore.getEvents(id);
    const user = new User(id);
    user.loadFromHistory(events);
    return user;
  }
}

Message Persistence ​

typescript
import {
  MessageStore,
  OutboxPattern,
  InboxPattern
} from '@ziegel/platform-messaging';

// Outbox pattern for reliable publishing
class UserService {
  constructor(
    private userRepository: UserRepository,
    private outbox: OutboxPattern
  ) {}

  async createUser(userData: CreateUserData): Promise<User> {
    return await this.outbox.execute(async (transaction) => {
      // Business logic
      const user = User.create(userData.email, userData.name);
      await this.userRepository.save(user, transaction);

      // Add events to outbox
      await this.outbox.addEvent(
        new UserCreated(user.id, user.email),
        transaction
      );

      return user;
    });
  }
}

// Inbox pattern for idempotent processing
class EmailService {
  constructor(private inbox: InboxPattern) {}

  @InboxHandler('UserCreated')
  async handleUserCreated(event: UserCreated, messageId: string): Promise<void> {
    await this.inbox.process(messageId, async () => {
      await this.sendWelcomeEmail(event.email);
    });
  }
}
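
The inbox pattern relies on remembering which message IDs have already been handled. A minimal in-memory sketch of that bookkeeping is shown here. `InMemoryInbox` is a hypothetical class, not the `InboxPattern` type above, and a production inbox would keep the IDs in durable storage.

```typescript
// Hypothetical in-memory inbox: remembers processed message IDs so a
// redelivered message does not run its handler twice.
class InMemoryInbox {
  private readonly processedIds = new Set<string>();

  // Returns true if the handler ran, false if the message was a duplicate.
  process(messageId: string, handler: () => void): boolean {
    if (this.processedIds.has(messageId)) {
      return false;
    }
    handler();
    // Mark as processed only after the handler succeeds, so a failed
    // attempt can be retried.
    this.processedIds.add(messageId);
    return true;
  }
}

const inbox = new InMemoryInbox();
let emailsSent = 0;

const first = inbox.process('msg-1', () => { emailsSent++; });
const redelivered = inbox.process('msg-1', () => { emailsSent++; });
// The second call is recognized as a duplicate and sends nothing.
```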

Transport Adapters ​

Redis Transport ​

typescript
import { RedisTransport } from '@ziegel/platform-messaging/redis';

const redisTransport = new RedisTransport({
  connection: {
    host: 'localhost',
    port: 6379,
    password: 'redis-password'
  },
  keyPrefix: 'app:messaging:',
  serialization: 'json',
  compression: true,
  clustering: {
    enabled: true,
    nodes: [
      { host: 'redis-1', port: 6379 },
      { host: 'redis-2', port: 6379 }
    ]
  }
});

RabbitMQ Transport ​

typescript
import { RabbitMQTransport } from '@ziegel/platform-messaging/rabbitmq';

const rabbitTransport = new RabbitMQTransport({
  connection: {
    hostname: 'localhost',
    port: 5672,
    username: 'guest',
    password: 'guest'
  },
  exchanges: {
    events: {
      type: 'topic',
      durable: true
    }
  },
  queues: {
    'user-events': {
      durable: true,
      arguments: {
        'x-dead-letter-exchange': 'dlx'
      }
    }
  }
});

AWS SQS/SNS Transport ​

typescript
import { AWSTransport } from '@ziegel/platform-messaging/aws';

const awsTransport = new AWSTransport({
  region: 'us-east-1',
  credentials: {
    accessKeyId: process.env.AWS_ACCESS_KEY_ID,
    secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY
  },
  sqs: {
    queueUrl: 'https://sqs.us-east-1.amazonaws.com/123456789/my-queue',
    visibilityTimeout: 300,
    maxReceiveCount: 3
  },
  sns: {
    topicArn: 'arn:aws:sns:us-east-1:123456789:my-topic'
  }
});

Configuration ​

typescript
interface MessagingConfig {
  // Transport configuration
  transport: {
    type: 'memory' | 'redis' | 'rabbitmq' | 'aws' | 'custom';
    options: TransportOptions;
  };

  // Serialization
  serialization: {
    format: 'json' | 'msgpack' | 'protobuf';
    compression: boolean;
  };

  // Retry configuration
  retry: {
    maxAttempts: number;
    backoffStrategy: 'fixed' | 'exponential' | 'linear';
    initialDelay: number;
    maxDelay: number;
    jitter: boolean;
  };

  // Dead letter handling
  deadLetter: {
    enabled: boolean;
    queueName: string;
    maxRetentionTime: number;
  };

  // Performance
  performance: {
    batchSize: number;
    concurrency: number;
    prefetch: number;
    ackTimeout: number;
  };

  // Monitoring
  monitoring: {
    metrics: boolean;
    tracing: boolean;
    healthChecks: boolean;
  };
}
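
To illustrate how the `retry` settings could interact, the sketch below computes the delay before each retry. The interpretation is an assumption, since the source does not define the strategies: delays are taken to be milliseconds, `linear` grows by `initialDelay` per attempt, `exponential` doubles per attempt, and `maxDelay` caps the result. `retryDelay` and `RetryPolicy` are hypothetical names, and jitter is left out so the output is deterministic.

```typescript
type BackoffStrategy = 'fixed' | 'exponential' | 'linear';

interface RetryPolicy {
  backoffStrategy: BackoffStrategy;
  initialDelay: number; // assumed to be milliseconds
  maxDelay: number;     // assumed to be milliseconds
}

// Delay before retry number `attempt` (1-based), capped at maxDelay.
function retryDelay(policy: RetryPolicy, attempt: number): number {
  let delay = policy.initialDelay; // 'fixed' keeps the initial delay
  if (policy.backoffStrategy === 'linear') {
    delay = policy.initialDelay * attempt;
  } else if (policy.backoffStrategy === 'exponential') {
    delay = policy.initialDelay * Math.pow(2, attempt - 1);
  }
  return Math.min(delay, policy.maxDelay);
}

const policy: RetryPolicy = {
  backoffStrategy: 'exponential',
  initialDelay: 1000,
  maxDelay: 10000
};

const delays = [1, 2, 3, 4, 5].map((attempt) => retryDelay(policy, attempt));
// delays: [1000, 2000, 4000, 8000, 10000]
```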

Testing ​

typescript
import {
  createTestMessageBus,
  MockTransport,
  MessageCapture
} from '@ziegel/platform-messaging/testing';

describe('User Service', () => {
  let messageBus: MessageBus;
  let messageCapture: MessageCapture;

  beforeEach(() => {
    messageCapture = new MessageCapture();
    messageBus = createTestMessageBus({
      transport: new MockTransport(),
      capture: messageCapture
    });
  });

  it('should publish UserCreated event', async () => {
    const userService = new UserService(messageBus);

    await userService.createUser({
      email: 'test@example.com',
      name: 'Test User'
    });

    expect(messageCapture.getPublishedEvents()).toContainEqual(
      expect.objectContaining({
        type: 'UserCreated',
        email: 'test@example.com'
      })
    );
  });

  it('should handle message processing', async () => {
    const handler = jest.fn();
    messageBus.subscribe('TestEvent', handler);

    await messageBus.publish({
      type: 'TestEvent',
      data: 'test'
    });

    expect(handler).toHaveBeenCalledWith(
      expect.objectContaining({
        type: 'TestEvent',
        data: 'test'
      })
    );
  });
});

Best Practices ​

Message Design ​

typescript
// Use typed messages with clear structure
interface BaseMessage {
  readonly type: string;
  readonly timestamp: Date;
  readonly correlationId?: string;
  readonly causationId?: string;
}

interface DomainEvent extends BaseMessage {
  readonly aggregateId: string;
  readonly aggregateVersion: number;
}

// Version your messages
interface UserCreatedV1 extends DomainEvent {
  readonly type: 'UserCreated';
  readonly email: string;
}

interface UserCreatedV2 extends DomainEvent {
  readonly type: 'UserCreated';
  readonly version: 2;
  readonly email: string;
  readonly firstName: string;
  readonly lastName: string;
}
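
Versioned messages usually need a conversion step so that consumers only handle the newest shape. Below is a minimal sketch of such an "upcaster", using simplified copies of the two interfaces above without the `BaseMessage` fields. `upcastUserCreated` is a hypothetical helper, and filling the missing name fields with empty strings is an assumption made for the example.

```typescript
interface UserCreatedV1Shape {
  type: 'UserCreated';
  email: string;
}

interface UserCreatedV2Shape {
  type: 'UserCreated';
  version: 2;
  email: string;
  firstName: string;
  lastName: string;
}

// Hypothetical upcaster: V1 carries no name, so the name fields are
// filled with empty strings as an explicit placeholder.
function upcastUserCreated(
  message: UserCreatedV1Shape | UserCreatedV2Shape
): UserCreatedV2Shape {
  if ('version' in message) {
    return message; // already V2
  }
  return {
    type: 'UserCreated',
    version: 2,
    email: message.email,
    firstName: '',
    lastName: ''
  };
}

const upgraded = upcastUserCreated({ type: 'UserCreated', email: 'user@example.com' });
```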

Error Handling ​

typescript
// Implement comprehensive error handling
class RobustMessageHandler {
  async handle(message: DomainEvent): Promise<void> {
    try {
      await this.processMessage(message);
    } catch (error) {
      if (error instanceof RetryableError) {
        throw error; // Let the messaging system retry
      }

      // Log non-retryable errors and move to dead letter
      logger.error('Non-retryable error processing message', {
        messageType: message.type,
        error: error instanceof Error ? error.message : String(error),
        messageId: message.correlationId
      });

      await this.handlePoisonMessage(message, error);
    }
  }
}

Performance Optimization ​

typescript
// Use batching for high-throughput scenarios
const batchProcessor = new BatchMessageProcessor({
  batchSize: 100,
  maxWaitTime: 1000,
  processor: async (messages: DomainEvent[]) => {
    await database.transaction(async (tx) => {
      for (const message of messages) {
        await processMessage(message, tx); // a standalone handler function; `this` is unbound here
      }
    });
  }
});
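
The size-based half of batching can be sketched in a few lines. `SimpleBatcher` below is a hypothetical class, not the `BatchMessageProcessor` used above. It flushes when `batchSize` items have accumulated or when `flush()` is called, and leaves out the time-based `maxWaitTime` trigger.

```typescript
// Hypothetical size-based batcher: collects items and hands them to
// `flushFn` once `batchSize` is reached, or when flush() is called.
class SimpleBatcher<T> {
  private buffer: T[] = [];

  constructor(
    private readonly batchSize: number,
    private readonly flushFn: (batch: T[]) => void
  ) {}

  add(item: T): void {
    this.buffer.push(item);
    if (this.buffer.length >= this.batchSize) {
      this.flush();
    }
  }

  flush(): void {
    if (this.buffer.length === 0) {
      return;
    }
    const batch = this.buffer;
    this.buffer = []; // start a fresh buffer before handing the batch off
    this.flushFn(batch);
  }
}

const batchSizes: number[] = [];
const batcher = new SimpleBatcher<number>(3, (batch) => { batchSizes.push(batch.length); });

for (let i = 0; i < 7; i++) {
  batcher.add(i);
}
batcher.flush(); // deliver the remaining partial batch
// batchSizes: [3, 3, 1]
```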

Migration Guide ​

From Event Emitter ​

typescript
// Old: Node.js EventEmitter
const EventEmitter = require('events');
const emitter = new EventEmitter();

emitter.on('user-created', (userData) => {
  console.log('User created:', userData);
});

emitter.emit('user-created', { id: '123', name: 'John' });

// New: ziegel-platform-messaging
const messageBus = new MessageBus(config);

messageBus.subscribe<UserCreated>('UserCreated', (event) => {
  console.log('User created:', event);
});

await messageBus.publish<UserCreated>('UserCreated', {
  type: 'UserCreated',
  userId: '123',
  name: 'John',
  timestamp: new Date()
});

API Reference ​

For detailed API documentation, see the auto-generated API reference.