ableegoldman commented on a change in pull request #10683:
URL: https://github.com/apache/kafka/pull/10683#discussion_r669252416



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##########
@@ -1216,39 +1245,26 @@ private void setRegexMatchedTopicToStateStore() {
         }
     }
 
-    public synchronized Pattern earliestResetTopicsPattern() {
-        return resetTopicsPattern(earliestResetTopics, earliestResetPatterns);
+    public boolean hasOffsetResetOverrides() {
+        return !(earliestResetTopics.isEmpty() && 
earliestResetPatterns.isEmpty()
+            && latestResetTopics.isEmpty() && latestResetPatterns.isEmpty());
     }
 
-    public synchronized Pattern latestResetTopicsPattern() {
-        return resetTopicsPattern(latestResetTopics, latestResetPatterns);
-    }
-
-    private Pattern resetTopicsPattern(final Set<String> resetTopics,
-                                       final Set<Pattern> resetPatterns) {
-        final List<String> topics = 
maybeDecorateInternalSourceTopics(resetTopics);
-
-        return buildPattern(topics, resetPatterns);
-    }
-
-    private static Pattern buildPattern(final Collection<String> sourceTopics,
-                                        final Collection<Pattern> 
sourcePatterns) {
-        final StringBuilder builder = new StringBuilder();
-
-        for (final String topic : sourceTopics) {
-            builder.append(topic).append("|");
-        }
-
-        for (final Pattern sourcePattern : sourcePatterns) {
-            builder.append(sourcePattern.pattern()).append("|");
-        }
-
-        if (builder.length() > 0) {
-            builder.setLength(builder.length() - 1);
-            return Pattern.compile(builder.toString());
+    public OffsetResetStrategy offsetResetStrategy(final String topic) {
+        if 
(maybeDecorateInternalSourceTopics(earliestResetTopics).contains(topic) ||
+            earliestResetPatterns.stream().anyMatch(p -> 
p.matcher(topic).matches())) {
+            return EARLIEST;
+        } else if 
(maybeDecorateInternalSourceTopics(latestResetTopics).contains(topic) ||
+            latestResetPatterns.stream().anyMatch(p -> 
p.matcher(topic).matches())) {
+            return LATEST;
+        } else if 
(maybeDecorateInternalSourceTopics(sourceTopicNames).contains(topic)

Review comment:
       The `NONE` case means we do have this topic in this 
InternalTopologyBuilder (as opposed to that of a different NamedTopology) but 
it hasn't set the offset reset strategy to EARLIEST or LATEST. If we fail the 
first two `if` conditions above, then all that's left is to verify whether or 
not we have this topic at all -- which is going to be true if we find it in 
either the source topic set or pattern.
   
   Maybe you were wondering about the `|| !hasNamedTopology()` part? Basically 
if we don't have any NamedTopologies then there is only one 
InternalTopologyBuilder, so all topics should belong to it




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to