Hi Giannis,

Thanks for the thoughtful reply. A few clarifications.

On Option B and log_offset: Option B keeps the same pinned snapshot across all batches, so the log_offset guarantee is identical to Option A.
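To make the Option B access pattern concrete, here is a rough sketch in plain Java. It uses a `TreeMap` as a stand-in for a RocksDB snapshot, and the names (`SnapshotPagedScan`, `scanAll`) are purely illustrative, not proposed Fluss API: the snapshot is taken once at scan open, each batch opens a short-lived view, seeks past the client-held cursor, reads one batch, and discards the view, so nothing beyond the snapshot itself stays pinned between RPCs.

```java
import java.util.*;

// Stand-in for the Option B pattern: a snapshot pinned once at scan open,
// plus a short-lived "iterator" per batch. Each batch seeks strictly past
// the client-held cursor, reads up to batchSize entries, and is discarded.
public class SnapshotPagedScan {
    public static List<String> scanAll(NavigableMap<String, String> live, int batchSize) {
        // Take the snapshot once; later writes to `live` are not visible.
        NavigableMap<String, String> snapshot = new TreeMap<>(live);
        List<String> out = new ArrayList<>();
        String cursor = null; // client-held resume position, echoed per request
        while (true) {
            // Per-batch "seek": start strictly after the cursor key.
            NavigableMap<String, String> view =
                (cursor == null) ? snapshot : snapshot.tailMap(cursor, false);
            int n = 0;
            String last = null;
            for (Map.Entry<String, String> e : view.entrySet()) {
                out.add(e.getKey());
                last = e.getKey();
                if (++n == batchSize) break;
            }
            if (n < batchSize) return out; // snapshot exhausted
            cursor = last;
        }
    }

    public static void main(String[] args) {
        NavigableMap<String, String> db = new TreeMap<>();
        for (int i = 0; i < 10; i++) db.put(String.format("key_%04d", i), "v");
        List<String> keys = scanAll(db, 3);
        System.out.println(keys.size());  // 10
        System.out.println(keys.get(0));  // key_0000
    }
}
```

The per-batch seek in the real thing would be a RocksDB `Seek` inside the pinned snapshot; the point of the sketch is only that the cursor lives on the client, so a retry of any batch is a plain re-seek.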
On FetchLog: FetchLog is stateless on the tablet server. The client holds the offset and sends it with each request. There is no server-side session between RPCs. The ScannerManager would be the first per-client session state on a tablet server.

On the iterator being "slightly fewer bytes per session": I ran a test against Fluss's default RocksDB config (64 MB write buffer) with separate DB instances and identical workloads. A held iterator pinned 23 MB of memtable memory (rocksdb.size-all-mem-tables), compared to 2 KB with a snapshot only, because the iterator pins the physical memtable even after flush. This is consistent with what the RocksDB wiki documents, and with its recommendation to keep iterators short-lived. The seek overhead for Option B was 29 µs per batch, under 1% of the time spent reading a 4 MB batch.

I'm not suggesting this blocks the vote; let's proceed with these performance concerns flagged.

KR,
Anton

On Fri, 27 Mar 2026 at 08:33, Lorenzo Affetti <[email protected]> wrote:

> @jark thanks for clarifying.
>
> 1) Yeah, I see your point.
> The first "stateless" design I proposed misses the consistency aspect.
> The second "design" for saving snapshots is quite similar to this one,
> which is paginated.
> Thanks for catching the similarity and clarifying.
>
> 2) Thanks for highlighting the precise design choice to favor pull vs push.
> That definitely makes sense.
>
> Sorted out, thank you!
>
> On Fri, Mar 27, 2026 at 3:49 AM Anton Borisov <[email protected]>
> wrote:
>
>> Hi Giannis,
>> Thanks for working on this; my concern is the continuation model.
>>
>> The current design makes the server own both consistency and progress by
>> keeping a live iterator session across RPCs. That is efficient, but it is
>> also what makes retries awkward and assumes single-threaded iterator
>> access, without enforcing it on the server side.
>> More generally, once we keep server-side scan state across calls, we take
>> on the usual session lifecycle machinery around expiry, fencing, and
>> cleanup.
>>
>> There's a middle ground worth evaluating explicitly: keep snapshot
>> isolation if we need it, but return an opaque resume token, similar to
>> Cassandra's paging state, and have the client send it back unchanged on
>> the next request. That keeps the consistency guarantee, but moves scan
>> progress into the protocol instead of a live iterator session on the
>> server. The tradeoff is an extra seek per batch; in return, retries
>> become much easier to reason about and the server has less mutable state
>> to manage between calls.
>>
>> This feels close to what Lorenzo was asking to see evaluated. Also,
>> continuing Jark's reasoning about `call_seq_id` being simpler than
>> key-based cursors: I think the comparison shifts when the token is opaque
>> server-produced bytes rather than a client-constructed key, so the client
>> never serializes anything; it just echoes the token back.
>>
>> It would also be good if the FIP compared:
>>
>> - live iterator session (HBase approach described in the original FIP)
>> - snapshot-only with continuation token (Cassandra style)
>> - opaque stateless cursor (no snapshot isolation at all)
>>
>> and explained why the extra machinery is justified for the use cases
>> we're targeting.
>>
>> KR,
>> Anton
>>
>> On Thu, 26 Mar 2026 at 17:11, Giannis Polyzos <[email protected]> wrote:
>>
>>> Hi Jark,
>>>
>>> Thank you for your input and for providing more context. I updated the
>>> FIP based on your suggestions.
>>>
>>> I was debating the API design, and my new design didn't take into
>>> account the old design with the BatchScanner. I will follow the old
>>> design since it's more consistent with what we have now.
>>> >>> Best, >>> Giannis >>> >>> On Thu, Mar 26, 2026 at 5:50 PM Jark Wu <[email protected]> wrote: >>> >>> > Hi Lorenzo, >>> > >>> > I can help clarify some of the design decisions, as I was involved in >>> > the initial design of this FIP. >>> > >>> > (1) paginated requests with snapshot ID >>> > >>> > The current design of this FIP is exactly a paginated request with a >>> > snapshot ID. When a new scan request is initiated, the server takes a >>> > snapshot, and all subsequent streaming scan requests retrieve >>> > paginated data from that specific snapshot. >>> > >>> > The only distinction lies in the pagination mechanism. Your proposal >>> > uses keys, whereas the FIP utilizes `call_seq_id` (effectively a page >>> > ID). Adopting `call_seq_id` is a simpler and more efficient >>> > implementation. Using keys would be heavier, as they may require >>> > serializing multiple fields, significantly increasing the request >>> > payload size. >>> > >>> > >>> > (2) true streaming RPC >>> > IIUC, your design proposes a push-based model, whereas this FIP adopts >>> > a pull-based model, consistent with the architecture of Fluss and >>> > Kafka (e.g., in `FetchLog`). We chose the pull model for two key >>> > reasons: >>> > >>> > * Flow Control: In a push model, the server dictates the rate, >>> > risking consumer overload and causing backpressure that can exhaust >>> > server threads. The pull model allows clients to fetch data only when >>> > they are ready, naturally preventing congestion. >>> > * Replayability: Handling network failures is simpler with pulling. >>> > Clients can easily re-fetch missing data using a `page_id` or log >>> > offset. In contrast, managing state, offsets, and varying consumer >>> > speeds for thousands of clients in a push model adds significant >>> > complexity, effectively turning the server into a stateful dispatcher. 
>>> >
>>> > Best,
>>> > Jark
>>> >
>>> > On Fri, 27 Mar 2026 at 00:28, Jark Wu <[email protected]> wrote:
>>> > >
>>> > > Hi Giannis,
>>> > >
>>> > > Thanks for driving this. I only have some questions on the FIP.
>>> > >
>>> > > (1) Scan API
>>> > > The FIP says "Remove the LIMIT requirement from
>>> > > TableScan.createBatchScanner(TableBucket) for PK tables", which reuses
>>> > > the existing "Table.newScan.createBatchScan(..)" API. But the Public
>>> > > Interface section introduces a new API "Table.newKvScan.execute()".
>>> > >
>>> > > I believe extending the existing "Table.newScan().createBatchScan()"
>>> > > to support non-LIMIT queries would provide a more unified approach.
>>> > > Since this API already handles KV scan functionality, introducing a
>>> > > separate KvScan interface could confuse users regarding which method
>>> > > to choose.
>>> > >
>>> > > (2) The protobuf definition currently uses `uint32` and `uint64` for
>>> > > several fields. Since Java lacks native support for unsigned integers,
>>> > > this forces us to rely on `Integer.toUnsignedLong()` wrappers
>>> > > throughout the codebase, adding unnecessary complexity. Switching to
>>> > > `int32` and `int64` would significantly simplify the logic and improve
>>> > > code readability.
>>> > >
>>> > > (3) Configuration.
>>> > > I have a minor suggestion regarding the configuration naming. Since
>>> > > this scanner is designed exclusively for KV tables, and our existing
>>> > > server-side KV configurations consistently use the kv.* prefix, I
>>> > > recommend changing server.scanner.* to kv.scanner.*. This adjustment
>>> > > will ensure better consistency with our current naming conventions.
>>> > >
>>> > > Best,
>>> > > Jark
>>> > >
>>> > > On Thu, 26 Mar 2026 at 16:44, Lorenzo Affetti via dev
>>> > > <[email protected]> wrote:
>>> > > >
>>> > > > Ehy Giannis! Sorry for the late reply.
>>> > > > >>> > > > *Is the ScannerManager / session state actually required?* >>> > > > >>> > > > *The key thing with this design is snapshot isolation across the >>> entire >>> > > > bucket scan.* >>> > > > *A ScannerContext takes a RocksDB snapshot and a >>> ResourceGuard.Lease, >>> > and >>> > > > all subsequent batches read from that snapshot.* >>> > > > >>> > > > I understand, however, with the same approach, I see easier ways to >>> > > > implement this. For example, make the client reply not only with a >>> > > > "last_key" but also with a "snapshot_id". >>> > > > The server would need to store the open snapshots (this is the >>> stateful >>> > > > part) to re-use them upon request. >>> > > > You can drop the iterator, but hold a pointer to the snapshot; this >>> > should >>> > > > tell RocksDB not to compact it out as the snapshot is not released. >>> > > > >>> > > > This approach would drastically reduce server-side complexity and >>> hold >>> > > > significantly less state. >>> > > > I think this should be worth exploring, and if really impossible >>> added >>> > as a >>> > > > rejected alternative. >>> > > > >>> > > > >>> > > > *Could it be a single streaming RPC?* >>> > > > >>> > > > *Since the Fluss RPC layer is a custom Netty-based request/response >>> > > > protocol, not gRPC, introducing a server-streaming primitive would >>> > require >>> > > > many changes and effort.* >>> > > > >>> > > > I understand, however, missing such a primitive still impacts >>> Fluss, >>> > and it >>> > > > seems to me that we are still paying here the cost of implementing >>> a >>> > custom >>> > > > streaming RPC for a special case. >>> > > > It may be worth investing in the transport primitive once rather >>> than >>> > > > duplicating this pattern every time a streaming operation is >>> needed. >>> > > > One example is log tailing, from polling to true streaming RPC. >>> Another >>> > > > example is changelog streaming. 
>>> > > > >>> > > > What if we opt for streaming RPC FIP before this one so that this >>> can >>> > > > directly benefit from that? >>> > > > >>> > > > >>> > > > On Fri, Mar 13, 2026 at 7:12 PM Giannis Polyzos < >>> [email protected] >>> > > >>> > > > wrote: >>> > > > >>> > > > > Hi Lorenzo, >>> > > > > >>> > > > > And thank you for your comments. Lots of cool things to digest >>> here, >>> > and I >>> > > > > would be eager to hear more people's thoughts. >>> > > > > >>> > > > > Regarding the API >>> > > > > table.newScan() >>> > > > > .withSource(ScanSource.LIVE_KV) // vs SNAPSHOT, LOG >>> > > > > .execute() >>> > > > > >>> > > > > Currently, the scan method is based on the table creation, so I >>> want >>> > to >>> > > > > differentiate between a scan (streaming consumption of a log) and >>> > KvScan (a >>> > > > > bounded rocksdb full table query). Then there is also some >>> > functionality >>> > > > > that we might not want to expose to the user, for example about >>> the >>> > > > > snapshot. This should be internal functionality. >>> > > > > >>> > > > > *Is the ScannerManager / session state actually required?* >>> > > > > Pagination can definitely be a simple approach. However, the key >>> > thing >>> > > > > with this design is snapshot isolation across the entire bucket >>> > scan. A >>> > > > > ScannerContext takes a RocksDB snapshot and a >>> ResourceGuard.Lease, >>> > and all >>> > > > > subsequent batches read from that snapshot. This means: >>> > > > > 1. Every key that existed at scan-open time is returned exactly >>> once. >>> > > > > 2. No key is duplicated because it was compacted and rewritten >>> > between two >>> > > > > requests. >>> > > > > 3. No key is skipped because it was deleted and reinserted >>> between >>> > two >>> > > > > requests. >>> > > > > With cursor pagination, you lose that guarantee. 
>>> > > > > >>> > > > > >>> > > > > *Could it be a single streaming RPC?* >>> > > > > This would be a great approach, but my understanding is that we >>> can't >>> > > > > support this. Since the Fluss RPC layer is a custom Netty-based >>> > > > > request/response protocol, not gRPC, introducing a >>> server-streaming >>> > > > > primitive would require many changes and effort. Let me know if >>> I'm >>> > > > > mistaken. >>> > > > > >>> > > > > Best, >>> > > > > Giannis >>> > > > > >>> > > > > >>> > > > > On Fri, Mar 13, 2026 at 2:34 PM Lorenzo Affetti via dev < >>> > > > > [email protected]> wrote: >>> > > > > >>> > > > >> Hello Giannis! >>> > > > >> Very instructive read, I went through that and I was actually >>> > astonished >>> > > > >> that this was not the "default" mode that Fluss operates on PK >>> > tables when >>> > > > >> a snapshot is requested. >>> > > > >> Very good that we have this initiative. >>> > > > >> >>> > > > >> *Is a new scanner type explicitly required?* >>> > > > >> >>> > > > >> I don't think it needs to be addressed now, but a good thing to >>> > keep in >>> > > > >> mind after this FIP gets implemented. >>> > > > >> >>> > > > >> >>> > > > >> The cleaner design would arguably be to unify these under a >>> single >>> > > > >> abstraction where the scan source (live RocksDB vs. remote >>> > snapshot) and >>> > > > >> chunking behavior are implementation details, not separate >>> scanner >>> > > > >> classes. >>> > > > >> The KvScan interface being introduced (table.newKvScan()) is a >>> step >>> > in >>> > > > >> that >>> > > > >> direction but it still sits alongside rather than replacing the >>> > existing >>> > > > >> scan path. >>> > > > >> >>> > > > >> A truly unified design might look like: >>> > > > >> >>> > > > >> table.newScan() >>> > > > >> .withSource(ScanSource.LIVE_KV) // vs SNAPSHOT, LOG >>> > > > >> .execute() >>> > > > >> >>> > > > >> I hope we keep this in mind for future, relevant situations. 
>>> > > > >> >>> > > > >> --------- >>> > > > >> >>> > > > >> *Is the ScannerManager / session state actually required?* >>> > > > >> >>> > > > >> The session exists because of the chunked, multi-RPC design -- >>> the >>> > server >>> > > > >> needs to remember where the iterator is between calls. >>> > > > >> But is that statefulness necessary at all? >>> > > > >> >>> > > > >> The stateless alternative: cursor-based pagination >>> > > > >> >>> > > > >> Instead of keeping a server-side iterator session, you could do >>> > stateless >>> > > > >> resumption using a last-seen key as a cursor: >>> > > > >> >>> > > > >> Request 1: scan from beginning -> returns rows + >>> > last_key="key_0500" >>> > > > >> Request 2: scan from "key_0500" -> returns rows + >>> > last_key="key_1000" >>> > > > >> Request 3: scan from "key_1000" -> ... >>> > > > >> >>> > > > >> Each request is fully independent. The server opens a fresh >>> RocksDB >>> > > > >> iterator, seeks to the cursor key, reads a batch, and closes >>> it. No >>> > > > >> session, no TTL, no ScannerManager. >>> > > > >> >>> > > > >> Advantages: >>> > > > >> >>> > > > >> - Massively simpler server side -- no session lifecycle, no >>> TTL >>> > reaper, >>> > > > >> no leadership fencing complexity >>> > > > >> - Naturally resilient to server restarts and leadership >>> changes >>> > -- >>> > > > >> client just retries from the last cursor >>> > > > >> - No SCANNER_EXPIRED / UNKNOWN_SCANNER_ID error classes >>> needed >>> > > > >> >>> > > > >> Tradeoffs: >>> > > > >> >>> > > > >> - *Consistency weakens slightly* -- each batch opens a fresh >>> > RocksDB >>> > > > >> snapshot, so you might see a key move between batches if it >>> was >>> > updated >>> > > > >> between requests. With the session approach, the entire >>> bucket >>> > scan is >>> > > > >> under one snapshot. >>> > > > >> - Seek cost -- RocksDB iterator seeks are not free, >>> especially >>> > across >>> > > > >> many SST files. 
For very large tables with many chunks this >>> adds >>> > up, >>> > > > >> though >>> > > > >> for the small key spaces FIP-17 targets it's likely >>> negligible. >>> > > > >> - Cursor encoding needs care for binary keys. >>> > > > >> >>> > > > >> Could it be a single streaming RPC? >>> > > > >> >>> > > > >> Rather than a request/response sequence with session state, >>> you'd >>> > have a >>> > > > >> single server-streaming RPC: >>> > > > >> >>> > > > >> client -> ScanKvRequest (open) >>> > > > >> server -> stream of ScanKvResponse chunks >>> > > > >> server -> closes stream when exhausted >>> > > > >> >>> > > > >> If this is possible, the entire ScannerManager session >>> complexity >>> > > > >> evaporates -- the iterator just lives for the duration of the >>> > stream, held >>> > > > >> naturally by the connection. >>> > > > >> >>> > > > >> >>> > > > >> >>> > > > >> On Tue, Mar 10, 2026 at 9:59 AM Keith Lee < >>> > [email protected]> >>> > > > >> wrote: >>> > > > >> >>> > > > >> > Hello Giannis, >>> > > > >> > >>> > > > >> > Thank you for the update to the proposal! Quickly skimmed >>> through >>> > and I >>> > > > >> > like the updates that you’ve made! Questions / comments below: >>> > > > >> > >>> > > > >> > 1. You mentioned an extra section on heartbeat on the FIP, >>> but I >>> > do not >>> > > > >> see >>> > > > >> > heartbeat being mentioned on the latest version of the FIP? >>> +1 >>> > If the >>> > > > >> > proposal is updated to rely solely on last scan for TTL and >>> remove >>> > > > >> > heartbeat, it’s a great change. If I remember correctly, the >>> > previous >>> > > > >> > version was to use heartbeat as keepalive, there is a risk of >>> > unclosed, >>> > > > >> > idle scanner holding resources on server side indefinitely and >>> > causing >>> > > > >> > leak. >>> > > > >> > >>> > > > >> > 2. On continuation request, should we check lastAccessTimeMs >>> and >>> > reject >>> > > > >> if >>> > > > >> > elapsed time is larger than TTL? 
Otherwise sessions can idle >>> > between 60 >>> > > > >> and >>> > > > >> > 90 (TTL+ reaper interval). This might be exacerbated if user >>> > configure >>> > > > >> > particularly high TTL and reaper interval. >>> > > > >> > >>> > > > >> > 3. On SCANNER_EXPIRED, is it necessary to have a separate >>> error >>> > for >>> > > > >> expired >>> > > > >> > scanner? We can have a single UNKNOWN_OR_EXPIRED_SCANNER >>> (renaming >>> > > > >> > UNKNOWN_SCANNER_ID). These are both terminal and non >>> retriable, I >>> > > > >> imagine >>> > > > >> > that handling it from client side would not differ. It’s also >>> a >>> > small >>> > > > >> > simplification to the implementation. >>> > > > >> > >>> > > > >> > 4. On pipelining. If the user queries for top-n every 10 >>> seconds >>> > to >>> > > > >> update >>> > > > >> > leaderboard, would pipelining cause higher unnecessary >>> traffic? >>> > E.g. >>> > > > >> they >>> > > > >> > only care about n records but pipelining automatically fetch >>> up >>> > to 8mb. >>> > > > >> > >>> > > > >> > 5. Also on pipelining, while it seems that we’re keeping Flink >>> > connector >>> > > > >> > out of scope, IIRC Flink split fetcher also pipelines. If we >>> use >>> > this to >>> > > > >> > update Flink connector, we’d have higher amount buffered in >>> > pipeline. >>> > > > >> > >>> > > > >> > 6. On expiration interval, should we hide that configuration >>> and >>> > choose >>> > > > >> to >>> > > > >> > expose it if there’s a strong need for it? It’s fewer config >>> for >>> > users >>> > > > >> to >>> > > > >> > reason about and 30s expiration sounds like a good starting >>> point. 
>>> > > >> >
>>> > > >> > Best regards
>>> > > >> >
>>> > > >> > Keith Lee
>>> > > >> >
>>> > > >> > On Tue, 10 Mar 2026 at 08:49, Giannis Polyzos <[email protected]> wrote:
>>> > > >> >
>>> > > >> > > Hi devs,
>>> > > >> > > Let me know if there are any comments here, otherwise I would like to
>>> > > >> > > start a vote thread.
>>> > > >> > >
>>> > > >> > > Best,
>>> > > >> > > Giannis
>>> > > >> > >
>>> > > >> > > On Thu, 5 Mar 2026 at 3:38 PM, Giannis Polyzos <[email protected]> wrote:
>>> > > >> > >
>>> > > >> > > > Hi devs,
>>> > > >> > > >
>>> > > >> > > > After a long time, I would like to reinitiate the discussions on FIP-17.
>>> > > >> > > >
>>> > > >> > > > I made quite a few updates on the FIP, which you can find here:
>>> > > >> > > > https://cwiki.apache.org/confluence/display/FLUSS/FIP-17+Primary+Key+Table+Snapshot+Queries
>>> > > >> > > > and updated the title to better reflect the goal. Let me know if it
>>> > > >> > > > makes sense.
>>> > > >> > > >
>>> > > >> > > > Moreover, at the end of the proposal you will find a section, *extras*,
>>> > > >> > > > which has a suggestion for a heartbeat mechanism. However, during my
>>> > > >> > > > PoC I found that this is not really needed, but I would like your
>>> > > >> > > > thoughts and feedback first.
>>> > > >> > > >
>>> > > >> > > > Best,
>>> > > >> > > > Giannis
>>> > > >> > > >
>>> > > >> > > > On Wed, Oct 29, 2025 at 2:45 PM Giannis Polyzos <[email protected]> wrote:
>>> > > >> > > >
>>> > > >> > > >> Yang, thank you for your thoughtful comments.
>>> > > > >> > > >> >>> > > > >> > > >> Indeed, we are streaming the results to the client; >>> however, >>> > it's >>> > > > >> > still >>> > > > >> > > a >>> > > > >> > > >> batch operation. We could use "KV store (or PK table) >>> > Snapshot >>> > > > >> Query" >>> > > > >> > > or >>> > > > >> > > >> something similar, since we are querying a RocksDB >>> snapshot. >>> > WDYT? >>> > > > >> > > >> The newly introduced KvBatchScanner should be able to be >>> > reused >>> > > > >> from >>> > > > >> > > both >>> > > > >> > > >> the client itself - assume a scenario that I want to >>> > periodically >>> > > > >> > query >>> > > > >> > > the >>> > > > >> > > >> full RocksDB KV store to power real-time dashboards - as >>> > well as >>> > > > >> Flink >>> > > > >> > > >> (with more engines to follow later). >>> > > > >> > > >> It issues requests to fetch the results per bucket and >>> > transmit >>> > > > >> them >>> > > > >> > > back >>> > > > >> > > >> to the client. >>> > > > >> > > >> >>> > > > >> > > >> > Could you elaborate on why the new KvBatchScanner isn't >>> > reusable? >>> > > > >> > > >> I think the reasoning here is that reach requests create >>> a >>> > new >>> > > > >> > > >> KvBatchScanner, which polls the records and then closes >>> > > > >> automatically. >>> > > > >> > > Any >>> > > > >> > > >> reason you see this as a limitation, and we should >>> consider >>> > making >>> > > > >> it >>> > > > >> > > >> reusable? >>> > > > >> > > >> >>> > > > >> > > >> The design aims mainly for the Fluss client API.. Should >>> we >>> > add an >>> > > > >> > > >> integration design with Flink? Wang Cheng, WDYT? 
>>> > > > >> > > >> >>> > > > >> > > >> Best, >>> > > > >> > > >> Giannis >>> > > > >> > > >> >>> > > > >> > > >> >>> > > > >> > > >> >>> > > > >> > > >> On Tue, Oct 28, 2025 at 4:44 AM Yang Wang < >>> > > > >> [email protected]> >>> > > > >> > > >> wrote: >>> > > > >> > > >> >>> > > > >> > > >>> Hi Cheng, >>> > > > >> > > >>> >>> > > > >> > > >>> Thank you for driving this excellent work! Your FIP >>> > document shows >>> > > > >> > > great >>> > > > >> > > >>> thought and initiative. I've gone through it and have >>> some >>> > > > >> questions >>> > > > >> > > and >>> > > > >> > > >>> suggestions that I hope can further enhance this >>> valuable >>> > > > >> > contribution. >>> > > > >> > > >>> >>> > > > >> > > >>> 1、Regarding the Title, I believe we could consider >>> changing >>> > it to >>> > > > >> > > >>> "Support >>> > > > >> > > >>> full scan in batch mode for PrimaryKey Table". The term >>> > > > >> "Streaming" >>> > > > >> > > might >>> > > > >> > > >>> cause confusion with Flink's streaming/batch modes, and >>> this >>> > > > >> revised >>> > > > >> > > >>> title >>> > > > >> > > >>> would provide better clarity. >>> > > > >> > > >>> >>> > > > >> > > >>> 2、In the Motivation section, I think there are two >>> > particularly >>> > > > >> > > important >>> > > > >> > > >>> benefits worth highlighting: (1) OLAP engines will be >>> able >>> > to >>> > > > >> perform >>> > > > >> > > >>> full >>> > > > >> > > >>> snapshot reads on Fluss primary-key tables. (2) This >>> > approach can >>> > > > >> > > replace >>> > > > >> > > >>> the current KvSnapshotBatchScanner, allowing the Fluss >>> > client to >>> > > > >> > > >>> eliminate >>> > > > >> > > >>> its RocksDB dependency entirely. 
>>> > > > >> > > >>> >>> > > > >> > > >>> 3、Concerning the Proposed Changes, could you clarify >>> when >>> > exactly >>> > > > >> the >>> > > > >> > > >>> client creates a KV snapshot on the server side, and >>> when >>> > we send >>> > > > >> the >>> > > > >> > > >>> bucket_scan_req? >>> > > > >> > > >>> >>> > > > >> > > >>> Let me share my thinking on this: When Flink attempts to >>> > read >>> > > > >> from a >>> > > > >> > > >>> PrimaryKey table, the FlinkSourceEnumerator in the >>> JobMaster >>> > > > >> > generates >>> > > > >> > > >>> HybridSnapshotLogSplit and dispatches them to >>> SplitReaders >>> > > > >> running on >>> > > > >> > > the >>> > > > >> > > >>> TaskManager. The JobMaster doesn't actually read data—it >>> > merely >>> > > > >> > defines >>> > > > >> > > >>> and >>> > > > >> > > >>> manages the splits. Therefore, we need to ensure the JM >>> has >>> > > > >> > sufficient >>> > > > >> > > >>> information to determine the boundary of the KV snapshot >>> > and the >>> > > > >> > > >>> startOffset of the LogSplit. >>> > > > >> > > >>> >>> > > > >> > > >>> I suggest we explicitly create a snapshot (or as you've >>> > termed >>> > > > >> it, a >>> > > > >> > > >>> new_scan_request) on the server side. This way, the >>> > > > >> > > FlinkSourceEnumerator >>> > > > >> > > >>> can use it to define a HybridSnapshotLogSplit, and the >>> > > > >> SplitReaders >>> > > > >> > can >>> > > > >> > > >>> perform pollBatch operations on this snapshot (which >>> would >>> > be >>> > > > >> bound >>> > > > >> > to >>> > > > >> > > >>> the >>> > > > >> > > >>> specified scanner_id). >>> > > > >> > > >>> >>> > > > >> > > >>> 4、 Could you elaborate on why the new KvBatchScanner >>> isn't >>> > > > >> reusable? >>> > > > >> > > >>> What's >>> > > > >> > > >>> the reasoning behind this limitation? (I believe RocksDB >>> > > > >> iterators do >>> > > > >> > > >>> support the seekToFirst operation.) 
>>> > > >>> If a TaskManager fails over before a checkpoint, rescanning an existing
>>> > > >>> snapshot seems like a natural requirement.
>>> > > >>>
>>> > > >>> 5、I think it would be beneficial to include some detailed design
>>> > > >>> aspects regarding Flink's integration with the new BatchScanner.
>>> > > >>>
>>> > > >>> Overall, this is a solid foundation for an important enhancement.
>>> > > >>> Looking forward to discussing these points further!
>>> > > >>>
>>> > > >>> Best regards, Yang
>>> > > >>>
>>> > > >>> On Wed, 22 Oct 2025 at 17:09, Wang Cheng <[email protected]> wrote:
>>> > > >>>
>>> > > >>> > Hi all,
>>> > > >>> >
>>> > > >>> > As of v0.8, Fluss only supports KV snapshot batch scan and limit KV
>>> > > >>> > batch scan. The former approach is constrained by snapshot
>>> > > >>> > availability and remote storage performance, while the latter one is
>>> > > >>> > only applicable to queries with a LIMIT clause and risks high memory
>>> > > >>> > pressure.
>>> > > >>> >
>>> > > >>> > To address those limitations, Giannis Polyzos and I are writing to
>>> > > >>> > propose FIP-17: a general-purpose streaming KV scan for Fluss [1].
>>> > > >>> >
>>> > > >>> > Any feedback and suggestions on this proposal are welcome!
>>> > > > >> > > >>> > >>> > > > >> > > >>> > >>> > > > >> > > >>> > [1]: >>> > > > >> > > >>> > >>> > > > >> > > >>> >>> > > > >> > > >>> > > > >> > >>> > > > >> >>> > >>> https://cwiki.apache.org/confluence/display/FLUSS/FIP-17+Streaming+KV+Scan+RPC >>> > > > >> > > >>> > >>> > > > >> > > >>> > Regards, >>> > > > >> > > >>> > Cheng >>> > > > >> > > >>> > >>> > > > >> > > >>> > >>> > > > >> > > >>> > >>> > > > >> > > >>> > >>> > > > >> > > >>> >>> > > > >> > > >> >>> > > > >> > > >>> > > > >> > >>> > > > >> >>> > > > >> >>> > > > >> -- >>> > > > >> Lorenzo Affetti >>> > > > >> Senior Software Engineer @ Flink Team >>> > > > >> Ververica <http://www.ververica.com> >>> > > > >> >>> > > > > >>> > > > >>> > > > -- >>> > > > Lorenzo Affetti >>> > > > Senior Software Engineer @ Flink Team >>> > > > Ververica <http://www.ververica.com> >>> > >>> >> > > -- > Lorenzo Affetti > Senior Software Engineer @ Flink Team > Ververica <http://www.ververica.com> >
