Hi Hongshun,

Thanks for your explanation, I get your point. I reviewed the FLIP again and have only one minor comment, which won't block this FLIP: do we still need `OffsetsInitializer newDiscoveryOffsetsInitializer` in the constructor of `KafkaSourceEnumerator`? I think we can remove it if we always use EARLIEST for newly discovered partitions.
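To make the suggestion concrete, here is a minimal sketch of the rule I have in mind. It is a plain-Java model, not Flink code: the class `NewPartitionOffsetSketch` and the methods `specifiedOrEarliest` and `startingOffset` are hypothetical names made up for illustration, and the partition names and offsets are invented. It only assumes the behavior described in this thread, i.e. specified offsets fall back to EARLIEST for unspecified partitions, and partitions discovered after startup always start from EARLIEST.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, Flink-free model; none of these names are real Flink APIs.
public class NewPartitionOffsetSketch {

    // Startup behavior as described in this thread for specified offsets:
    // use the specified offset if present, otherwise fall back to EARLIEST.
    static long specifiedOrEarliest(Map<String, Long> specified, String partition, long earliest) {
        Long offset = specified.get(partition);
        return offset != null ? offset : earliest;
    }

    // Suggested rule: only partitions found in the first discovery use the
    // startup strategy; anything discovered later always starts from EARLIEST.
    static long startingOffset(boolean firstDiscovery, Map<String, Long> specified,
                               String partition, long earliest) {
        return firstDiscovery ? specifiedOrEarliest(specified, partition, earliest) : earliest;
    }

    public static void main(String[] args) {
        Map<String, Long> specified = new HashMap<>();
        specified.put("topic-0", 42L);

        // Known at startup and specified: the specified offset is used.
        System.out.println(startingOffset(true, specified, "topic-0", 0L));
        // Known at startup but not specified: falls back to EARLIEST.
        System.out.println(startingOffset(true, specified, "topic-1", 0L));
        // Discovered after startup: always EARLIEST, whatever was specified.
        System.out.println(startingOffset(false, specified, "topic-2", 7L));
    }
}
```

With a fixed rule like this, the post-startup branch needs no initializer of its own, which is why the extra constructor argument looks removable to me.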

Best,
Shammon FY

On Tue, Apr 18, 2023 at 4:59 PM Hongshun Wang <loserwang1...@gmail.com> wrote:

> Hi Shammon,
>
> Thank you for your advice. I have carefully considered whether to expose
> this in SQL DDL, and studied whether it is feasible.
>
> However, after reading the corresponding code more thoroughly, it appears
> that SpecifiedOffsetsInitializer and TimestampOffsetsInitializer do not
> work as we initially thought. In the end, I have decided to use only
> "EARLIEST" instead of letting the user choose freely.
>
> Now, let me explain my new understanding.
>
> The actual behavior of SpecifiedOffsetsInitializer and
> TimestampOffsetsInitializer:
>
> - *SpecifiedOffsetsInitializer*: Uses the *specified offset* for specified
>   partitions and *EARLIEST* for unspecified partitions. A specified
>   partition's offset should be less than the latest offset; otherwise it
>   will start from *EARLIEST*.
> - *TimestampOffsetsInitializer*: Initializes the offsets based on a
>   timestamp. If no message meeting the timestamp requirement has been
>   produced to Kafka yet, it just uses the *LATEST* offset.
>
> So, some problems will occur when new partitions use
> SpecifiedOffsetsInitializer or TimestampOffsetsInitializer. You can find
> more information in the "Rejected Alternatives" section of FLIP-288, which
> includes details of the code and the reasoning process.
> All these problems can be reproduced in the current version. The reason
> why they haven't been exposed is probably that users usually set an
> existing specified offset or timestamp, so it appears as earliest in
> production.
>
> WDYT?
> CC: Ruan, Shammon, Gordon, Leonard and Qingsheng.
>
> Yours,
> Hongshun
>
> On Fri, Apr 14, 2023 at 5:48 PM Shammon FY <zjur...@gmail.com> wrote:
>
> > Hi Hongshun
> >
> > Thanks for updating the FLIP, it totally sounds good to me.
> >
> > I just have one comment: How does a SQL job set the new discovery offsets
> > initializer?
> > I found `DataStream` jobs can set different offsets initializers for newly
> > discovered partitions in `KafkaSourceBuilder.setNewDiscoveryOffsets`. Do SQL
> > jobs need to support this feature?
> >
> > Best,
> > Shammon FY
> >
> > On Wed, Apr 12, 2023 at 2:27 PM Hongshun Wang <loserwang1...@gmail.com> wrote:
> >
> > > Hi everyone,
> > >
> > > I have already modified FLIP-288 to provide a
> > > newDiscoveryOffsetsInitializer in the KafkaSourceBuilder and
> > > KafkaSourceEnumerator. Users can use
> > > KafkaSourceBuilder#setNewDiscoveryOffsets to change the strategy for new
> > > partitions.
> > >
> > > Surely, enabling partition discovery by default and modifying
> > > the offset strategy for new partitions should be brought to the user's
> > > attention. Therefore, it will be explained in the 1.18 release notes.
> > >
> > > WDYT? CC: Ruan, Shammon, Gordon and Leonard.
> > >
> > > Best,
> > > Hongshun
> > >
> > > On Fri, Mar 31, 2023 at 2:56 PM Hongshun Wang <loserwang1...@gmail.com> wrote:
> > >
> > > > Hi everyone,
> > > > Thanks for your participation.
> > > >
> > > > @Gordon, I looked at the several questions you raised:
> > > >
> > > > 1. Should we use the firstDiscovery flag or two separate
> > > >    OffsetsInitializers? Actually, I have considered later. If we follow
> > > >    my initial idea, we can provide a default earliest OffsetsInitializer
> > > >    for a new partition. However, according to @Shammon's suggestion, different
> > > >    startup OffsetsInitializers correspond to different post-startup
> > > >    OffsetsInitializers for Flink's built-in offset strategies.
> > > > 2. "Future-time" TIMESTAMP OffsetsInitializer. I looked at the code
> > > >    again, and it seems that neither @Shammon nor I had figured it out.
> > > >    TimestampOffsetsInitializer#getPartitionOffsets has a comment: "First
> > > >    get the current end offsets of the partitions.
> > > >    This is going to be used in
> > > >    case we cannot find a suitable offset based on the timestamp, i.e., the
> > > >    message meeting the requirement of the timestamp has not been produced to
> > > >    Kafka yet. *In this case, we just use the latest offset*." Therefore,
> > > >    using the TimestampOffsetsInitializer will always yield an offset at
> > > >    startup.
> > > > 3. Clarification on coupling SPECIFIC-OFFSET startup with
> > > >    SPECIFIC-OFFSET post-startup. The SPECIFIC-OFFSET strategy already uses
> > > >    the "auto.offset.reset" position for partitions that are not hit.
> > > >
> > > > @Gordon, @Shammon, @Leonard, the core issue we are concerned about is
> > > > whether the offsets specified at the beginning include non-existent
> > > > partitions. The previous design may have SPECIFIC-OFFSET startup with
> > > > future partitions. However, I think since different strategies are
> > > > used for the first discovered partitions and the later discovered partitions,
> > > > the offsets specified at startup should be for partitions that have been
> > > > confirmed to exist; if not, an error will be thrown. If partitions still do not
> > > > exist, they should be covered by the post-startup OffsetsInitializer
> > > > (default EARLIEST).
> > > >
> > > > Best
> > > > Hongshun
> > > >
> > > > On Thu, Mar 30, 2023 at 1:43 PM Shammon FY <zjur...@gmail.com> wrote:
> > > >
> > > >> Thanks Gordon and Leonard
> > > >>
> > > >> I'm sorry that there is no specific case from my side, but I consider the
> > > >> issue as follows
> > > >>
> > > >> 1. Users may set an offset later than the current time because Flink does
> > > >>    not limit it
> > > >> 2. If we use EARLIEST for a newly discovered partition with different
> > > >>    OFFSETs, this may be different from the previous strategy. I think it's
> > > >>    best to keep the same strategy as before if it does not cause data loss
> > > >> 3.
> > > >>    I think supporting different OFFSETs in the FLIP will not make the
> > > >>    implementation more complex
> > > >>
> > > >> Of course, if it is confirmed that this is an illegal Timestamp OFFSET and
> > > >> Flink validates it, then we can apply the same strategy to the newly
> > > >> discovered partition. I think this will be nice too
> > > >>
> > > >> Best,
> > > >> Shammon FY
> > > >>
> > > >> On Thu, Mar 30, 2023 at 12:29 PM Leonard Xu <xbjt...@gmail.com> wrote:
> > > >>
> > > >> > Thanks Hongshun and Shammon for driving the FLIP!
> > > >> >
> > > >> > > *2. Clarification on "future-time" TIMESTAMP OffsetsInitializer*
> > > >> > > *3. Clarification on coupling SPECIFIC-OFFSET startup with
> > > >> > > SPECIFIC-OFFSET post-startup*
> > > >> >
> > > >> > Gordon raised a good point about the future TIMESTAMP and SPECIFIC-OFFSET:
> > > >> > the timestamp/offset of a newly added partition is undetermined when the
> > > >> > job starts (the partition has not been created yet), and it is a
> > > >> > timestamp/offset in the future.
> > > >> >
> > > >> > I have used many message queue systems like Kafka, Pulsar, xxMQ. In my past
> > > >> > experience, TIMESTAMP and SPECIFIC-OFFSET startup modes are usually used
> > > >> > to specify existing timestamps/offsets, which are used for business
> > > >> > scenarios such as backfilling data and re-refreshing data. At present, it's
> > > >> > hard to imagine a user scenario specifying a future timestamp to filter
> > > >> > data in the current topic of a message queue system. Is it overthinking to
> > > >> > consider future TIMESTAMP and SPECIFIC-OFFSET?
> > > >> >
> > > >> > Best,
> > > >> > Leonard