Well, a sanity check that only allows a startReadTime in the past seemed
obvious to me. I can't really think of any common use case where you would
want to start processing at a specific time in the future and just idle until
then, whereas the opposite direction has several.
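A minimal sketch of such a sanity check, assuming it would run when the read is configured. `checkStartReadTimeInPast` is a hypothetical helper, not an existing KafkaIO method, and "now" is passed in explicitly so the check is easy to test:

```java
import java.time.Duration;
import java.time.Instant;

public class StartReadTimeCheck {

  /**
   * Hypothetical helper (not part of KafkaIO): throws if startReadTime lies
   * after the supplied "now". A startReadTime equal to now is accepted.
   */
  static void checkStartReadTimeInPast(Instant startReadTime, Instant now) {
    if (startReadTime.isAfter(now)) {
      throw new IllegalArgumentException(
          "startReadTime " + startReadTime + " is in the future (now: " + now + ")");
    }
  }

  public static void main(String[] args) {
    Instant now = Instant.now();
    // One hour ago: accepted, no exception.
    checkStartReadTimeInPast(now.minus(Duration.ofHours(1)), now);
    try {
      // One minute from now: rejected.
      checkStartReadTimeInPast(now.plus(Duration.ofMinutes(1)), now);
      System.out.println("accepted");
    } catch (IllegalArgumentException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```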

The real question here, IMO, is how much of a "safety delay" from "now" (if
any) we need to keep when we are unable to find a newer offset. A message
with a timestamp older than startReadTime might still be in flight and not
yet available on the Kafka cluster at the moment we look up the offset. So
either we skip such a check entirely and let developers handle this, or we
need a filter that drops messages timestamped before startReadTime.
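The filter option can be sketched as below. This is a hypothetical, Kafka-free illustration: `TimestampedRecord` and `dropBeforeStartReadTime` are made-up names, and timestamps are assumed to be epoch milliseconds. The same predicate would also drop records timestamped between "now" and a future startReadTime. In a real pipeline it could presumably be applied to each `KafkaRecord`'s timestamp, for example with Beam's `Filter.by`.

```java
import java.util.ArrayList;
import java.util.List;

public class StartReadTimeFilter {

  /** Hypothetical stand-in for a Kafka record: only its offset and timestamp (epoch millis). */
  static final class TimestampedRecord {
    final long offset;
    final long timestampMillis;

    TimestampedRecord(long offset, long timestampMillis) {
      this.offset = offset;
      this.timestampMillis = timestampMillis;
    }
  }

  /** Returns only the records whose timestamp is at or after startReadTimeMillis. */
  static List<TimestampedRecord> dropBeforeStartReadTime(
      List<TimestampedRecord> records, long startReadTimeMillis) {
    List<TimestampedRecord> kept = new ArrayList<>();
    for (TimestampedRecord r : records) {
      if (r.timestampMillis >= startReadTimeMillis) {
        kept.add(r);
      }
    }
    return kept;
  }

  public static void main(String[] args) {
    long startReadTime = 1_000L;
    List<TimestampedRecord> polled = List.of(
        new TimestampedRecord(10, 990L),    // older than startReadTime: dropped
        new TimestampedRecord(11, 1_000L),  // equal to startReadTime: kept
        new TimestampedRecord(12, 1_005L)); // newer: kept
    for (TimestampedRecord r : dropBeforeStartReadTime(polled, startReadTime)) {
      System.out.println("kept offset " + r.offset);
    }
  }
}
```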

On Thu, Jun 16, 2022 at 0:19 Daniel Collins <dpcoll...@google.com>
wrote:

> This may or may not be reasonable. Let's assume I pick startReadTime =
> now + 1 minute. Your logic will start returning a lot of records with
> values == now + 1ms. Is that reasonable for users of this API? Maybe, maybe
> not.
>
> -Daniel
>
> On Wed, Jun 15, 2022 at 6:16 PM Balázs Németh <balazs.nem...@aliz.ai>
> wrote:
>
>> No objections at all, so I take it my idea is okay? :)
>>
>> On Thu, May 26, 2022 at 5:59 Balázs Németh <balazs.nem...@aliz.ai>
>> wrote:
>>
>>> https://issues.apache.org/jira/browse/BEAM-14518
>>>
>>>
>>> https://github.com/apache/beam/blob/fd8546355523f67eaddc22249606fdb982fe4938/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConsumerSpEL.java#L180-L198
>>>
>>> Right now, the 'startReadTime' config for KafkaIO.Read looks up, in every
>>> topic partition, the earliest offset whose timestamp is newer than or equal
>>> to that timestamp. The problem arises when the timestamp is so recent that
>>> a partition has no message with a newer or equal timestamp: in that case
>>> the code fails with an exception. In certain cases this makes no sense, as
>>> we could actually make it work.
>>>
>>> If we don't get an offset from calling `consumer.offsetsForTimes`, we
>>> should call `endOffsets` and use the returned offset. That is exactly the
>>> offset we will have to read next, since Kafka's end offset is already the
>>> offset of the last message plus one.
>>>
>>> Even if `endOffsets` can't return an offset, we could use 0 as the offset
>>> to read from.
>>>
>>>
>>>
>>> Am I missing something here? Is it okay to contribute this?
>>>
>>
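
The per-partition fallback proposed in the original message (offsetsForTimes, then endOffsets, then 0) can be sketched as below. To stay free of a Kafka dependency, `resolveStartOffset` is a hypothetical helper and its two nullable `Long` arguments stand in for one partition's results from `KafkaConsumer.offsetsForTimes` (which maps a partition to `null` when no message has a timestamp at or after the requested one) and `KafkaConsumer.endOffsets`. Kafka documents the end offset (in the default `read_uncommitted` isolation level) as the offset of the last successfully replicated message plus one, so the sketch uses it directly as the next offset to read.

```java
import java.util.HashMap;
import java.util.Map;

public class StartOffsetResolver {

  /**
   * Hypothetical helper: prefer the offsetsForTimes result; if there is none,
   * fall back to the end offset; if that is also unavailable, fall back to 0.
   *
   * @param offsetForTime offset from offsetsForTimes for this partition, or null if none
   * @param endOffset end offset from endOffsets for this partition, or null if none
   */
  static long resolveStartOffset(Long offsetForTime, Long endOffset) {
    if (offsetForTime != null) {
      return offsetForTime;
    }
    if (endOffset != null) {
      // The end offset is already "last offset + 1", i.e. the next offset to read.
      return endOffset;
    }
    return 0L;
  }

  public static void main(String[] args) {
    // Hypothetical lookup results for three partitions of one topic.
    Map<Integer, Long> fromOffsetsForTimes = new HashMap<>();
    fromOffsetsForTimes.put(0, 42L);  // a message at or after startReadTime exists
    fromOffsetsForTimes.put(1, null); // no such message: the case that currently fails
    fromOffsetsForTimes.put(2, null);

    Map<Integer, Long> fromEndOffsets = new HashMap<>();
    fromEndOffsets.put(0, 100L);
    fromEndOffsets.put(1, 57L);
    // Partition 2 has no end offset available.

    for (int p = 0; p < 3; p++) {
      long start = resolveStartOffset(fromOffsetsForTimes.get(p), fromEndOffsets.get(p));
      System.out.println("partition " + p + " -> " + start);
    }
  }
}
```

Running `main` prints `partition 0 -> 42`, `partition 1 -> 57` and `partition 2 -> 0`. Combined with a timestamp filter, this would avoid the exception without emitting records older than startReadTime.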
