@breadstone/ziegel-rx ​
Reactive programming extensions and advanced RxJS utilities for the ziegel framework. Provides enhanced operators, reactive patterns, BLoC architecture, and subscription management utilities.
Reactive Extensions: Advanced reactive programming with custom operators, BLoC patterns, and enhanced RxJS utilities for complex data streams.
🚀 Overview ​
@breadstone/ziegel-rx provides:
- Enhanced RxJS Operators: Custom operators for common reactive patterns
- BLoC Architecture: Business Logic Component pattern implementation
- Subscription Management: Advanced subscription handling and cleanup
- Reactive Extensions: Extensions for observables and subjects
- Stream Utilities: Countdown, observe, and transformation utilities
- Pattern Implementations: Reactive design patterns for enterprise applications
📦 Installation ​
bash
npm install @breadstone/ziegel-rx
# or
yarn add @breadstone/ziegel-rx🧩 Features & Usage Examples ​
Enhanced RxJS Operators ​
typescript
import { notNull, empty } from '@breadstone/ziegel-rx';
import { map, filter } from 'rxjs/operators';
// Filter out null values from streams
const dataStream$ = source$.pipe(
notNull(), // Removes null and undefined values
map(data => data.value)
);
// Empty operator for placeholder streams
const emptyStream$ = empty<string>();BLoC Architecture Pattern ​
typescript
import { Bloc, BlocObserver, Transition } from '@breadstone/ziegel-rx';
// Define events and states
interface CounterEvent {
type: 'increment' | 'decrement';
}
interface CounterState {
count: number;
}
// Implement BLoC
class CounterBloc extends Bloc<CounterEvent, CounterState> {
constructor() {
super({ count: 0 });
}
mapEventToState(event: CounterEvent): CounterState {
const currentState = this.state;
switch (event.type) {
case 'increment':
return { count: currentState.count + 1 };
case 'decrement':
return { count: currentState.count - 1 };
default:
return currentState;
}
}
}
// Use BLoC
const counterBloc = new CounterBloc();
counterBloc.add({ type: 'increment' });BLoC Observer ​
typescript
import { BlocObserver, Transition } from '@breadstone/ziegel-rx';
class AppBlocObserver extends BlocObserver {
onTransition(transition: Transition<any, any>): void {
console.log('State transition:', transition);
}
onError(error: Error): void {
console.error('BLoC error:', error);
}
}
// Set global observer
Bloc.observer = new AppBlocObserver();Subscription Extensions ​
typescript
import {
CompositeUnsubscribable,
subscribeOnce,
once
} from '@breadstone/ziegel-rx';
// Composite subscription management
const subscriptions = new CompositeUnsubscribable();
subscriptions.add(
stream1$.subscribe(value => console.log('Stream 1:', value))
);
subscriptions.add(
stream2$.subscribe(value => console.log('Stream 2:', value))
);
// Unsubscribe from all at once
subscriptions.unsubscribe();
// Subscribe once to an observable
subscribeOnce(dataStream$, value => {
console.log('Received once:', value);
});
// Alternative syntax
once(dataStream$, value => {
console.log('Received once:', value);
});Observable Extensions ​
typescript
import {
observe,
toBehaviorSubject,
toReplaySubject,
toSubject
} from '@breadstone/ziegel-rx';
import { from } from 'rxjs';
// Convert arrays to observables with observe
const arrayObservable$ = observe([1, 2, 3, 4, 5]);
// Convert observables to different subject types
const source$ = from([1, 2, 3]);
const behaviorSubject = toBehaviorSubject(source$, 0); // With initial value
const replaySubject = toReplaySubject(source$, 3); // Replay last 3 values
const subject = toSubject(source$);Countdown Utilities ​
typescript
import { countdown, countdown2 } from '@breadstone/ziegel-rx';
// Countdown from 10 to 0
countdown(10).subscribe(count => {
console.log('Countdown:', count);
});
// Advanced countdown with custom interval
countdown2(30, 1000).subscribe(count => {
console.log('30 second countdown:', count);
});📚 Package import points ​
typescript
import {
// Subscription Management
CompositeUnsubscribable,
// Extensions
countdown, countdown2, observe,
subscribeOnce, once,
toBehaviorSubject, toReplaySubject, toSubject,
// Operators
empty, notNull,
// BLoC Pattern
Bloc, BlocObserver, Transition,
EventStreamClosedException
} from '@breadstone/ziegel-rx';📚 API Documentation ​
For detailed API documentation, visit: API Docs
Related Packages ​
- @breadstone/ziegel-core: Foundation utilities and reactive patterns
- rxjs: Core reactive programming library
License ​
MIT
Issues ​
Please report bugs and feature requests in the Issue Tracker
## Best Practices
### Subscription Management
```typescript
// Always unsubscribe to prevent memory leaks
class ProperSubscriptionManagement {
private subscriptions = new Subscription();
init() {
// Add multiple subscriptions to composite subscription
this.subscriptions.add(
stream1$.subscribe(/* handler */)
);
this.subscriptions.add(
stream2$.subscribe(/* handler */)
);
// Use takeUntil pattern
const destroy$ = new Subject<void>();
stream3$.pipe(
takeUntil(destroy$)
).subscribe(/* handler */);
}
cleanup() {
this.subscriptions.unsubscribe();
}
}Error Handling Best Practices ​
typescript
// Handle errors at appropriate levels
const robustStream$ = sourceStream$.pipe(
// Handle specific errors close to source
catchError(error => {
if (error instanceof NetworkError) {
return timer(1000).pipe(
switchMap(() => sourceStream$) // Retry after delay
);
}
throw error; // Re-throw unknown errors
}),
// Global error handling at application level
catchError(error => {
errorService.report(error);
return of(fallbackValue);
})
);Operator Chaining ​
typescript
// Keep operator chains readable
const processedData$ = sourceData$.pipe(
// Data validation
filter(data => data.isValid),
// Data transformation
map(data => normalizeData(data)),
// Async operations
switchMap(data => enrichWithExternalData(data)),
// Error handling
catchError(handleProcessingError),
// Side effects
tap(data => logProcessedData(data)),
// Final transformation
map(data => formatForDisplay(data))
);Migration Guide ​
From RxJS ​
typescript
// RxJS code works with minimal changes
import { Observable, map, filter } from 'rxjs';
// This works as-is with Ziegel RX
const stream$ = new Observable(subscriber => {
subscriber.next('Hello');
subscriber.complete();
}).pipe(
map(value => value.toUpperCase()),
filter(value => value.length > 0)
);
// Enhanced with Ziegel features
import { Observable } from '@ziegel/rx';
const enhancedStream$ = new Observable(subscriber => {
subscriber.next('Hello');
subscriber.complete();
}).pipe(
// Additional Ziegel operators available
debugOperator(),
performanceMonitor(),
errorBoundary()
);API Reference ​
For complete API documentation, see the API Reference.
Related Packages ​
ziegel-core- Core framework functionalityziegel-platform- Platform services and infrastructure