Do you mean whether the "startReadTime" configuration on its own already shifts the watermark, even if we receive an older entry first? That I do not know, but based on the code I would say it does not. So if I understand correctly: if it did, my original idea would work fine, and we only have to think further if that doesn't happen?
For SDF: @GetInitialWatermarkEstimatorState is the one that has to consider
startReadTime(), similarly to
https://github.com/apache/beam/blob/25ca4f0fddd011bfc593e72eea6d32c040808b29/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFn.java#L103-L106 ?

For legacy read: setting PartitionState.lastWatermark (not only the
nextOffset) in
org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.setupInitialOffset(PartitionState)
and/or modifying the return value of
org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.getWatermark() should be it
then?

Kenneth Knowles <[email protected]> wrote (on Thu, Jun 16, 2022, 23:57):

> These last two emails make sense to me, from both of you.
>
> Does startReadTime impact the watermark in the right way so that late data
> is dropped, or whatnot? Because even if you have a partition and you _do_
> find an offset that works, late data could come in. So if you restrict the
> time to be in the past, and you treat late data consistently across
> partitions, then it will not be observable.
>
> I think it would be good to try to come up with some way of describing
> this as an invariant like "the observed behavior of startReadTime is the
> same even when X Y Z are different".
>
> Kenn
>
> On Wed, Jun 15, 2022 at 3:43 PM Balázs Németh <[email protected]> wrote:
>
>> Well, a sanity check to only allow startReadTime in the past seemed
>> obvious to me. I can't really think of any common use case where you want
>> to start processing after a specific time in the future and only idle
>> until then, but the other direction has multiple.
>>
>> The question regarding this IMO is how much of a "safety delay" from
>> "now", if any, we have to keep if we were unable to find any newer
>> offset. I mean, a message older than startReadTime might still be in
>> flight and not yet available at the Kafka cluster when we look up the
>> offset. It has to be either without any check like that, leaving it to
>> the developers to handle, or it needs a filter that drops messages prior
>> to the timestamp.
>>
>> Daniel Collins <[email protected]> wrote (on Thu, Jun 16, 2022, 0:19):
>>
>>> This may or may not be reasonable. Let's assume I pick startReadTime =
>>> now + 1 minute. Your logic will start returning a lot of records with
>>> values == now + 1ms. Is that reasonable for users of this API? Maybe,
>>> maybe not.
>>>
>>> -Daniel
>>>
>>> On Wed, Jun 15, 2022 at 6:16 PM Balázs Németh <[email protected]>
>>> wrote:
>>>
>>>> Not a single objection means my idea seems okay then? :)
>>>>
>>>> Balázs Németh <[email protected]> wrote (on Thu, May 26, 2022, 5:59):
>>>>
>>>>> https://issues.apache.org/jira/browse/BEAM-14518
>>>>>
>>>>> https://github.com/apache/beam/blob/fd8546355523f67eaddc22249606fdb982fe4938/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConsumerSpEL.java#L180-L198
>>>>>
>>>>> Right now the 'startReadTime' config for KafkaIO.Read looks up an
>>>>> offset in every topic partition that is newer than or equal to that
>>>>> timestamp. The problem is that if we use a timestamp so new that there
>>>>> is no newer/equal message in the partition, the code fails with an
>>>>> exception, even though in certain cases we could actually make it
>>>>> work.
>>>>>
>>>>> If we don't get an offset from calling `consumer.offsetsForTimes`, we
>>>>> should call `endOffsets` and use the returned offset + 1. That is
>>>>> actually the offset we will have to read next time.
>>>>>
>>>>> Even if `endOffsets` can't return an offset, we could use 0 as the
>>>>> offset to read from.
>>>>>
>>>>> Am I missing something here? Is it okay to contribute this?
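The fallback proposed in the last quoted message can be sketched as a pure decision function. This is an illustration only: `resolveStartOffset` is a hypothetical helper, not KafkaIO code, and its `OptionalLong` inputs merely stand in for the results of `consumer.offsetsForTimes` and `consumer.endOffsets`. One caveat worth noting: if I read Kafka's semantics correctly, `endOffsets` already returns the log-end offset, i.e. the offset the next appended record will get, so the sketch uses it as-is rather than adding one.

```java
import java.util.OptionalLong;

public class StartOffsetResolution {

  // Hypothetical helper: decide where to start reading a partition given the
  // startReadTime lookup results. offsetForTime models consumer.offsetsForTimes
  // (empty = no record at/after startReadTime exists yet); endOffset models
  // consumer.endOffsets (empty = nothing known about the partition).
  static long resolveStartOffset(OptionalLong offsetForTime, OptionalLong endOffset) {
    if (offsetForTime.isPresent()) {
      // Normal case: a record with timestamp >= startReadTime exists.
      return offsetForTime.getAsLong();
    }
    if (endOffset.isPresent()) {
      // No such record yet: start at the log-end offset, which is the offset
      // the next produced record will receive.
      return endOffset.getAsLong();
    }
    // Nothing known about the partition: read from the beginning.
    return 0L;
  }

  public static void main(String[] args) {
    // Record found at/after startReadTime -> use its offset.
    if (resolveStartOffset(OptionalLong.of(42L), OptionalLong.of(100L)) != 42L) {
      throw new AssertionError();
    }
    // No record yet -> fall back to the partition end.
    if (resolveStartOffset(OptionalLong.empty(), OptionalLong.of(100L)) != 100L) {
      throw new AssertionError();
    }
    // No information at all -> offset 0.
    if (resolveStartOffset(OptionalLong.empty(), OptionalLong.empty()) != 0L) {
      throw new AssertionError();
    }
    System.out.println("ok");
  }
}
```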
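The watermark question discussed at the top of the thread can also be reduced to a small sketch. This is hypothetical and much simpler than Beam's actual SDF and legacy-reader code; `watermarkFor` is an invented name. The idea: when startReadTime is configured, the reader never emits records older than it, so startReadTime is a safe floor both for the initial watermark estimator state (SDF path) and for what `KafkaUnboundedReader.getWatermark()` returns before the first record arrives (legacy path).

```java
import java.time.Instant;

public class StartReadTimeWatermark {

  // Hypothetical sketch: compute a watermark from the timestamp of the last
  // emitted record (null before any record) and the configured startReadTime
  // (null if unset). Records before startReadTime are never emitted, so
  // startReadTime acts as a lower bound on the watermark.
  static Instant watermarkFor(Instant lastRecordTimestamp, Instant startReadTime) {
    Instant floor = (startReadTime != null) ? startReadTime : Instant.EPOCH;
    if (lastRecordTimestamp == null) {
      // No records yet: with startReadTime set we can already advance the
      // watermark to it instead of holding it back at the epoch.
      return floor;
    }
    return lastRecordTimestamp.isAfter(floor) ? lastRecordTimestamp : floor;
  }

  public static void main(String[] args) {
    Instant start = Instant.parse("2022-06-16T00:00:00Z");
    // Before any record, startReadTime is the watermark...
    if (!watermarkFor(null, start).equals(start)) throw new AssertionError();
    // ...and without startReadTime we fall back to the epoch.
    if (!watermarkFor(null, null).equals(Instant.EPOCH)) throw new AssertionError();
    // Later records move the watermark forward; earlier ones cannot pull it
    // back below the floor.
    Instant later = start.plusSeconds(60);
    if (!watermarkFor(later, start).equals(later)) throw new AssertionError();
    if (!watermarkFor(start.minusSeconds(60), start).equals(start)) throw new AssertionError();
    System.out.println("ok");
  }
}
```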
