tub opened a new issue, #7152: URL: https://github.com/apache/paimon/issues/7152
### Search before asking

- [x] I searched in the [issues](https://github.com/apache/paimon/issues) and found nothing similar.

### Motivation

Currently, paimon-python only supports batch reads - users must manually poll for new snapshots and manage read progress themselves. This makes it difficult to build streaming data pipelines in Python that react to changes as they occur.

I'm proposing adding an asyncio-based streaming consumer, along with consumer-registration support. Wherever possible I've referred back to the Java implementation to keep the two as similar as possible.

I have a PoC branch for this locally, but I want to get some buy-in on the idea first before I start polishing it and pushing it publicly :)

### Solution

### Architecture

```mermaid
flowchart TD
    SRB[StreamReadBuilder<br/>with_poll_interval_ms<br/>with_consumer_id<br/>with_shard<br/>with_include_row_kind]
    SRB --> ASTS[AsyncStreamingTableScan<br/>stream / stream_sync]
    SRB --> TR[TableRead<br/>reused from batch]
    SRB --> CM[ConsumerManager<br/>checkpoint / restore]
    ASTS --> FUS[FollowUpScanner<br/>per-snapshot]
    ASTS --> IDS[IncrementalDiffScanner<br/>catch-up mode]
```

### Key Classes

| Component | Purpose |
|-----------|---------|
| `StreamReadBuilder` | API for configuring streaming reads |
| `AsyncStreamingTableScan` | Core streaming engine with async iterator |
| `FollowUpScanner` | Interface for determining which snapshots to scan |
| `DeltaFollowUpScanner` | Scans APPEND commits (changelog-producer=none) |
| `ChangelogFollowUpScanner` | Scans changelog manifests (changelog-producer=input/full-compaction/lookup) |
| `IncrementalDiffScanner` | Efficient catch-up when many snapshots behind |
| `ConsumerManager` | Persists read progress for recovery |

## API Design

### Basic Usage

```python
table = catalog.get_table('database.table')

# Create streaming read builder
stream_builder = table.new_stream_read_builder()
stream_builder.with_poll_interval_ms(1000)

# Create scan and reader
scan = stream_builder.new_streaming_scan()
table_read = stream_builder.new_read()

# Async streaming (recommended)
async for plan in scan.stream():
    arrow_table = table_read.to_arrow(plan.splits())
    process(arrow_table)

# Or synchronous
for plan in scan.stream_sync():
    arrow_table = table_read.to_arrow(plan.splits())
    process(arrow_table)
```

### Consumer Registration (Progress Tracking)

Compatible with the Java implementation.

```python
stream_builder.with_consumer_id("my-etl-job")

async for plan in scan.stream():
    process(table_read.to_arrow(plan.splits()))
    # Persist progress to {table_path}/consumer/consumer-my-etl-job
    scan.notify_checkpoint_complete(scan.next_snapshot_id)
```

### Parallel Consumption

Expose an API to allow building consumers with subsets of the buckets. I'm not proposing we add any consumer group-membership synchronization in this, just the primitives that would allow folks to build that on top using an external strongly-consistent store like etcd/ZooKeeper.

```python
# Consumer 0 of 4 reads buckets 0, 4, 8, ...
stream_builder.with_shard(0, 4)

# Or explicit bucket list
stream_builder.with_buckets([0, 1, 2])

# Or custom filter
stream_builder.with_bucket_filter(lambda b: b % 2 == 0)
```

## Implementation Details

### Streaming Scan Flow

```mermaid
flowchart LR
    IS[Initial Scan<br/>full state] --> POLL[Poll for New<br/>Snapshots]
    POLL --> PLAN[Create Plan from<br/>Delta/Changelog]
    POLL --> SHOULD{should_scan?}
    SHOULD -->|APPEND| YES[Yes - scan]
    SHOULD -->|COMPACT| NO[No - skip]
    SHOULD -->|OVERWRITE| NO
```

### Changelog Producer Modes

| Mode | Scanner | Data Source |
|------|---------|-------------|
| `none` (default) | `DeltaFollowUpScanner` | `delta_manifest_list` |
| `input` | `ChangelogFollowUpScanner` | `changelog_manifest_list` |
| `full-compaction` | `ChangelogFollowUpScanner` | `changelog_manifest_list` |
| `lookup` | `ChangelogFollowUpScanner` | `changelog_manifest_list` |
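To make the table above concrete, here is a minimal sketch of how the scanner selection and per-snapshot `should_scan` decision could look in the Python port. It mirrors the behaviour of the Java `DeltaFollowUpScanner`/`ChangelogFollowUpScanner`; the snapshot field names (`commit_kind`, `changelog_manifest_list`) are assumptions about the PoC, not existing paimon-python API.

```python
from abc import ABC, abstractmethod


class FollowUpScanner(ABC):
    """Decides, per polled snapshot, whether it contains data to scan."""

    @abstractmethod
    def should_scan(self, snapshot) -> bool: ...


class DeltaFollowUpScanner(FollowUpScanner):
    # changelog-producer = none: only APPEND commits carry new data;
    # COMPACT and OVERWRITE snapshots are skipped (see flow diagram above).
    def should_scan(self, snapshot) -> bool:
        return snapshot.commit_kind == "APPEND"


class ChangelogFollowUpScanner(FollowUpScanner):
    # changelog-producer = input / full-compaction / lookup: scan any
    # snapshot that wrote a changelog_manifest_list.
    def should_scan(self, snapshot) -> bool:
        return snapshot.changelog_manifest_list is not None


def create_follow_up_scanner(changelog_producer: str) -> FollowUpScanner:
    # Mirrors the Changelog Producer Modes table above.
    if changelog_producer in ("input", "full-compaction", "lookup"):
        return ChangelogFollowUpScanner()
    return DeltaFollowUpScanner()
```

`AsyncStreamingTableScan` would pick the scanner once from the table options and call `should_scan` on every polled snapshot before planning splits.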
### Performance Optimizations

These could potentially be added afterwards, but we've already seen these caches benefit us a lot on the Flink side when using S3:

1. **Snapshot Caching**: LRU cache for snapshot metadata (rough sketch below)
2. **Manifest Caching**: LRU cache for manifest file contents
3. **Batch Lookahead**: Fetch multiple snapshots in parallel to skip COMPACT commits efficiently
4. **Prefetching**: Background thread prefetches the next snapshot while the current one is processed
5. **Diff-based Catch-up**: When more than ~10 snapshots behind, use `IncrementalDiffScanner` to compute a file diff instead of reading N `delta_manifest_list`s

### Dependencies

- `cachetools` for LRU caching (already used by paimon-python)
- No additional dependencies required
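To illustrate how the snapshot cache could sit on top of `cachetools`, here is a rough sketch; the `load_snapshot` callable and the cache size are placeholders rather than existing paimon-python API, and the manifest cache would be analogous with file paths as keys.

```python
from cachetools import LRUCache


class SnapshotCache:
    """LRU cache for snapshot metadata, keyed by snapshot id.

    Snapshot files are immutable once written, so plain LRU eviction is
    safe; load_snapshot is a placeholder for however paimon-python reads
    and parses a snapshot file from the table path.
    """

    def __init__(self, load_snapshot, max_entries: int = 256):
        self._load_snapshot = load_snapshot
        self._cache = LRUCache(maxsize=max_entries)

    def get(self, snapshot_id: int):
        snapshot = self._cache.get(snapshot_id)
        if snapshot is None:
            snapshot = self._load_snapshot(snapshot_id)
            self._cache[snapshot_id] = snapshot
        return snapshot
```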
### Anything else?

_No response_

### Are you willing to submit a PR?

- [x] I'm willing to submit a PR!

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]