ableegoldman commented on a change in pull request #10683:
URL: https://github.com/apache/kafka/pull/10683#discussion_r669252416
##########
File path:
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##########
@@ -1216,39 +1245,26 @@ private void setRegexMatchedTopicToStateStore() {
}
}
- public synchronized Pattern earliestResetTopicsPattern() {
- return resetTopicsPattern(earliestResetTopics, earliestResetPatterns);
+ public boolean hasOffsetResetOverrides() {
+ return !(earliestResetTopics.isEmpty() && earliestResetPatterns.isEmpty()
+ && latestResetTopics.isEmpty() && latestResetPatterns.isEmpty());
}
- public synchronized Pattern latestResetTopicsPattern() {
- return resetTopicsPattern(latestResetTopics, latestResetPatterns);
- }
-
- private Pattern resetTopicsPattern(final Set<String> resetTopics,
- final Set<Pattern> resetPatterns) {
- final List<String> topics = maybeDecorateInternalSourceTopics(resetTopics);
-
- return buildPattern(topics, resetPatterns);
- }
-
- private static Pattern buildPattern(final Collection<String> sourceTopics,
- final Collection<Pattern> sourcePatterns) {
- final StringBuilder builder = new StringBuilder();
-
- for (final String topic : sourceTopics) {
- builder.append(topic).append("|");
- }
-
- for (final Pattern sourcePattern : sourcePatterns) {
- builder.append(sourcePattern.pattern()).append("|");
- }
-
- if (builder.length() > 0) {
- builder.setLength(builder.length() - 1);
- return Pattern.compile(builder.toString());
+ public OffsetResetStrategy offsetResetStrategy(final String topic) {
+ if (maybeDecorateInternalSourceTopics(earliestResetTopics).contains(topic) ||
+ earliestResetPatterns.stream().anyMatch(p -> p.matcher(topic).matches())) {
+ return EARLIEST;
+ } else if (maybeDecorateInternalSourceTopics(latestResetTopics).contains(topic) ||
+ latestResetPatterns.stream().anyMatch(p -> p.matcher(topic).matches())) {
+ return LATEST;
+ } else if (maybeDecorateInternalSourceTopics(sourceTopicNames).contains(topic)
Review comment:
The `NONE` case means this topic does belong to this
InternalTopologyBuilder (as opposed to that of a different NamedTopology), but
its offset reset strategy has not been set to EARLIEST or LATEST. If the first
two `if` conditions above fail, all that's left is to verify whether we have
this topic at all, which is true if we find it in either the source topic set
or the source patterns.
Maybe you were wondering about the `|| !hasNamedTopology()` part? Basically,
if we don't have any NamedTopologies then there is only one
InternalTopologyBuilder, so all topics should belong to it.
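
To make the resolution order concrete, here is a rough standalone sketch of the logic as described above. It is not the code in the PR: `OffsetResetSketch`, the `Reset` enum, and the `matches` helper are hypothetical names, the `maybeDecorateInternalSourceTopics` step is left out, and returning `Optional.empty()` for a topic owned by a different NamedTopology is an assumption, since the hunk is cut off before that branch.

```java
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.regex.Pattern;

public class OffsetResetSketch {
    // Hypothetical stand-in for the reset strategy values discussed in the comment.
    public enum Reset { EARLIEST, LATEST, NONE }

    private final Set<String> earliestTopics;
    private final List<Pattern> earliestPatterns;
    private final Set<String> latestTopics;
    private final List<Pattern> latestPatterns;
    private final Set<String> sourceTopics;
    private final List<Pattern> sourcePatterns;
    private final boolean hasNamedTopology;

    public OffsetResetSketch(final Set<String> earliestTopics, final List<Pattern> earliestPatterns,
                             final Set<String> latestTopics, final List<Pattern> latestPatterns,
                             final Set<String> sourceTopics, final List<Pattern> sourcePatterns,
                             final boolean hasNamedTopology) {
        this.earliestTopics = earliestTopics;
        this.earliestPatterns = earliestPatterns;
        this.latestTopics = latestTopics;
        this.latestPatterns = latestPatterns;
        this.sourceTopics = sourceTopics;
        this.sourcePatterns = sourcePatterns;
        this.hasNamedTopology = hasNamedTopology;
    }

    // True if the topic is listed by name or fully matches one of the patterns.
    private static boolean matches(final Set<String> topics, final List<Pattern> patterns, final String topic) {
        return topics.contains(topic) || patterns.stream().anyMatch(p -> p.matcher(topic).matches());
    }

    public Optional<Reset> offsetResetStrategy(final String topic) {
        if (matches(earliestTopics, earliestPatterns, topic)) {
            return Optional.of(Reset.EARLIEST);
        } else if (matches(latestTopics, latestPatterns, topic)) {
            return Optional.of(Reset.LATEST);
        } else if (matches(sourceTopics, sourcePatterns, topic) || !hasNamedTopology) {
            // The topic is ours (or there is only one builder), but no override was set.
            return Optional.of(Reset.NONE);
        }
        // Assumption: the topic belongs to a different NamedTopology's builder.
        return Optional.empty();
    }

    public static void main(final String[] args) {
        final OffsetResetSketch named = new OffsetResetSketch(
            Set.of("orders"), List.of(),
            Set.of(), List.of(Pattern.compile("logs-.*")),
            Set.of("orders", "payments"), List.of(Pattern.compile("logs-.*")),
            true);
        for (final String topic : List.of("orders", "logs-eu", "payments", "other")) {
            System.out.println(topic + " -> " + named.offsetResetStrategy(topic));
        }
    }
}
```

With `hasNamedTopology` set to `false`, the last branch is never reached in this sketch: any topic that has no EARLIEST or LATEST override resolves to `NONE`, which mirrors the "only one InternalTopologyBuilder" reasoning.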