Hi Yang,

Thanks for the thoughtful feedback. Your analysis significantly improves the
stability of this feature. I will address each question in turn:

Re question 1:

Good catch! I encountered this issue during testing as well. The key problem
is that fetching the latest kv snapshot and registering the kv snapshot
consumer are two separate, non-atomic operations, so a snapshot can be deleted
in the gap between them. This was considered during the design phase, but to
avoid altering the semantics of Admin#getLatestKvSnapshots, we chose to keep
registering the kv snapshot consumer as a separate API.

The solution is to add a retry mechanism: the buckets in the
failedTableBucketSet of the RegisterKvSnapshotResult returned by
registerKvSnapshotConsumer must be retried (get latest snapshot -> register
kv snapshot consumer) until all buckets are successfully registered.
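
A minimal sketch of that retry loop, for illustration only. The SnapshotAdmin
interface below is a hypothetical stand-in, not the real Fluss Admin API: its
method names mirror the calls mentioned above, but the signatures, the use of
plain integers as bucket ids, and the maxAttempts bound are all assumptions.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class KvSnapshotRegistrationRetry {

    /** Hypothetical stand-in for the two admin calls; NOT the real Fluss API. */
    interface SnapshotAdmin {
        /** Returns bucket id -> latest snapshot id for the requested buckets. */
        Map<Integer, Long> getLatestKvSnapshots(Set<Integer> buckets);

        /** Registers the consumer; returns the buckets whose registration failed. */
        Set<Integer> registerKvSnapshotConsumer(String consumerId, Map<Integer, Long> snapshots);
    }

    /** Repeats (get latest snapshot -> register) for the failed buckets only. */
    static Map<Integer, Long> registerAll(
            SnapshotAdmin admin, String consumerId, Set<Integer> buckets, int maxAttempts) {
        Map<Integer, Long> registered = new HashMap<>();
        Set<Integer> pending = new HashSet<>(buckets);
        // maxAttempts is an assumption added here so the loop cannot spin forever.
        for (int attempt = 1; attempt <= maxAttempts && !pending.isEmpty(); attempt++) {
            Map<Integer, Long> latest = admin.getLatestKvSnapshots(pending);
            Set<Integer> failed = admin.registerKvSnapshotConsumer(consumerId, latest);
            for (Map.Entry<Integer, Long> e : latest.entrySet()) {
                if (!failed.contains(e.getKey())) {
                    registered.put(e.getKey(), e.getValue());
                }
            }
            pending = new HashSet<>(failed);
        }
        if (!pending.isEmpty()) {
            throw new IllegalStateException("Buckets still unregistered: " + pending);
        }
        return registered;
    }

    /** In-memory fake: bucket 1's snapshot is superseded between fetch and register. */
    static class RacyAdmin implements SnapshotAdmin {
        private final Map<Integer, Long> latest = new HashMap<>();
        private boolean raced = false;
        int registerCalls = 0;

        RacyAdmin() {
            latest.put(0, 10L);
            latest.put(1, 20L);
        }

        @Override
        public Map<Integer, Long> getLatestKvSnapshots(Set<Integer> buckets) {
            Map<Integer, Long> out = new HashMap<>();
            for (Integer b : buckets) {
                out.put(b, latest.get(b));
            }
            return out;
        }

        @Override
        public Set<Integer> registerKvSnapshotConsumer(String consumerId, Map<Integer, Long> snapshots) {
            registerCalls++;
            if (!raced) {
                raced = true;
                latest.put(1, 21L); // simulate snapshot 20 being replaced and deleted
            }
            Set<Integer> failed = new HashSet<>();
            for (Map.Entry<Integer, Long> e : snapshots.entrySet()) {
                if (!e.getValue().equals(latest.get(e.getKey()))) {
                    failed.add(e.getKey());
                }
            }
            return failed;
        }
    }

    public static void main(String[] args) {
        RacyAdmin admin = new RacyAdmin();
        Map<Integer, Long> result =
                registerAll(admin, "job-1", new HashSet<>(Arrays.asList(0, 1)), 5);
        System.out.println(result + " after " + admin.registerCalls + " register calls");
    }
}
```

Only the failed buckets are fetched again on each round, so buckets that
registered successfully keep the snapshot id they were registered with.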

------
Re question 2:

It is indeed worth continuing to retain multiple snapshots per table bucket
to reduce the impact on stability. The configuration
'kv.snapshot.num-retained' will also be kept as is for now. However, there is
still a potential issue: excessive ZooKeeper metadata, which can be addressed
in a separate FIP or issue.

------
Re question 3:

We can treat the following scenarios as rare cases or misuse:

1. A job keeps failing over and is only restored after a day or longer.
2. Consuming a snapshot takes an exceptionally long time.
3. The same consumerId is used by multiple jobs to consume multiple tables.

These scenarios may lead to the snapshot being cleaned up before the user
consumes it. We can document such cases in the user guide, and users can
configure a larger expiration time to mitigate the problem.


------
Re question 4:

This question involves a trade-off between two strategies:

1. Expiration time based on the first registration: The consumer expires
sooner, which prevents snapshots from being retained for extended periods and
avoids metadata bloat.
2. Expiration time based on the latest registration: This minimizes the risk of
snapshots being cleaned up during consumption.
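
To make the difference concrete, here is a small sketch of the two strategies.
All names (ConsumerExpiration, Strategy, expiresAt, isExpired) are hypothetical
and exist only for illustration. The sketch assumes a single fixed TTL per
consumer and says nothing about how the server would actually track
registrations.

```java
import java.time.Duration;
import java.time.Instant;

class ConsumerExpiration {

    /** The two candidate strategies from the list above (hypothetical names). */
    enum Strategy { FROM_FIRST_REGISTRATION, FROM_LATEST_REGISTRATION }

    /** Expiration time = chosen base registration time + TTL. */
    static Instant expiresAt(Strategy strategy, Instant first, Instant latest, Duration ttl) {
        Instant base = (strategy == Strategy.FROM_FIRST_REGISTRATION) ? first : latest;
        return base.plus(ttl);
    }

    /** A consumer is expired once 'now' has reached its expiration time. */
    static boolean isExpired(
            Strategy strategy, Instant first, Instant latest, Duration ttl, Instant now) {
        return !now.isBefore(expiresAt(strategy, first, latest, ttl));
    }

    public static void main(String[] args) {
        Instant first = Instant.parse("2025-12-16T00:00:00Z");
        Instant latest = first.plus(Duration.ofHours(20)); // consumer re-registered 20h later
        Duration ttl = Duration.ofHours(24);
        Instant now = first.plus(Duration.ofHours(30));

        for (Strategy s : Strategy.values()) {
            System.out.println(s + " expired: " + isExpired(s, first, latest, ttl, now));
        }
    }
}
```

With a 24-hour TTL and a re-registration 20 hours after the first one, the
consumer is already expired at hour 30 under the first strategy but stays
alive until hour 44 under the second.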

I personally lean toward the first strategy, but the second is also valid.
 
Best regards,
Yunhong

On 2025/12/16 08:19:54 Yang Wang wrote:
> Hi Yunhong,
> 
> Thank you for driving this very useful initiative—it will significantly
> improve the stability of data consumption and the overall user experience.
> I generally agree with most aspects of the design, but I have a few minor
> questions:
> 
>    1. When Flink enumerates the snapshots it needs to subscribe to and
>    attempts to subscribe, what is the expected behavior if a snapshot gets
>    deleted just before the subscription request is sent? (This scenario might
>    occur in certain edge cases.)
>
>    2. Retaining only one snapshot by default could be risky. Currently,
>    neither the CoordinatorServer nor the TabletServer can fully guarantee that
>    snapshots registered in ZooKeeper are always reliable and valid (although
>    this is the design intention, there might still be undiscovered bugs).
>
>    3. I didn’t see any API for updating subscriptions. If a Flink job
>    experiences a prolonged failover for some reason, would the unread
>    snapshots expire as a result?
>
>    4. Related to the previous question: Is the TTL of a snapshot calculated
>    from the time it was registered, or from the last time a consumer updated
>    its subscription?
> 
> Best regards,
> Yang
> 
> yunhong Zheng <[email protected]> wrote on Thu, Dec 11, 2025 at 22:40:
> 
> > Hi all,
> >
> > Currently, for a Fluss PrimaryKey table, the number of kv snapshots
> > retained per bucket is controlled by the server
> > option `kv.snapshot.num-retained` (default value: 1). If this value is set
> > too small, kv snapshots that are being actively consumed may be deleted
> > while a consumer is still consuming them.
> >
> > This causes a Flink job that reads a PrimaryKey table to fail, and the job
> > cannot be restarted from its previous state.
> >
> > To avoid this, the Fluss server needs to be aware of which consumers
> > are actively consuming which kv snapshots, and must not delete
> > kv snapshots that are currently being consumed.
> >
> > So, I'd like to propose FIP-22: Support Kv Snapshot Consumer[1].
> >
> > Any feedback and suggestions on this proposal are welcome!
> >
> > [1]:
> >
> > https://cwiki.apache.org/confluence/display/FLUSS/FIP-22+Support+Kv+Snapshot+Consumer
> >
> > Regards,
> > Yunhong
> >
> 
