MarcoLotz commented on a change in pull request #10042: URL: https://github.com/apache/kafka/pull/10042#discussion_r579502090
########## File path: core/src/main/scala/kafka/tools/StreamsResetter.java ########## @@ -503,7 +506,16 @@ private void resetToDatetime(final Consumer<byte[], byte[]> client, final Map<TopicPartition, OffsetAndTimestamp> topicPartitionsAndOffset = client.offsetsForTimes(topicPartitionsAndTimes); for (final TopicPartition topicPartition : inputTopicPartitions) { - client.seek(topicPartition, topicPartitionsAndOffset.get(topicPartition).offset()); + final Optional<Long> partitionOffset = Optional.ofNullable(topicPartitionsAndOffset.get(topicPartition)) + .map(OffsetAndTimestamp::offset) + .filter(offset -> offset != ListOffsetsResponse.UNKNOWN_OFFSET); + if (partitionOffset.isPresent()) { + client.seek(topicPartition, partitionOffset.get()); + } else { + client.seekToEnd(Collections.singletonList(topicPartition)); + System.out.println("Partition " + topicPartition.partition() + " from topic " + topicPartition.topic() + + " is empty, without a committed record. Falling back to latest known offset."); Review comment: I see your point, I don't mind removing "without a committed record" part of the message. @jeqo This would have to be updated on the scala code too, since I saw that the messages are about the same. Would that be ok? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org