Qingsheng,

For the scenario described by Mason in the original email, I think it is safe to remove splits/topics upon recovery without worrying about data loss, since switching to a different set of topics is a conscious choice by the user.
I thought the problem was that KafkaSourceReader just restores the splits/partitions and reads from them without checking whether they are still valid (i.e., belong to the subscribed topics). I'm not sure whether this requires a change in the Kafka source checkpointing. Currently, I believe both the enumerator and the readers checkpoint their own states. If readers didn't checkpoint and always waited for the enumerator to assign splits/partitions upon recovery, this might be easier, since the filter/check could be done entirely by the enumerator.

Thanks,
Steven

On Tue, Nov 16, 2021 at 7:17 PM Qingsheng Ren <renqs...@gmail.com> wrote:
> Hi Mason,
>
> Sorry for my late response!
>
> “there was no logic to filter/remove splits”
>
> Yes, we indeed miss a split removal mechanism. Actually this is quite a
> tricky one considering exactly-once semantics: there is a risk of losing
> data if we remove a partition/topic from Kafka. There was a discussion
> about this topic on the user mailing list:
> https://lists.apache.org/thread/7r4h7v5k281w9cnbfw9lb8tp56r30lwt
>
> An immature solution in my mind is that we could remove a split with the
> help of watermarks. Once the watermark in a split has been pushed to the
> end of the global window, we can assume that there are no more new records
> in the split and we can remove it safely. However, this would invalidate
> all previous checkpoints, because these splits might no longer exist in
> the external system (e.g., the topic has been removed in Kafka).
>
> Hope this answers your question, and looking forward to your inspiring
> ideas!
>
> --
> Best Regards,
>
> Qingsheng Ren
> Email: renqs...@gmail.com
>
> On Nov 10, 2021, 11:32 PM +0800, Mason Chen <mas.chen6...@gmail.com>,
> wrote:
>
> > there was no logic to filter/remove splits
> >
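For illustration, the enumerator-side filtering suggested above could be sketched roughly as below. This is a hypothetical, self-contained sketch, not actual Flink `KafkaSourceEnumerator` code: the `TopicPartitionSplit` record and the `filterValidSplits` helper are stand-ins for the real split type and whatever hook would run during state restore.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class SplitFilterSketch {
    // Hypothetical stand-in for a Kafka split: a topic plus a partition id.
    record TopicPartitionSplit(String topic, int partition) {}

    // On recovery, keep only splits whose topic is still subscribed;
    // splits for removed/unsubscribed topics are dropped instead of
    // being re-assigned to readers.
    static List<TopicPartitionSplit> filterValidSplits(
            List<TopicPartitionSplit> restored, Set<String> subscribedTopics) {
        return restored.stream()
                .filter(split -> subscribedTopics.contains(split.topic()))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Checkpointed state still references a topic the user no longer
        // subscribes to ("removed-topic").
        List<TopicPartitionSplit> restored = List.of(
                new TopicPartitionSplit("orders", 0),
                new TopicPartitionSplit("removed-topic", 0),
                new TopicPartitionSplit("orders", 1));
        List<TopicPartitionSplit> valid =
                filterValidSplits(restored, Set.of("orders"));
        System.out.println(valid.size()); // 2
    }
}
```

Note this only drops stale splits on the enumerator; it does not address the exactly-once concern Qingsheng raised, since records in the dropped partitions that were never read are simply skipped.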