mjsax commented on a change in pull request #8994: URL: https://github.com/apache/kafka/pull/8994#discussion_r452995040
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -359,11 +359,8 @@ public static StreamThread create(final InternalTopologyBuilder builder,
         consumerConfigs.put(StreamsConfig.InternalConfig.ASSIGNMENT_ERROR_CODE, assignmentErrorCode);
         final AtomicLong nextScheduledRebalanceMs = new AtomicLong(Long.MAX_VALUE);
         consumerConfigs.put(StreamsConfig.InternalConfig.NEXT_SCHEDULED_REBALANCE_MS, nextScheduledRebalanceMs);
-        String originalReset = null;
-        if (!builder.latestResetTopicsPattern().pattern().equals("") || !builder.earliestResetTopicsPattern().pattern().equals("")) {
-            originalReset = (String) consumerConfigs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
-            consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
-        }

Review comment:
If users specify an in-code override via `Consumed`, they might do so for only _some_ topics. For all other topics, the configured reset policy should be used. Hence, when an override exists, the old code just "remembers" the default config in `originalReset` and sets the consumer config to "none", so that the reset can be applied manually per topic.

If there is no in-code override, we don't need to remember the original strategy: all topics use the same strategy, so we rely on the consumer to do the reset for us.

With this change, we now _always_ set the reset policy to "none". That is still possible, of course, but it implies that we never rely on the consumer to do any reset and always do it manually. (This might be "cleaner", as it might be easier to reason about.) But the old code was correct, too.
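To make the difference between the two behaviors concrete, here is a minimal sketch of the decision described above. It is not the actual `StreamThread` code: the class `ResetPolicySketch` and its methods are hypothetical names, the patterns stand in for `earliestResetTopicsPattern()` and `latestResetTopicsPattern()`, and policies are modeled as plain strings.

```java
import java.util.regex.Pattern;

// Hypothetical helper, not part of Kafka Streams. It only models the
// reset-policy decision discussed in the review comment.
final class ResetPolicySketch {

    // Old behavior: hand the consumer "none" only if at least one topic has
    // an in-code override; otherwise keep the configured policy and let the
    // consumer perform the reset itself.
    static String consumerResetOld(Pattern earliest, Pattern latest, String configured) {
        boolean hasOverride = !earliest.pattern().isEmpty() || !latest.pattern().isEmpty();
        return hasOverride ? "none" : configured;
    }

    // New behavior: the consumer always gets "none", so every reset is manual.
    static String consumerResetNew() {
        return "none";
    }

    // Manual reset for a single topic: an in-code override wins, and every
    // other topic falls back to the remembered (configured) policy.
    static String resolveManualReset(String topic, Pattern earliest, Pattern latest, String configured) {
        if (!earliest.pattern().isEmpty() && earliest.matcher(topic).matches()) {
            return "earliest";
        }
        if (!latest.pattern().isEmpty() && latest.matcher(topic).matches()) {
            return "latest";
        }
        return configured;
    }
}
```

Under these assumptions both variants resolve each topic to the same policy. The only difference is whether the consumer or the manual path performs the reset when no override exists.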