https://issues.apache.org/jira/browse/BEAM-14518

https://github.com/apache/beam/blob/fd8546355523f67eaddc22249606fdb982fe4938/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConsumerSpEL.java#L180-L198

Right now the 'startReadTime' config for KafkaIO.Read looks up, in every topic
partition, the earliest offset whose timestamp is newer than or equal to that
timestamp. The problem arises when the timestamp is so recent that a partition
has no message at or after it. In that case the code fails with an exception,
even though in certain cases failing makes no sense, because we could actually
make it work.

If `consumer.offsetsForTimes` returns no offset for a partition, we should
call `endOffsets` and use the returned offset. That is exactly the offset we
will have to read next, since `endOffsets` already returns the offset of the
last message plus one.

Even if `endOffsets` can't return an offset, we could fall back to 0 as the
offset to read from.
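
A minimal sketch of the proposed fallback chain, written in plain Java so it
runs without kafka-clients. The class and method names (`StartOffsetFallback`,
`resolveStartOffset`) are hypothetical and not part of Beam. The two inputs
stand in for the per-partition results of `Consumer.offsetsForTimes` (whose
map value is null when no message has a timestamp at or after the target) and
`Consumer.endOffsets`; modelling a missing end offset as null is an assumption
for the "can't return an offset" case. Since `endOffsets` already returns the
offset of the last message plus one, the sketch uses it as-is.

```java
/**
 * Hypothetical helper sketching the proposed startReadTime fallback.
 * Not Beam's actual implementation.
 */
class StartOffsetFallback {

  /**
   * @param offsetForTime offset from Consumer.offsetsForTimes for one partition,
   *                      or null if no message has timestamp >= startReadTime
   * @param endOffset     offset from Consumer.endOffsets for the same partition,
   *                      or null if none could be obtained (assumption)
   * @return the offset to start reading from
   */
  static long resolveStartOffset(Long offsetForTime, Long endOffset) {
    if (offsetForTime != null) {
      // Normal case: a message at or after startReadTime exists.
      return offsetForTime;
    }
    if (endOffset != null) {
      // endOffsets is already last offset + 1, i.e. the next offset to be written,
      // so no extra +1 is applied.
      return endOffset;
    }
    // Last resort proposed in the post: start from offset 0.
    return 0L;
  }

  public static void main(String[] args) {
    System.out.println(resolveStartOffset(42L, 100L)); // prints 42
    System.out.println(resolveStartOffset(null, 100L)); // prints 100
    System.out.println(resolveStartOffset(null, null)); // prints 0
  }
}
```

In Beam this logic would presumably live where `ConsumerSpEL` currently throws,
replacing the exception with the `endOffsets` lookup.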

Am I missing something here? Is it okay to contribute this?
