ableegoldman commented on a change in pull request #9582:
URL: https://github.com/apache/kafka/pull/9582#discussion_r522381803
##########
File path:
streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSourceNode.java
##########
@@ -71,6 +78,21 @@ public Pattern topicPattern() {
return consumedInternal.valueSerde();
}
+ // We "merge" source nodes into a single node under the hood if a user
tries to read in a source topic multiple times
+ public void merge(final StreamSourceNode<?, ?> other) {
+ final AutoOffsetReset resetPolicy =
consumedInternal.offsetResetPolicy();
+ if (resetPolicy != null &&
!resetPolicy.equals(other.consumedInternal().offsetResetPolicy())) {
+ log.error("Tried to merge source nodes {} and {} which are
subscribed to the same topic/pattern, but "
+ + "the offset reset policies do not match", this,
other);
+ throw new TopologyException("Can't configure different offset
reset policies on the same input topic(s)");
+ }
+ for (final StreamsGraphNode otherChild : other.children()) {
+ // Move children from other to this, these calls take care of
resetting the child's parents to this
Review comment:
I'll remove it
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]