ableegoldman commented on code in PR #18115:
URL: https://github.com/apache/kafka/pull/18115#discussion_r1881105321
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -1326,6 +1376,50 @@ private void resetOffsets(final Set<TopicPartition>
partitions, final Exception
if (!seekToEnd.isEmpty()) {
mainConsumer.seekToEnd(seekToEnd);
}
+
+ if (!seekByDuration.isEmpty()) {
+ final long nowMs = time.milliseconds();
+ final Map<TopicPartition, Long> seekToTimestamps =
seekByDuration.entrySet().stream()
+ .map(e -> {
+ long seekMs = nowMs - e.getValue().toMillis();
+ if (seekMs < 0L) {
+ log.debug("Cannot reset offset to negative
timestamp {} for partition {}. Seeking to timestamp 0 instead.", seekMs,
e.getKey());
+ seekMs = 0L;
+ }
+ return Map.entry(e.getKey(), seekMs);
+ })
+ .collect(HashMap::new, (m, e) -> m.put(e.getKey(),
e.getValue()), Map::putAll);
+
+ try {
+ for (final Map.Entry<TopicPartition, OffsetAndTimestamp>
partitionAndOffset : mainConsumer.offsetsForTimes(seekToTimestamps).entrySet())
{
+ final TopicPartition partition =
partitionAndOffset.getKey();
+ final OffsetAndTimestamp seekOffset =
partitionAndOffset.getValue();
+ if (seekOffset != null) {
+ mainConsumer.seek(partition, new
OffsetAndMetadata(seekOffset.offset()));
+ } else {
+ log.debug(
+ "Cannot reset offset to non-existing timestamp
{} (larger than timestamp of last record)" +
+ " for partition {}. Seeking to end
instead.",
+ seekToTimestamps.get(partition),
+ partition
+ );
+
mainConsumer.seekToEnd(Collections.singleton(partitionAndOffset.getKey()));
+ }
+ }
+ } catch (final TimeoutException timeoutException) {
+ for (final TopicPartition partition :
seekByDuration.keySet()) {
+ final Task task = taskManager.getActiveTask(partition);
+ task.maybeInitTaskTimeoutOrThrow(now,
timeoutException);
+ stateUpdater.add(task);
Review Comment:
fine with me
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]