cadonna commented on a change in pull request #9582:
URL: https://github.com/apache/kafka/pull/9582#discussion_r522027942
##########
File path:
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
##########
@@ -314,6 +317,50 @@ public void buildAndOptimizeTopology(final Properties props) {
internalTopologyBuilder.validateCopartition();
}
+    private void mergeDuplicateSourceNodes() {
+        final Map<String, StreamSourceNode<?, ?>> topicsToSourceNodes = new HashMap<>();
+
+        // We don't really care about the order here, but since Pattern does not implement equals() we can't rely on
+        // a regular HashMap and containsKey(Pattern). But for our purposes it's sufficient to compare the compiled
+        // string and flags to determine if two pattern subscriptions can be merged into a single source node
+        final Map<Pattern, StreamSourceNode<?, ?>> patternsToSourceNodes =
+            new TreeMap<>(Comparator.comparing(Pattern::pattern).thenComparing(Pattern::flags));
Review comment:
Just to be clear: this improves the situation, but it is not a complete
solution, right? Assume we have a topic `topicA`. The patterns `topic*` and
`topi*` both match `topicA`, but they are different under this comparator.
In that case a `TopologyException` would be thrown in the
`InternalTopologyBuilder`, right?
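To make the limitation concrete, here is a minimal standalone sketch (class and variable names are hypothetical, and `.*` is used since these are `java.util.regex` patterns) showing that the comparator keys on the compiled string, so two patterns that match the same topic still end up as two distinct map entries:

```java
import java.util.Comparator;
import java.util.TreeMap;
import java.util.regex.Pattern;

public class PatternComparatorDemo {
    public static void main(String[] args) {
        // Same comparator shape as in mergeDuplicateSourceNodes(): it compares
        // the compiled pattern string and flags, not the set of matched topics.
        TreeMap<Pattern, String> patternsToSourceNodes =
            new TreeMap<>(Comparator.comparing(Pattern::pattern).thenComparing(Pattern::flags));

        Pattern p1 = Pattern.compile("topic.*");
        Pattern p2 = Pattern.compile("topi.*");

        // Both patterns match the same topic...
        System.out.println(p1.matcher("topicA").matches()); // true
        System.out.println(p2.matcher("topicA").matches()); // true

        // ...but the comparator treats them as two distinct keys, so the two
        // source nodes would not be merged; detecting the overlap is left to
        // the downstream topology validation.
        patternsToSourceNodes.put(p1, "sourceNode1");
        patternsToSourceNodes.put(p2, "sourceNode2");
        System.out.println(patternsToSourceNodes.size()); // 2
    }
}
```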
##########
File path:
streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSourceNode.java
##########
@@ -71,6 +78,21 @@ public Pattern topicPattern() {
return consumedInternal.valueSerde();
}
+    // We "merge" source nodes into a single node under the hood if a user tries to read in a source topic multiple times
+    public void merge(final StreamSourceNode<?, ?> other) {
+        final AutoOffsetReset resetPolicy = consumedInternal.offsetResetPolicy();
+        if (resetPolicy != null && !resetPolicy.equals(other.consumedInternal().offsetResetPolicy())) {
+            log.error("Tried to merge source nodes {} and {} which are subscribed to the same topic/pattern, but "
+                          + "the offset reset policies do not match", this, other);
+            throw new TopologyException("Can't configure different offset reset policies on the same input topic(s)");
+        }
+        for (final StreamsGraphNode otherChild : other.children()) {
+            // Move children from other to this, these calls take care of resetting the child's parents to this
Review comment:
Do we really need this comment and the comment on line 81? We get the
same information when we navigate to the call and to the implementation of the
methods, with the difference that comments can become outdated without us
noticing it.
##########
File path:
streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
##########
@@ -895,6 +898,55 @@ public void shouldUseSpecifiedNameForAggregateOperationGivenTable() {
STREAM_OPERATION_NAME);
}
+ @Test
+ public void shouldAllowReadingFromSameTopic() {
+ builder.stream("topic");
+ builder.stream("topic");
+ builder.build();
+ }
+
+ @Test
+ public void shouldAllowSubscribingToSamePattern() {
+ builder.stream(Pattern.compile("some-regex"));
+ builder.stream(Pattern.compile("some-regex"));
+ builder.build();
+ }
+
+ @Test
+ public void shouldAllowReadingFromSameCollectionOfTopics() {
+ builder.stream(Collections.singletonList("topic"));
+ builder.stream(Collections.singletonList("topic"));
Review comment:
Please use a collection with at least two topics to test the loop over
the collections.
##########
File path:
streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
##########
@@ -895,6 +898,55 @@ public void shouldUseSpecifiedNameForAggregateOperationGivenTable() {
STREAM_OPERATION_NAME);
}
+ @Test
+ public void shouldAllowReadingFromSameTopic() {
+ builder.stream("topic");
+ builder.stream("topic");
+ builder.build();
+ }
+
+ @Test
+ public void shouldAllowSubscribingToSamePattern() {
+ builder.stream(Pattern.compile("some-regex"));
+ builder.stream(Pattern.compile("some-regex"));
+ builder.build();
+ }
+
+ @Test
+ public void shouldAllowReadingFromSameCollectionOfTopics() {
+ builder.stream(Collections.singletonList("topic"));
+ builder.stream(Collections.singletonList("topic"));
+ builder.build();
+ }
+
+ @Test
+    public void shouldNotAllowReadingFromOverlappingAndUnequalCollectionOfTopics() {
+ builder.stream(Collections.singletonList("topic"));
+ builder.stream(asList("topic", "anotherTopic"));
+ assertThrows(TopologyException.class, builder::build);
+ }
+
+ @Test
+ public void shouldThrowWhenSubscribedToATopicWithDifferentResetPolicies() {
+ builder.stream("topic", Consumed.with(AutoOffsetReset.EARLIEST));
+ builder.stream("topic", Consumed.with(AutoOffsetReset.LATEST));
+ assertThrows(TopologyException.class, builder::build);
+ }
+
+ @Test
+    public void shouldThrowWhenSubscribedToATopicWithSetAndUnsetResetPolicies() {
+ builder.stream("topic", Consumed.with(AutoOffsetReset.EARLIEST));
+ builder.stream("topic");
+ assertThrows(TopologyException.class, builder::build);
+ }
+
+ @Test
+    public void shouldThrowWhenSubscribedToAPatternWithDifferentResetPolicies() {
+        builder.stream(Pattern.compile("some-regex"), Consumed.with(AutoOffsetReset.EARLIEST));
+        builder.stream(Pattern.compile("some-regex"), Consumed.with(AutoOffsetReset.LATEST));
+ assertThrows(TopologyException.class, builder::build);
+ }
+
Review comment:
Could you also add a test with two patterns with the same string, but one
with a set reset policy and one with an unset reset policy, like you did for
the non-pattern case? Just to make it clear that it should also throw in that
case.
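For reference, a sketch of the requested test in the style of the existing ones (it assumes the same `builder` fixture and static imports as the surrounding `StreamsBuilderTest`, so it is a fragment rather than a standalone example):

```java
@Test
public void shouldThrowWhenSubscribedToAPatternWithSetAndUnsetResetPolicies() {
    builder.stream(Pattern.compile("some-regex"), Consumed.with(AutoOffsetReset.EARLIEST));
    builder.stream(Pattern.compile("some-regex"));
    assertThrows(TopologyException.class, builder::build);
}
```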
##########
File path:
streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
##########
@@ -895,6 +898,55 @@ public void shouldUseSpecifiedNameForAggregateOperationGivenTable() {
STREAM_OPERATION_NAME);
}
+ @Test
+ public void shouldAllowReadingFromSameTopic() {
+ builder.stream("topic");
+ builder.stream("topic");
+ builder.build();
+ }
+
+ @Test
+ public void shouldAllowSubscribingToSamePattern() {
+ builder.stream(Pattern.compile("some-regex"));
+ builder.stream(Pattern.compile("some-regex"));
+ builder.build();
+ }
+
+ @Test
+ public void shouldAllowReadingFromSameCollectionOfTopics() {
+ builder.stream(Collections.singletonList("topic"));
+ builder.stream(Collections.singletonList("topic"));
+ builder.build();
+ }
+
+ @Test
+    public void shouldNotAllowReadingFromOverlappingAndUnequalCollectionOfTopics() {
+ builder.stream(Collections.singletonList("topic"));
+ builder.stream(asList("topic", "anotherTopic"));
+ assertThrows(TopologyException.class, builder::build);
+ }
+
+ @Test
+ public void shouldThrowWhenSubscribedToATopicWithDifferentResetPolicies() {
+ builder.stream("topic", Consumed.with(AutoOffsetReset.EARLIEST));
+ builder.stream("topic", Consumed.with(AutoOffsetReset.LATEST));
+ assertThrows(TopologyException.class, builder::build);
+ }
+
+ @Test
+    public void shouldThrowWhenSubscribedToATopicWithSetAndUnsetResetPolicies() {
+ builder.stream("topic", Consumed.with(AutoOffsetReset.EARLIEST));
+ builder.stream("topic");
+ assertThrows(TopologyException.class, builder::build);
+ }
Review comment:
What should happen in this case? See also my comment in `merge()`.
```
@Test
    public void shouldThrowWhenSubscribedToATopicWithSetAndUnsetResetPolicies() {
builder.stream("topic");
builder.stream("topic", Consumed.with(AutoOffsetReset.EARLIEST));
assertThrows(TopologyException.class, builder::build);
}
```
##########
File path:
streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSourceNode.java
##########
@@ -71,6 +78,21 @@ public Pattern topicPattern() {
return consumedInternal.valueSerde();
}
+    // We "merge" source nodes into a single node under the hood if a user tries to read in a source topic multiple times
+    public void merge(final StreamSourceNode<?, ?> other) {
+        final AutoOffsetReset resetPolicy = consumedInternal.offsetResetPolicy();
+        if (resetPolicy != null && !resetPolicy.equals(other.consumedInternal().offsetResetPolicy())) {
Review comment:
What should happen if this reset policy is `null` and the other one is not
`null`? I guess we should also throw in that case, shouldn't we?
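A null-safe variant of that guard can be sketched with `Objects.equals`, which treats two unset policies as equal but flags any mismatch, including set vs. unset. Everything here is illustrative and not the actual Streams code: the class name is made up and `IllegalStateException` stands in for `TopologyException`.

```java
import java.util.Objects;

public class ResetPolicyGuardDemo {
    enum AutoOffsetReset { EARLIEST, LATEST }

    // Hypothetical null-safe version of the guard in merge(): throws when the
    // two policies differ, including when exactly one of them is unset (null).
    static void checkResetPolicies(AutoOffsetReset mine, AutoOffsetReset other) {
        if (!Objects.equals(mine, other)) {
            throw new IllegalStateException(
                "Can't configure different offset reset policies on the same input topic(s)");
        }
    }

    public static void main(String[] args) {
        checkResetPolicies(null, null);                                         // ok: both unset
        checkResetPolicies(AutoOffsetReset.EARLIEST, AutoOffsetReset.EARLIEST); // ok: equal

        boolean threw = false;
        try {
            checkResetPolicies(null, AutoOffsetReset.EARLIEST);                 // one set, one unset
        } catch (IllegalStateException e) {
            threw = true;
        }
        System.out.println(threw); // true
    }
}
```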
##########
File path:
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
##########
@@ -314,6 +317,50 @@ public void buildAndOptimizeTopology(final Properties props) {
internalTopologyBuilder.validateCopartition();
}
+    private void mergeDuplicateSourceNodes() {
+        final Map<String, StreamSourceNode<?, ?>> topicsToSourceNodes = new HashMap<>();
+
+        // We don't really care about the order here, but since Pattern does not implement equals() we can't rely on
+        // a regular HashMap and containsKey(Pattern). But for our purposes it's sufficient to compare the compiled
+        // string and flags to determine if two pattern subscriptions can be merged into a single source node
+        final Map<Pattern, StreamSourceNode<?, ?>> patternsToSourceNodes =
+            new TreeMap<>(Comparator.comparing(Pattern::pattern).thenComparing(Pattern::flags));
+
+        for (final StreamsGraphNode graphNode : root.children()) {
+            if (graphNode instanceof StreamSourceNode) {
+                final StreamSourceNode<?, ?> currentSourceNode = (StreamSourceNode<?, ?>) graphNode;
Review comment:
We could avoid the `instanceof` check and the cast if we introduced a
`RootGraphNode` with a method `sourceNodes()`. Since a root can only have
source nodes and state stores as children, we could make the topology code in
general a bit more type safe. As far as I can see, that would need some
additional changes outside the scope of this PR, so feel free to not consider
this comment for this PR and we can do another PR for that.
##########
File path:
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##########
@@ -410,18 +410,6 @@ public final void addSource(final Topology.AutoOffsetReset offsetReset,
}
}
-        for (final Pattern otherPattern : earliestResetPatterns) {
-            if (topicPattern.pattern().contains(otherPattern.pattern()) || otherPattern.pattern().contains(topicPattern.pattern())) {
-                throw new TopologyException("Pattern " + topicPattern + " will overlap with another pattern " + otherPattern + " already been registered by another source");
-            }
-        }
-
-        for (final Pattern otherPattern : latestResetPatterns) {
-            if (topicPattern.pattern().contains(otherPattern.pattern()) || otherPattern.pattern().contains(topicPattern.pattern())) {
-                throw new TopologyException("Pattern " + topicPattern + " will overlap with another pattern " + otherPattern + " already been registered by another source");
-            }
-        }
-
Review comment:
I agree on the first part.
Regarding the second part, I had similar thoughts when I wrote my comment
in `mergeDuplicateSourceNodes()`.
But I might also be missing something here.
##########
File path:
streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
##########
@@ -895,6 +898,55 @@ public void shouldUseSpecifiedNameForAggregateOperationGivenTable() {
STREAM_OPERATION_NAME);
}
+ @Test
+ public void shouldAllowReadingFromSameTopic() {
+ builder.stream("topic");
+ builder.stream("topic");
+ builder.build();
+ }
Review comment:
Could you please add a try-catch clause to better document the test?
For example:
```suggestion
public void shouldAllowReadingFromSameTopic() {
builder.stream("topic");
builder.stream("topic");
try {
builder.build();
} catch (final TopologyException topologyException) {
fail("TopologyException not expected");
}
}
```
This applies also to the other tests.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]