Do you mean whether the "startReadTime" configuration on its own already
advances the watermark, even if we receive an older entry first? That I do
not know, but based on the code I would say it does not. So if I understand
correctly: if it did, my original idea would work fine, and we only have to
think further if it doesn't?

For SDF:
Is @GetInitialWatermarkEstimatorState the one that has to consider
startReadTime(), similarly to
https://github.com/apache/beam/blob/25ca4f0fddd011bfc593e72eea6d32c040808b29/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFn.java#L103-L106
?

For legacy read:
Setting the PartitionState.lastWatermark (not only the nextOffset) in
org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.setupInitialOffset(PartitionState)
and/or modifying the return value
of org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.getWatermark() should
be it then?
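
A minimal sketch of the watermark behavior being asked about, not actual
Beam code. The class and method names are hypothetical, java.time.Instant
stands in for the Joda Instant that Beam uses, and MIN_TIMESTAMP stands in
for Beam's minimum timestamp. The assumption is that the initial watermark
is seeded from startReadTime and never moves backwards, so an older first
entry cannot pull it below startReadTime:

```java
import java.time.Instant;

final class StartReadTimeWatermarkSketch {
  // Stand-in for the runner's minimum timestamp (assumption for this sketch).
  static final Instant MIN_TIMESTAMP = Instant.MIN;

  // Seed the watermark from startReadTime when it is configured.
  static Instant initialWatermark(Instant startReadTime) {
    return startReadTime != null ? startReadTime : MIN_TIMESTAMP;
  }

  // Advance monotonically: a record older than the current watermark
  // leaves the watermark unchanged.
  static Instant advance(Instant currentWatermark, Instant recordTimestamp) {
    return recordTimestamp.isAfter(currentWatermark) ? recordTimestamp : currentWatermark;
  }

  // A record is "late" with respect to startReadTime if it is strictly older.
  static boolean isBeforeStart(Instant startReadTime, Instant recordTimestamp) {
    return startReadTime != null && recordTimestamp.isBefore(startReadTime);
  }
}
```

With this shape, whether such late records are dropped or kept is a
separate decision that can be applied the same way on every partition.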



Kenneth Knowles <[email protected]> wrote (on Thu, Jun 16, 2022,
23:57):

> These last two emails make sense to me, from both of you.
>
> Does startReadTime impact the watermark in the right way so that late data
> is dropped, or whatnot? Because even if you have a partition and you _do_
> find an offset that works, late data could come in. So if you restrict the
> time to be in the past, and you treat late data consistently across
> partitions, then it will not be observable.
>
> I think it would be good to try to come up with some way of describing
> this as an invariant like "the observed behavior of startReadTime is the
> same even when X Y Z are different".
>
> Kenn
>
> On Wed, Jun 15, 2022 at 3:43 PM Balázs Németh <[email protected]>
> wrote:
>
>> Well, a sanity check to only allow startReadTime in the past seemed
>> obvious to me. I can't really think of any common use case where you want
>> to start processing after a specific time in the future and just idle
>> until then, but the other direction has several.
>>
>> The question here, IMO, is how much of a "safety delay" from "now" - if
>> any - we have to keep when we are unable to find any newer offset. I mean,
>> a message that is older than the startReadTime might still be in flight
>> and not yet available in the Kafka cluster at the moment we look up the
>> offset. Either we do no check like that and let the developers deal with
>> it, or we need a filter that drops messages prior to the timestamp.
>>
>> Daniel Collins <[email protected]> wrote (on Thu, Jun 16, 2022,
>> 0:19):
>>
>>> This may or may not be reasonable. Let's assume I pick startReadTime =
>>> now + 1 minute. Your logic will start returning a lot of records with
>>> values == now + 1ms. Is that reasonable for users of this API? Maybe,
>>> maybe not.
>>>
>>> -Daniel
>>>
>>> On Wed, Jun 15, 2022 at 6:16 PM Balázs Németh <[email protected]>
>>> wrote:
>>>
>>>> Not a single objection means my idea seems okay then? :)
>>>>
>>>> Balázs Németh <[email protected]> wrote (on Thu, May 26, 2022,
>>>> 5:59):
>>>>
>>>>> https://issues.apache.org/jira/browse/BEAM-14518
>>>>>
>>>>>
>>>>> https://github.com/apache/beam/blob/fd8546355523f67eaddc22249606fdb982fe4938/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConsumerSpEL.java#L180-L198
>>>>>
>>>>> Right now the 'startReadTime' config for KafkaIO.Read looks up an
>>>>> offset in every topic partition that is newer than or equal to that
>>>>> timestamp. The problem arises when we use a timestamp so new that the
>>>>> partition has no newer/equal message. In that case the code fails with
>>>>> an exception, even though in certain cases we could actually make it
>>>>> work.
>>>>>
>>>>> If we don't get an offset from calling `consumer.offsetsForTimes`, we
>>>>> should call `endOffsets`, and use the returned offset + 1. That is
>>>>> actually the offset we will have to read next time.
>>>>>
>>>>> Even if `endOffsets` can't return an offset we could use 0 as the
>>>>> offset to read from.
>>>>>
>>>>>
>>>>>
>>>>> Am I missing something here? Is it okay to contribute this?
>>>>>
>>>>
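
The fallback order proposed in the original message can be sketched as
plain Java, independent of the Kafka client. The class and method names are
hypothetical; the two nullable arguments stand for the per-partition results
of `offsetsForTimes` (which yields null when no message at or after the
timestamp exists) and `endOffsets`. One point to verify: the Kafka consumer
documentation describes the end offset as the offset of the last message
plus one, so this sketch uses the value as-is instead of adding 1 again.

```java
final class StartOffsetSketch {
  // offsetForTime: offset found for startReadTime, or null if none exists.
  // endOffset: end offset of the partition, or null if it is unavailable.
  static long resolveStartOffset(Long offsetForTime, Long endOffset) {
    if (offsetForTime != null) {
      // A message at or after startReadTime exists: start there.
      return offsetForTime;
    }
    if (endOffset != null) {
      // No such message yet: start at the next offset to be written.
      return endOffset;
    }
    // Nothing is known about the partition: read from the beginning.
    return 0L;
  }
}
```

For example, `resolveStartOffset(null, 100L)` yields 100, the position of
the next record to arrive in that partition.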
