tub opened a new issue, #7152:
URL: https://github.com/apache/paimon/issues/7152

   ### Search before asking
   
   - [x] I searched in the [issues](https://github.com/apache/paimon/issues) 
and found nothing similar.
   
   
   ### Motivation
   
   Currently, paimon-python only supports batch reads - users must manually 
poll for new snapshots and manage read progress themselves. This makes it 
difficult to build streaming data pipelines in Python that react to changes as 
they occur.
   
   I'm proposing adding an asyncio-based streaming consumer, along with consumer-registration support. Wherever possible I've referred back to the Java implementation to keep the two behaving the same way.
   
   I have a PoC branch for this locally, but I want to get some buy-in on the idea before I start polishing it and pushing it publicly :)
   
   ### Solution
   
   ### Architecture
   
   ```mermaid
    flowchart TD
        SRB[StreamReadBuilder<br/>with_poll_interval_ms<br/>with_consumer_id<br/>with_shard<br/>with_include_row_kind]
   
       SRB --> ASTS[AsyncStreamingTableScan<br/>stream / stream_sync]
       SRB --> TR[TableRead<br/>reused from batch]
       SRB --> CM[ConsumerManager<br/>checkpoint / restore]
   
       ASTS --> FUS[FollowUpScanner<br/>per-snapshot]
       ASTS --> IDS[IncrementalDiffScanner<br/>catch-up mode]
   ```
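
   To make the builder surface concrete, here is a rough stub of what `StreamReadBuilder` might expose. Method names come from the diagram above; everything else (defaults, chaining, field names) is illustrative only:

   ```python
   # Illustrative stub only; method names follow the architecture diagram.
   from typing import Callable, Optional


   class StreamReadBuilder:
       def __init__(self, table):
           self._table = table
           self._poll_interval_ms = 1000
           self._consumer_id: Optional[str] = None
           self._bucket_filter: Optional[Callable[[int], bool]] = None
           self._include_row_kind = False

       def with_poll_interval_ms(self, interval_ms: int) -> "StreamReadBuilder":
           self._poll_interval_ms = interval_ms
           return self

       def with_consumer_id(self, consumer_id: str) -> "StreamReadBuilder":
           self._consumer_id = consumer_id
           return self

       def with_include_row_kind(self, include: bool = True) -> "StreamReadBuilder":
           self._include_row_kind = include
           return self

       def new_streaming_scan(self) -> "AsyncStreamingTableScan": ...

       def new_read(self) -> "TableRead": ...
   ```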
   
   ### Key Classes
   
   | Component | Purpose |
   |-----------|---------|
   | `StreamReadBuilder` | API for configuring streaming reads |
   | `AsyncStreamingTableScan` | Core streaming engine with async iterator |
   | `FollowUpScanner` | Interface for determining which snapshots to scan |
   | `DeltaFollowUpScanner` | Scans APPEND commits (changelog-producer=none) |
   | `ChangelogFollowUpScanner` | Scans changelog manifests (changelog-producer=input/full-compaction/lookup) |
   | `IncrementalDiffScanner` | Efficient catch-up when many snapshots behind |
   | `ConsumerManager` | Persists read progress for recovery |
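
   As a rough illustration of the scanner abstraction from the table above (not final code; `Snapshot`/`Plan` stand in for whatever types paimon-python ends up exposing, and method names may change):

   ```python
   # Sketch only: the concrete scanners decide, per snapshot, whether it
   # contributes to the stream and how to turn it into a plan.
   from typing import Protocol


   class FollowUpScanner(Protocol):
       def should_scan(self, snapshot: "Snapshot") -> bool:
           """Decide whether this snapshot contributes to the stream."""
           ...

       def scan(self, snapshot: "Snapshot") -> "Plan":
           """Build a plan (set of splits) for a single snapshot."""
           ...


   class DeltaFollowUpScanner:
       """Streams APPEND snapshots via their delta manifest list (changelog-producer=none)."""

       def should_scan(self, snapshot: "Snapshot") -> bool:
           # APPEND -> scan; COMPACT / OVERWRITE -> skip.
           return snapshot.commit_kind == "APPEND"

       def scan(self, snapshot: "Snapshot") -> "Plan":
           # Placeholder: build splits from snapshot.delta_manifest_list.
           raise NotImplementedError
   ```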
   
   ## API Design
   
   ### Basic Usage
   
   ```python
   table = catalog.get_table('database.table')
   
   # Create streaming read builder
   stream_builder = table.new_stream_read_builder()
   stream_builder.with_poll_interval_ms(1000)
   
   # Create scan and reader
   scan = stream_builder.new_streaming_scan()
   table_read = stream_builder.new_read()
   
   # Async streaming (recommended)
   async for plan in scan.stream():
       arrow_table = table_read.to_arrow(plan.splits())
       process(arrow_table)
   
   # Or synchronous
   for plan in scan.stream_sync():
       arrow_table = table_read.to_arrow(plan.splits())
       process(arrow_table)
   ```
   
   ### Consumer Registration (Progress Tracking)
   
   The consumer file format and location are compatible with the Java implementation.
   
   ```python
   stream_builder.with_consumer_id("my-etl-job")
   
   async for plan in scan.stream():
       process(table_read.to_arrow(plan.splits()))
       # Persist progress to {table_path}/consumer/consumer-my-etl-job
       scan.notify_checkpoint_complete(scan.next_snapshot_id)
   ```
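
   For reference, a `ConsumerManager` sketch could look roughly like the following. The JSON layout (a `nextSnapshot` field) is assumed to match the Java `Consumer` class, and the plain-filesystem I/O here is illustrative only; the real implementation would go through the table's file IO so S3 etc. work:

   ```python
   # Rough checkpoint/restore sketch against {table_path}/consumer/consumer-{id};
   # uses the local filesystem for illustration only.
   import json
   from pathlib import Path
   from typing import Optional


   class ConsumerManager:
       def __init__(self, table_path: str, consumer_id: str):
           self._file = Path(table_path) / "consumer" / f"consumer-{consumer_id}"

       def checkpoint(self, next_snapshot_id: int) -> None:
           # Persist the next snapshot to read, assumed Java-compatible JSON.
           self._file.parent.mkdir(parents=True, exist_ok=True)
           self._file.write_text(json.dumps({"nextSnapshot": next_snapshot_id}))

       def restore(self) -> Optional[int]:
           # Return the persisted position, or None for a fresh consumer.
           if not self._file.exists():
               return None
           return json.loads(self._file.read_text())["nextSnapshot"]
   ```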
   
   ### Parallel Consumption
   Expose an API for building consumers that each read a subset of the table's buckets. I'm not proposing any consumer group-membership synchronization here, just the primitives that would let folks build that on top using an external strongly-consistent store like etcd/ZooKeeper.
   
   ```python
   # Consumer 0 of 4 reads buckets 0, 4, 8, ...
   stream_builder.with_shard(0, 4)
   
   # Or explicit bucket list
   stream_builder.with_buckets([0, 1, 2])
   
   # Or custom filter
   stream_builder.with_bucket_filter(lambda b: b % 2 == 0)
   ```
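
   Internally, all three options could reduce to a single bucket predicate that the scan applies when filtering splits. A minimal sketch (helper names and the `Split.bucket` attribute are assumptions, not existing API):

   ```python
   # Hypothetical reduction of with_shard / with_buckets / with_bucket_filter
   # to one predicate applied to each split's bucket.
   from typing import Callable, Iterable, List


   def shard_predicate(index: int, total: int) -> Callable[[int], bool]:
       # Shard `index` of `total` owns buckets where bucket % total == index.
       return lambda bucket: bucket % total == index


   def bucket_list_predicate(buckets: Iterable[int]) -> Callable[[int], bool]:
       owned = set(buckets)
       return lambda bucket: bucket in owned


   def filter_splits(splits: List["Split"], keep: Callable[[int], bool]) -> List["Split"]:
       # Assumes each split exposes the bucket it belongs to.
       return [s for s in splits if keep(s.bucket)]
   ```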
   
   ## Implementation Details
   
   ### Streaming Scan Flow
   
   ```mermaid
   flowchart LR
       IS[Initial Scan<br/>full state] --> POLL[Poll for New<br/>Snapshots]
       POLL --> PLAN[Create Plan from<br/>Delta/Changelog]
       POLL --> SHOULD{should_scan?}
       SHOULD -->|APPEND| YES[Yes - scan]
       SHOULD -->|COMPACT| NO[No - skip]
       SHOULD -->|OVERWRITE| NO
   ```
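
   In pseudocode, the core loop of `AsyncStreamingTableScan.stream()` would look roughly like this. The snapshot-manager methods are placeholders, and the sketch omits the initial full scan, consumer restore, and snapshot-expiration handling:

   ```python
   # Simplified streaming loop sketch; placeholder snapshot-manager API.
   import asyncio


   class AsyncStreamingTableScan:
       def __init__(self, snapshot_manager, scanner, poll_interval_ms: int):
           self._snapshots = snapshot_manager   # placeholder for snapshot access
           self._scanner = scanner              # a FollowUpScanner
           self._poll_interval = poll_interval_ms / 1000.0
           self.next_snapshot_id = None

       async def stream(self):
           # Simplified: real code would restore from the consumer file
           # when a consumer id is set instead of starting at the earliest.
           self.next_snapshot_id = self._snapshots.earliest_snapshot_id()
           while True:
               latest = self._snapshots.latest_snapshot_id()
               while self.next_snapshot_id <= latest:
                   snapshot = self._snapshots.snapshot(self.next_snapshot_id)
                   # APPEND snapshots yield a plan; COMPACT/OVERWRITE are skipped.
                   if self._scanner.should_scan(snapshot):
                       yield self._scanner.scan(snapshot)
                   self.next_snapshot_id += 1
               await asyncio.sleep(self._poll_interval)
   ```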
   
   ### Changelog Producer Modes
   
   | Mode | Scanner | Data Source |
   |------|---------|-------------|
   | `none` (default) | `DeltaFollowUpScanner` | `delta_manifest_list` |
   | `input` | `ChangelogFollowUpScanner` | `changelog_manifest_list` |
   | `full-compaction` | `ChangelogFollowUpScanner` | `changelog_manifest_list` |
   | `lookup` | `ChangelogFollowUpScanner` | `changelog_manifest_list` |
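
   Scanner selection would then be a straightforward lookup on the table's `changelog-producer` option, roughly like this (the scanner classes are the ones from the Key Classes table, defined elsewhere):

   ```python
   # Hypothetical factory mapping the changelog-producer option to a scanner.
   def create_follow_up_scanner(options: dict) -> "FollowUpScanner":
       producer = options.get("changelog-producer", "none")
       if producer == "none":
           # No changelog files: stream the delta manifest list of APPEND snapshots.
           return DeltaFollowUpScanner()
       # input / full-compaction / lookup all write changelog manifests.
       return ChangelogFollowUpScanner()
   ```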
   
   ### Performance Optimizations 
   
   These could be added in follow-up work, but we've already seen this kind of caching benefit us a lot on the Flink side when reading from S3 (a rough sketch of the snapshot/manifest caches follows the list):
   
   1. **Snapshot Caching**: LRU cache for snapshot metadata
   2. **Manifest Caching**: LRU cache for manifest file contents
   3. **Batch Lookahead**: Fetch multiple snapshots in parallel to skip COMPACT 
commits efficiently
   4. **Prefetching**: Background thread prefetches next snapshot while current 
is processed
   5. **Diff-based Catch-up**: When more than ~10 snapshots behind, use `IncrementalDiffScanner` to compute a file diff instead of reading N delta_manifest_lists
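
   As a sketch of (1) and (2), the caches could be plain `cachetools.LRUCache` instances keyed by snapshot id and manifest path. Cache sizes and the `read_snapshot_file` / `read_manifest_file` helpers below are placeholders, not proposed defaults:

   ```python
   # Minimal snapshot/manifest caching sketch using cachetools.
   from cachetools import LRUCache


   class CachingSnapshotReader:
       def __init__(self, file_io, table_path: str):
           self._file_io = file_io
           self._table_path = table_path
           self._snapshot_cache = LRUCache(maxsize=256)    # snapshot id -> metadata
           self._manifest_cache = LRUCache(maxsize=1024)   # manifest path -> entries

       def snapshot(self, snapshot_id: int):
           if snapshot_id not in self._snapshot_cache:
               # Snapshots are immutable once written, so caching is safe.
               self._snapshot_cache[snapshot_id] = read_snapshot_file(
                   self._file_io, self._table_path, snapshot_id
               )
           return self._snapshot_cache[snapshot_id]

       def manifest(self, manifest_path: str):
           if manifest_path not in self._manifest_cache:
               self._manifest_cache[manifest_path] = read_manifest_file(
                   self._file_io, manifest_path
               )
           return self._manifest_cache[manifest_path]
   ```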
   
   ### Dependencies
   
   - `cachetools` for LRU caching (already used by paimon-python)
   - No additional dependencies required
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit a PR?
   
   - [x] I'm willing to submit a PR!

