[https://issues.apache.org/jira/browse/BEAM-14518?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17550040#comment-17550040]
Danny McCormick commented on BEAM-14518:
----------------------------------------
This issue has been migrated to https://github.com/apache/beam/issues/21610
> Support for reading Kafka topics from any startReadTime in Java
> ---------------------------------------------------------------
>
> Key: BEAM-14518
> URL: https://issues.apache.org/jira/browse/BEAM-14518
> Project: Beam
> Issue Type: Bug
> Components: io-java-kafka
> Reporter: Balázs Németh
> Priority: P2
>
> [https://github.com/apache/beam/blob/fd8546355523f67eaddc22249606fdb982fe4938/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConsumerSpEL.java#L180-L198]
>
> Right now the 'startReadTime' config for KafkaIO.Read looks up, in every
> topic partition, the earliest offset whose timestamp is newer than or equal
> to that timestamp. The problem is that if the timestamp is so recent that a
> partition has no message newer than or equal to it, the code fails with an
> exception. In certain cases this failure is unnecessary, because we could
> actually make it work.
> If `consumer.offsetsForTimes` returns no offset for a partition, we should
> call `endOffsets` and use the returned end offset, which is the offset of the
> last message + 1. That is exactly the offset we will have to read next.
> Even if `endOffsets` cannot return an offset, we could use 0 as the offset to
> read from.
>
> Am I missing something here? Is it okay to contribute this?
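The fallback proposed in the issue can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not Beam's actual code: partitions are modeled as hypothetical plain strings (e.g. "t-0") standing in for Kafka's `TopicPartition`, and the two input maps stand in for the results of `consumer.offsetsForTimes` (whose per-partition value is null when no message at or after the timestamp exists) and `consumer.endOffsets`, so the logic runs without a broker. The `StartOffsetResolver.resolve` helper is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Sketch only: String partitions stand in for TopicPartition, and offsets are
// passed in as maps instead of being fetched from a live KafkaConsumer.
final class StartOffsetResolver {

    /**
     * Picks a start offset for each partition (hypothetical helper).
     *
     * @param partitions     partitions to resolve
     * @param offsetsForTime offset from an offsetsForTimes-style lookup; a missing
     *                       or null value means no message at/after startReadTime
     * @param endOffsets     end offset (last offset + 1) per partition; may be missing
     */
    static Map<String, Long> resolve(Set<String> partitions,
                                     Map<String, Long> offsetsForTime,
                                     Map<String, Long> endOffsets) {
        Map<String, Long> result = new HashMap<>();
        for (String partition : partitions) {
            Long byTime = offsetsForTime.get(partition);
            Long end = endOffsets.get(partition);
            if (byTime != null) {
                // A message with timestamp >= startReadTime exists: start there.
                result.put(partition, byTime);
            } else if (end != null) {
                // No such message yet: the end offset is already "last offset + 1",
                // i.e. the next offset that will be written, so start there.
                result.put(partition, end);
            } else {
                // Last-resort fallback proposed in the issue.
                result.put(partition, 0L);
            }
        }
        return result;
    }
}
```

For example, with a time-based offset of 5 for "t-0", no time-based offset but an end offset of 7 for "t-1", and no information at all for "t-2", the sketch starts the partitions at 5, 7, and 0 respectively. Note that Kafka's end offset already points one past the last message, so adding another 1 to it would skip the next message written.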
--
This message was sent by Atlassian Jira
(v8.20.7#820007)