vvcephei commented on a change in pull request #8994: URL: https://github.com/apache/kafka/pull/8994#discussion_r453107700
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ########## @@ -760,44 +763,62 @@ void runOnce() { try { records = mainConsumer.poll(pollTime); } catch (final InvalidOffsetException e) { - resetInvalidOffsets(e); + resetOffsets(e.partitions(), e); } return records; } - private void resetInvalidOffsets(final InvalidOffsetException e) { - final Set<TopicPartition> partitions = e.partitions(); + private void resetOffsets(final Set<TopicPartition> partitions, final Exception cause) { final Set<String> loggedTopics = new HashSet<>(); final Set<TopicPartition> seekToBeginning = new HashSet<>(); final Set<TopicPartition> seekToEnd = new HashSet<>(); + final Set<TopicPartition> notReset = new HashSet<>(); for (final TopicPartition partition : partitions) { if (builder.earliestResetTopicsPattern().matcher(partition.topic()).matches()) { addToResetList(partition, seekToBeginning, "Setting topic '{}' to consume from {} offset", "earliest", loggedTopics); } else if (builder.latestResetTopicsPattern().matcher(partition.topic()).matches()) { addToResetList(partition, seekToEnd, "Setting topic '{}' to consume from {} offset", "latest", loggedTopics); } else { - if (originalReset == null || (!originalReset.equals("earliest") && !originalReset.equals("latest"))) { - final String errorMessage = "No valid committed offset found for input topic %s (partition %s) and no valid reset policy configured." + - " You need to set configuration parameter \"auto.offset.reset\" or specify a topic specific reset " + - "policy via StreamsBuilder#stream(..., Consumed.with(Topology.AutoOffsetReset)) or StreamsBuilder#table(..., Consumed.with(Topology.AutoOffsetReset))"; - throw new StreamsException(String.format(errorMessage, partition.topic(), partition.partition()), e); - } - - if (originalReset.equals("earliest")) { + if ("earliest".equals(originalReset)) { addToResetList(partition, seekToBeginning, "No custom setting defined for topic '{}' using original config '{}' for offset reset", "earliest", loggedTopics); - } else { // can only be "latest" + } else if ("latest".equals(originalReset)) { addToResetList(partition, seekToEnd, "No custom setting defined for topic '{}' using original config '{}' for offset reset", "latest", loggedTopics); + } else { + notReset.add(partition); } } } - if (!seekToBeginning.isEmpty()) { - mainConsumer.seekToBeginning(seekToBeginning); - } - if (!seekToEnd.isEmpty()) { - mainConsumer.seekToEnd(seekToEnd); + if (notReset.isEmpty()) { + if (!seekToBeginning.isEmpty()) { Review comment: Huh, I didn't wonder that before, but ... dear god. From the javadoc on KafkaConsumer: > If no partitions are provided, seek to the first offset for all of the currently assigned partitions. What a dangerous API! ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org