mjsax commented on a change in pull request #8994:
URL: https://github.com/apache/kafka/pull/8994#discussion_r452995040



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -359,11 +359,8 @@ public static StreamThread create(final InternalTopologyBuilder builder,
         consumerConfigs.put(StreamsConfig.InternalConfig.ASSIGNMENT_ERROR_CODE, assignmentErrorCode);
         final AtomicLong nextScheduledRebalanceMs = new AtomicLong(Long.MAX_VALUE);
         consumerConfigs.put(StreamsConfig.InternalConfig.NEXT_SCHEDULED_REBALANCE_MS, nextScheduledRebalanceMs);
-        String originalReset = null;
-        if (!builder.latestResetTopicsPattern().pattern().equals("") || !builder.earliestResetTopicsPattern().pattern().equals("")) {
-            originalReset = (String) consumerConfigs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
-            consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
-        }

Review comment:
   If users specify an in-code override via `Consumed`, they might do so for 
only _some_ topics. For all other topics, the configured reset policy should 
be used. Hence, we just "remember" the default config in `originalReset` and 
set the consumer config to "none".
   
   If there is no in-code override, we don't need to remember the original 
strategy, because all topics use the same strategy and thus we rely on the 
consumer to do the reset for us anyway.
   
   With this change, we now _always_ set the reset policy to "none". This is 
still possible, of course, but it implies that we no longer rely on the 
consumer to do any reset and always do it manually. (This might be "cleaner", 
as it might be easier to reason about.) -- But the old code was correct, too.
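
For illustration, the logic of the removed block can be sketched in plain Java without any Kafka dependency. Everything here is an assumption made for the sketch and not the actual `StreamThread` code: the class `ResetPolicySketch`, the methods `rememberAndDisable` and `resolve`, and the string constant standing in for `ConsumerConfig.AUTO_OFFSET_RESET_CONFIG`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;

// Hypothetical sketch; none of these names exist in Kafka Streams.
class ResetPolicySketch {
    // Stand-in for ConsumerConfig.AUTO_OFFSET_RESET_CONFIG.
    static final String AUTO_OFFSET_RESET = "auto.offset.reset";

    // Mirrors the removed block: only when at least one per-topic override
    // exists do we remember the configured default and switch the consumer
    // to "none". Returns null when the consumer keeps handling resets itself.
    static String rememberAndDisable(final Map<String, Object> consumerConfigs,
                                     final Pattern earliestTopics,
                                     final Pattern latestTopics) {
        final boolean hasOverride = !earliestTopics.pattern().isEmpty()
            || !latestTopics.pattern().isEmpty();
        if (!hasOverride) {
            return null;
        }
        final String originalReset = (String) consumerConfigs.get(AUTO_OFFSET_RESET);
        consumerConfigs.put(AUTO_OFFSET_RESET, "none");
        return originalReset;
    }

    // Manual reset decision for one topic: an in-code override wins,
    // every other topic falls back to the remembered default.
    static String resolve(final String topic,
                          final Pattern earliestTopics,
                          final Pattern latestTopics,
                          final String originalReset) {
        if (earliestTopics.matcher(topic).matches()) {
            return "earliest";
        }
        if (latestTopics.matcher(topic).matches()) {
            return "latest";
        }
        return originalReset;
    }

    public static void main(final String[] args) {
        final Map<String, Object> configs = new HashMap<>();
        configs.put(AUTO_OFFSET_RESET, "earliest");
        final Pattern earliest = Pattern.compile("");   // no "earliest" overrides
        final Pattern latest = Pattern.compile("clicks");
        final String original = rememberAndDisable(configs, earliest, latest);
        System.out.println(resolve("clicks", earliest, latest, original));
        System.out.println(resolve("orders", earliest, latest, original));
    }
}
```

With the change under review, the `hasOverride` check would effectively go away: the consumer config is always set to "none", and every reset goes through the manual path.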





