vvcephei commented on a change in pull request #8994:
URL: https://github.com/apache/kafka/pull/8994#discussion_r451860750
##########
File path:
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -766,6 +769,24 @@ void runOnce() {
return records;
}
+ private OffsetResetStrategy getResetStrategy(final TopicPartition
partition) {
+ if
(builder.earliestResetTopicsPattern().matcher(partition.topic()).matches()) {
+ return OffsetResetStrategy.EARLIEST;
+ } else if
(builder.latestResetTopicsPattern().matcher(partition.topic()).matches()) {
+ return OffsetResetStrategy.LATEST;
+ } else {
+ if (originalReset == null || (!originalReset.equals("earliest") &&
!originalReset.equals("latest"))) {
+ return OffsetResetStrategy.EARLIEST;
+ }
Review comment:
I honestly couldn't figure out what is the default default default reset
strategy... It seems (from the behavior of the test when we first start up)
that if there's no strategy set, and no committed offset, then the client
starts at the beginning, but the ClientConfig has the default policy as
"latest"... What gives?
##########
File path:
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -193,6 +197,28 @@ private void closeAndRevive(final Map<Task,
Collection<TopicPartition>> taskWith
log.error("Error suspending corrupted task {} ", task.id(),
swallow);
}
task.closeDirty();
+ // Pause so we won't poll any more records for this task until it
has been re-initialized
+ // Note, closeDirty already clears the partitiongroup for the task.
+ mainConsumer().pause(task.inputPartitions());
+ final Map<TopicPartition, OffsetAndMetadata> committed =
mainConsumer().committed(task.inputPartitions());
+ for (final TopicPartition topicPartition : task.inputPartitions())
{
+ final OffsetAndMetadata offsetAndMetadata =
committed.get(topicPartition);
+ if (offsetAndMetadata == null) {
+ final OffsetResetStrategy strategy =
resetStrategy.apply(topicPartition);
+ switch (strategy) {
+ case EARLIEST:
+
mainConsumer().seekToBeginning(Collections.singleton(topicPartition));
+ break;
+ case LATEST:
+
mainConsumer().seekToBeginning(Collections.singleton(topicPartition));
+ break;
+ default:
+ throw new IllegalArgumentException("Unexpected
reset strategy: " + strategy);
+ }
+ } else {
+ mainConsumer().seek(topicPartition, offsetAndMetadata);
+ }
+ }
Review comment:
This might be the worst thing I've ever proposed for AK... I can't
figure out a better way to just "reset" the offset.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]