mjsax commented on a change in pull request #8994:
URL: https://github.com/apache/kafka/pull/8994#discussion_r453131671



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -760,44 +763,62 @@ void runOnce() {
         try {
             records = mainConsumer.poll(pollTime);
         } catch (final InvalidOffsetException e) {
-            resetInvalidOffsets(e);
+            resetOffsets(e.partitions(), e);
         }
 
         return records;
     }
 
-    private void resetInvalidOffsets(final InvalidOffsetException e) {
-        final Set<TopicPartition> partitions = e.partitions();
+    private void resetOffsets(final Set<TopicPartition> partitions, final 
Exception cause) {
         final Set<String> loggedTopics = new HashSet<>();
         final Set<TopicPartition> seekToBeginning = new HashSet<>();
         final Set<TopicPartition> seekToEnd = new HashSet<>();
+        final Set<TopicPartition> notReset = new HashSet<>();
 
         for (final TopicPartition partition : partitions) {
             if 
(builder.earliestResetTopicsPattern().matcher(partition.topic()).matches()) {
                 addToResetList(partition, seekToBeginning, "Setting topic '{}' 
to consume from {} offset", "earliest", loggedTopics);
             } else if 
(builder.latestResetTopicsPattern().matcher(partition.topic()).matches()) {
                 addToResetList(partition, seekToEnd, "Setting topic '{}' to 
consume from {} offset", "latest", loggedTopics);
             } else {
-                if (originalReset == null || 
(!originalReset.equals("earliest") && !originalReset.equals("latest"))) {
-                    final String errorMessage = "No valid committed offset 
found for input topic %s (partition %s) and no valid reset policy configured." +
-                        " You need to set configuration parameter 
\"auto.offset.reset\" or specify a topic specific reset " +
-                        "policy via StreamsBuilder#stream(..., 
Consumed.with(Topology.AutoOffsetReset)) or StreamsBuilder#table(..., 
Consumed.with(Topology.AutoOffsetReset))";
-                    throw new StreamsException(String.format(errorMessage, 
partition.topic(), partition.partition()), e);
-                }
-
-                if (originalReset.equals("earliest")) {
+                if ("earliest".equals(originalReset)) {
                     addToResetList(partition, seekToBeginning, "No custom 
setting defined for topic '{}' using original config '{}' for offset reset", 
"earliest", loggedTopics);
-                } else { // can only be "latest"
+                } else if ("latest".equals(originalReset)) {
                     addToResetList(partition, seekToEnd, "No custom setting 
defined for topic '{}' using original config '{}' for offset reset", "latest", 
loggedTopics);
+                } else {
+                    notReset.add(partition);
                 }
             }
         }
 
-        if (!seekToBeginning.isEmpty()) {
-            mainConsumer.seekToBeginning(seekToBeginning);
-        }
-        if (!seekToEnd.isEmpty()) {
-            mainConsumer.seekToEnd(seekToEnd);
+        if (notReset.isEmpty()) {
+            if (!seekToBeginning.isEmpty()) {

Review comment:
       Awesome! Thanks for double checking!




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to