ableegoldman commented on a change in pull request #10683: URL: https://github.com/apache/kafka/pull/10683#discussion_r669252416
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java ########## @@ -1216,39 +1245,26 @@ private void setRegexMatchedTopicToStateStore() { } } - public synchronized Pattern earliestResetTopicsPattern() { - return resetTopicsPattern(earliestResetTopics, earliestResetPatterns); + public boolean hasOffsetResetOverrides() { + return !(earliestResetTopics.isEmpty() && earliestResetPatterns.isEmpty() + && latestResetTopics.isEmpty() && latestResetPatterns.isEmpty()); } - public synchronized Pattern latestResetTopicsPattern() { - return resetTopicsPattern(latestResetTopics, latestResetPatterns); - } - - private Pattern resetTopicsPattern(final Set<String> resetTopics, - final Set<Pattern> resetPatterns) { - final List<String> topics = maybeDecorateInternalSourceTopics(resetTopics); - - return buildPattern(topics, resetPatterns); - } - - private static Pattern buildPattern(final Collection<String> sourceTopics, - final Collection<Pattern> sourcePatterns) { - final StringBuilder builder = new StringBuilder(); - - for (final String topic : sourceTopics) { - builder.append(topic).append("|"); - } - - for (final Pattern sourcePattern : sourcePatterns) { - builder.append(sourcePattern.pattern()).append("|"); - } - - if (builder.length() > 0) { - builder.setLength(builder.length() - 1); - return Pattern.compile(builder.toString()); + public OffsetResetStrategy offsetResetStrategy(final String topic) { + if (maybeDecorateInternalSourceTopics(earliestResetTopics).contains(topic) || + earliestResetPatterns.stream().anyMatch(p -> p.matcher(topic).matches())) { + return EARLIEST; + } else if (maybeDecorateInternalSourceTopics(latestResetTopics).contains(topic) || + latestResetPatterns.stream().anyMatch(p -> p.matcher(topic).matches())) { + return LATEST; + } else if (maybeDecorateInternalSourceTopics(sourceTopicNames).contains(topic) Review comment: The `NONE` case means we do have this topic in this 
InternalTopologyBuilder (as opposed to that of a different NamedTopology), but no EARLIEST or LATEST offset reset strategy has been set for it. If neither of the first two `if` conditions above matches, then all that's left is to verify whether we have this topic at all -- which is true if we find it in either the source topic set or the source patterns. Maybe you were wondering about the `|| !hasNamedTopology()` part? Basically, if we don't have any NamedTopologies then there is only one InternalTopologyBuilder, so all topics should belong to it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org