@breadstone/ziegel-rx

Reactive programming extensions and advanced RxJS utilities for the ziegel framework. Provides enhanced operators, reactive patterns, BLoC architecture, and subscription management utilities.

Reactive Extensions: Advanced reactive programming with custom operators, BLoC patterns, and enhanced RxJS utilities for complex data streams.

🚀 Overview

@breadstone/ziegel-rx provides:

  • Enhanced RxJS Operators: Custom operators for common reactive patterns
  • BLoC Architecture: Business Logic Component pattern implementation
  • Subscription Management: Advanced subscription handling and cleanup
  • Reactive Extensions: Extensions for observables and subjects
  • Stream Utilities: Countdown, observe, and transformation utilities
  • Pattern Implementations: Reactive design patterns for enterprise applications

📦 Installation

bash
npm install @breadstone/ziegel-rx
# or
yarn add @breadstone/ziegel-rx

🧩 Features & Usage Examples

Enhanced RxJS Operators

typescript
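import { notNull, empty } from '@breadstone/ziegel-rx';
import { of } from 'rxjs';
import { map } from 'rxjs/operators';

// Example source that may emit null or undefined
const source$ = of({ value: 1 }, null, { value: 2 }, undefined);
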
// Filter out null values from streams
const dataStream$ = source$.pipe(
  notNull(), // Removes null and undefined values
  map(data => data.value)
);

// Empty operator for placeholder streams
const emptyStream$ = empty<string>();
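
To make the null-filtering idea concrete, here is a minimal sketch of a type-guard predicate that drops `null` and `undefined`. It is an assumption about the behavior described above, not the library's implementation, and `isNotNull` is a hypothetical helper name. The predicate is applied to an array so the snippet runs without dependencies.

```typescript
// Hypothetical helper: a type guard that rejects null and undefined.
function isNotNull<T>(value: T | null | undefined): value is T {
  return value !== null && value !== undefined;
}

const raw: Array<number | null | undefined> = [1, null, 2, undefined, 0];

// The guard narrows the element type from `number | null | undefined` to `number`.
// Note that 0 is kept: only null and undefined are removed, not falsy values.
const cleaned: number[] = raw.filter(isNotNull);

console.log(cleaned); // [1, 2, 0]
```

The same predicate can be passed to the RxJS `filter` operator, which narrows the stream's element type in the same way.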

BLoC Architecture Pattern

typescript
import { Bloc, BlocObserver, Transition } from '@breadstone/ziegel-rx';

// Define events and states
interface CounterEvent {
  type: 'increment' | 'decrement';
}

interface CounterState {
  count: number;
}

// Implement BLoC
class CounterBloc extends Bloc<CounterEvent, CounterState> {
  constructor() {
    super({ count: 0 });
  }

  mapEventToState(event: CounterEvent): CounterState {
    const currentState = this.state;
    switch (event.type) {
      case 'increment':
        return { count: currentState.count + 1 };
      case 'decrement':
        return { count: currentState.count - 1 };
      default:
        return currentState;
    }
  }
}

// Use BLoC
const counterBloc = new CounterBloc();
counterBloc.add({ type: 'increment' });
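
For readers new to the pattern, the following dependency-free sketch shows the core idea behind a BLoC: events go in, states come out, and listeners are notified of each new state. `MiniBloc`, `MiniCounterBloc`, and `listen` are hypothetical names used only for illustration; the real `Bloc` class may expose state changes differently.

```typescript
// Hypothetical, dependency-free sketch of the BLoC idea: events in, states out.
type Listener<S> = (state: S) => void;

abstract class MiniBloc<E, S> {
  private listeners: Listener<S>[] = [];
  private current: S;

  constructor(initial: S) {
    this.current = initial;
  }

  get state(): S {
    return this.current;
  }

  // Subclasses describe how an event turns the current state into the next one.
  protected abstract mapEventToState(event: E): S;

  // Dispatch an event, store the resulting state, and notify listeners.
  add(event: E): void {
    this.current = this.mapEventToState(event);
    this.listeners.forEach(listener => listener(this.current));
  }

  // Register a listener; the returned function removes it again.
  listen(listener: Listener<S>): () => void {
    this.listeners.push(listener);
    return () => {
      this.listeners = this.listeners.filter(l => l !== listener);
    };
  }
}

type CounterEvent = { type: 'increment' | 'decrement' };
type CounterState = { count: number };

class MiniCounterBloc extends MiniBloc<CounterEvent, CounterState> {
  constructor() {
    super({ count: 0 });
  }

  protected mapEventToState(event: CounterEvent): CounterState {
    const delta = event.type === 'increment' ? 1 : -1;
    return { count: this.state.count + delta };
  }
}

const seen: number[] = [];
const bloc = new MiniCounterBloc();
const stop = bloc.listen(state => seen.push(state.count));

bloc.add({ type: 'increment' });
bloc.add({ type: 'increment' });
bloc.add({ type: 'decrement' });
stop();
bloc.add({ type: 'increment' }); // no longer observed

console.log(seen, bloc.state.count); // [1, 2, 1] 2
```

Keeping the transition logic in one method means the UI layer only dispatches events and renders states, which is the separation the pattern is meant to provide.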

BLoC Observer

typescript
import { Bloc, BlocObserver, Transition } from '@breadstone/ziegel-rx';

class AppBlocObserver extends BlocObserver {
  onTransition(transition: Transition<any, any>): void {
    console.log('State transition:', transition);
  }

  onError(error: Error): void {
    console.error('BLoC error:', error);
  }
}

// Set global observer
Bloc.observer = new AppBlocObserver();

Subscription Extensions

typescript
import {
  CompositeUnsubscribable,
  subscribeOnce,
  once
} from '@breadstone/ziegel-rx';

// Composite subscription management
const subscriptions = new CompositeUnsubscribable();

subscriptions.add(
  stream1$.subscribe(value => console.log('Stream 1:', value))
);
subscriptions.add(
  stream2$.subscribe(value => console.log('Stream 2:', value))
);

// Unsubscribe from all at once
subscriptions.unsubscribe();

// Subscribe once to an observable
subscribeOnce(dataStream$, value => {
  console.log('Received once:', value);
});

// Alternative syntax
once(dataStream$, value => {
  console.log('Received once:', value);
});
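
The composite idea can be sketched without any dependencies: collect handles that expose `unsubscribe()`, then release them together. `MiniComposite` is a hypothetical class written for illustration and is not the implementation of `CompositeUnsubscribable`.

```typescript
// Anything with an unsubscribe() method, which includes RxJS subscriptions.
interface Unsubscribable {
  unsubscribe(): void;
}

// Hypothetical sketch of a composite: collect handles, release them together.
class MiniComposite implements Unsubscribable {
  private items: Unsubscribable[] = [];
  private closed = false;

  add(item: Unsubscribable): void {
    if (this.closed) {
      // Already released: dispose late arrivals immediately instead of leaking them.
      item.unsubscribe();
      return;
    }
    this.items.push(item);
  }

  unsubscribe(): void {
    if (this.closed) {
      return; // calling unsubscribe twice is harmless
    }
    this.closed = true;
    const pending = this.items;
    this.items = [];
    pending.forEach(item => item.unsubscribe());
  }
}

const released: string[] = [];
const composite = new MiniComposite();
composite.add({ unsubscribe: () => { released.push('a'); } });
composite.add({ unsubscribe: () => { released.push('b'); } });

composite.unsubscribe();
composite.unsubscribe(); // second call is a no-op
composite.add({ unsubscribe: () => { released.push('late'); } });

console.log(released); // ['a', 'b', 'late']
```

Disposing late additions immediately mirrors how the RxJS `Subscription.add` method treats a subscription that is already closed.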

Observable Extensions

typescript
import {
  observe,
  toBehaviorSubject,
  toReplaySubject,
  toSubject
} from '@breadstone/ziegel-rx';
import { from } from 'rxjs';

// Convert arrays to observables with observe
const arrayObservable$ = observe([1, 2, 3, 4, 5]);

// Convert observables to different subject types
const source$ = from([1, 2, 3]);

const behaviorSubject = toBehaviorSubject(source$, 0); // With initial value
const replaySubject = toReplaySubject(source$, 3); // Replay last 3 values
const subject = toSubject(source$);
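
As a rough illustration of what converting an observable into a `BehaviorSubject` involves, the sketch below uses plain RxJS. `mirrorToBehaviorSubject` is a hypothetical helper, and the actual `toBehaviorSubject` may differ, for example in how it handles cleanup.

```typescript
import { BehaviorSubject, Observable, Subject } from 'rxjs';

// Hypothetical helper: mirror a source into a BehaviorSubject seeded with `initial`.
function mirrorToBehaviorSubject<T>(source$: Observable<T>, initial: T): BehaviorSubject<T> {
  const subject = new BehaviorSubject<T>(initial);
  // A Subject is also an Observer, so it can be passed straight to subscribe().
  source$.subscribe(subject);
  return subject;
}

const source$ = new Subject<number>();
const latest$ = mirrorToBehaviorSubject(source$, 0);

const before = latest$.value; // the seed, since nothing has been emitted yet

source$.next(1);
source$.next(2);

// A late subscriber immediately receives the most recent value, then live updates.
const received: number[] = [];
latest$.subscribe(value => received.push(value));
source$.next(3);

console.log(before, received, latest$.value); // 0 [2, 3] 3
```

The sketch never unsubscribes from the source, so in real code the inner subscription should be tied to the lifetime of whatever owns the subject.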

Countdown Utilities

typescript
import { countdown, countdown2 } from '@breadstone/ziegel-rx';

// Countdown from 10 to 0
countdown(10).subscribe(count => {
  console.log('Countdown:', count);
});

// Advanced countdown with custom interval
countdown2(30, 1000).subscribe(count => {
  console.log('30 second countdown:', count);
});
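
A countdown of this kind can be approximated with standard RxJS operators. The sketch below is one possible construction, not the library's implementation; `countdownFrom` and its optional scheduler parameter are assumptions made so the example can run in virtual time.

```typescript
import { Observable, VirtualTimeScheduler, asyncScheduler, map, take, timer } from 'rxjs';
import type { SchedulerLike } from 'rxjs';

// Hypothetical helper: emit start, start - 1, ..., 0, one value per interval.
function countdownFrom(
  start: number,
  intervalMs = 1000,
  scheduler: SchedulerLike = asyncScheduler
): Observable<number> {
  return timer(0, intervalMs, scheduler).pipe(
    map(tick => start - tick), // tick counts 0, 1, 2, ...
    take(start + 1)            // complete once 0 has been emitted
  );
}

// Virtual time makes the sketch run instantly, which is handy in tests.
const scheduler = new VirtualTimeScheduler();
const ticks: number[] = [];
countdownFrom(3, 1000, scheduler).subscribe(value => ticks.push(value));
scheduler.flush();

console.log(ticks); // [3, 2, 1, 0]
```

Omitting the scheduler argument falls back to real time, so `countdownFrom(10)` would emit once per second.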

📚 Package Import Points

typescript
import {
    // Subscription Management
    CompositeUnsubscribable,

    // Extensions
    countdown, countdown2, observe,
    subscribeOnce, once,
    toBehaviorSubject, toReplaySubject, toSubject,

    // Operators
    empty, notNull,

    // BLoC Pattern
    Bloc, BlocObserver, Transition,
    EventStreamClosedException
} from '@breadstone/ziegel-rx';

📚 API Documentation

For detailed API documentation, visit: API Docs

Related Packages

  • @breadstone/ziegel-core: Foundation utilities and reactive patterns
  • rxjs: Core reactive programming library

License

MIT

Issues

Please report bugs and feature requests in the Issue Tracker.


## Best Practices

### Subscription Management

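```typescript
import { Subject, Subscription, takeUntil } from 'rxjs';

// Always unsubscribe to prevent memory leaks.
// stream1$, stream2$, and stream3$ stand in for your own observables.
class ProperSubscriptionManagement {
  private readonly subscriptions = new Subscription();
  private readonly destroy$ = new Subject<void>();

  init() {
    // Add multiple subscriptions to a composite subscription
    this.subscriptions.add(
      stream1$.subscribe(/* handler */)
    );

    this.subscriptions.add(
      stream2$.subscribe(/* handler */)
    );

    // Or use the takeUntil pattern: the stream ends when destroy$ emits
    stream3$.pipe(
      takeUntil(this.destroy$)
    ).subscribe(/* handler */);
  }

  cleanup() {
    this.subscriptions.unsubscribe();

    // Signal takeUntil subscribers to complete
    this.destroy$.next();
    this.destroy$.complete();
  }
}
```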

Error Handling Best Practices

typescript
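import { catchError, of, switchMap, timer } from 'rxjs';

// Handle errors at appropriate levels.
// sourceStream$, NetworkError, errorService, and fallbackValue are application-specific placeholders.
const robustStream$ = sourceStream$.pipe(
  // Handle specific errors close to the source
  catchError(error => {
    if (error instanceof NetworkError) {
      return timer(1000).pipe(
        switchMap(() => sourceStream$) // Resubscribe once after a 1 s delay
      );
    }
    throw error; // Re-throw unknown errors
  }),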

  // Global error handling at application level
  catchError(error => {
    errorService.report(error);
    return of(fallbackValue);
  })
);
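
The `catchError` approach above resubscribes only once. For repeated attempts with a growing delay, the standard RxJS `retry` operator accepts a configuration object. The sketch below assumes RxJS 7.3 or later; `flaky$` is a hypothetical source that fails twice before succeeding.

```typescript
import { defer, of, retry, throwError, timer } from 'rxjs';

let attempts = 0;

// Hypothetical flaky source: the first two subscriptions fail, the third succeeds.
const flaky$ = defer(() => {
  attempts += 1;
  return attempts < 3
    ? throwError(() => new Error(`attempt ${attempts} failed`))
    : of('payload');
});

const results: string[] = [];

flaky$
  .pipe(
    retry({
      count: 3, // give up after three retries
      // Wait a little longer before each retry: 10 ms, then 20 ms, then 30 ms
      delay: (_error, retryCount) => timer(retryCount * 10)
    })
  )
  .subscribe({
    next: value => results.push(value),
    error: err => console.error('gave up:', err),
    complete: () => console.log(results, 'after', attempts, 'attempts')
  });
```

Here the third attempt succeeds, so the error handler never runs. If all retries were exhausted, the last error would reach the subscriber, or a downstream `catchError`, as usual.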

Operator Chaining

typescript
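import { catchError, filter, map, switchMap, tap } from 'rxjs';

// Keep operator chains readable.
// The helper functions and sourceData$ are application-specific placeholders.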
const processedData$ = sourceData$.pipe(
  // Data validation
  filter(data => data.isValid),

  // Data transformation
  map(data => normalizeData(data)),

  // Async operations
  switchMap(data => enrichWithExternalData(data)),

  // Error handling
  catchError(handleProcessingError),

  // Side effects
  tap(data => logProcessedData(data)),

  // Final transformation
  map(data => formatForDisplay(data))
);

Migration Guide

From RxJS

typescript
// Existing RxJS code keeps working unchanged
import { Observable, map, filter } from 'rxjs';
import { notNull } from '@breadstone/ziegel-rx';

// This works as-is alongside ziegel-rx
const stream$ = new Observable<string>(subscriber => {
  subscriber.next('Hello');
  subscriber.complete();
}).pipe(
  map(value => value.toUpperCase()),
  filter(value => value.length > 0)
);

// Enhanced with ziegel-rx operators from the package import points
const enhancedStream$ = stream$.pipe(
  notNull() // Removes null and undefined values
);

API Reference

For complete API documentation, see the API Reference.