[jira] [Commented] (KAFKA-9298) Reuse of a mapped stream causes an Invalid Topology
[ https://issues.apache.org/jira/browse/KAFKA-9298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17118162#comment-17118162 ]

Bill Bejeck commented on KAFKA-9298:

cherry-picked to 2.5

> Reuse of a mapped stream causes an Invalid Topology
> ---
>
>          Key: KAFKA-9298
>          URL: https://issues.apache.org/jira/browse/KAFKA-9298
>      Project: Kafka
>   Issue Type: Bug
>   Components: streams
>     Reporter: Walker Carlson
>     Assignee: Bill Bejeck
>     Priority: Minor
>       Labels: join, streams
>      Fix For: 2.6.0, 2.5.1
>
> Can be found within KStreamKStreamJoinTest.java
> {code:java}
> @Test
> public void optimizerIsEager() {
>     final StreamsBuilder builder = new StreamsBuilder();
>     final KStream<String, String> stream1 = builder.stream("topic", Consumed.with(Serdes.String(), Serdes.String()));
>     final KStream<String, String> stream2 = builder.stream("topic2", Consumed.with(Serdes.String(), Serdes.String()));
>     final KStream<String, String> stream3 = builder.stream("topic3", Consumed.with(Serdes.String(), Serdes.String()));
>     final KStream<String, String> newStream = stream1.map((k, v) -> new KeyValue<>(v, k));
>     newStream.join(stream2,
>                    (value1, value2) -> value1 + value2,
>                    JoinWindows.of(ofMillis(100)),
>                    StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String()));
>     newStream.join(stream3,
>                    (value1, value2) -> value1 + value2,
>                    JoinWindows.of(ofMillis(100)),
>                    StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String()));
>     System.err.println(builder.build().describe().toString());
> }
> {code}
> **results in**
> Invalid topology: Topic KSTREAM-MAP-03-repartition has already been registered by another source.
> org.apache.kafka.streams.errors.TopologyException: Invalid topology: Topic KSTREAM-MAP-03-repartition has already been registered by another source.
>     at org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.validateTopicNotAlreadyRegistered(InternalTopologyBuilder.java:578)
>     at org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.addSource(InternalTopologyBuilder.java:378)
>     at org.apache.kafka.streams.kstream.internals.graph.OptimizableRepartitionNode.writeToTopology(OptimizableRepartitionNode.java:100)
>     at org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder.buildAndOptimizeTopology(InternalStreamsBuilder.java:303)
>     at org.apache.kafka.streams.StreamsBuilder.build(StreamsBuilder.java:562)
>     at org.apache.kafka.streams.StreamsBuilder.build(StreamsBuilder.java:551)
>     at org.apache.kafka.streams.kstream.internals.KStreamKStreamJoinTest.optimizerIsEager(KStreamKStreamJoinTest.java:136)
>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>     at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>     at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>     at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>     at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>     at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305)
>     at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>     at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:365)
>     at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>     at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>     at org.junit.runners.ParentRunner$4.run(ParentRunner.java:330)
>     at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:78)
>     at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:328)
>     at org.junit.runners.ParentRunner.access$100(ParentRunner.java:65)
>     at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:292)
>     at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305)
>     at org.junit.runners.ParentRunner.run(ParentRunner.java:412)
>     at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
>     at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
>     at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
>     at org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:
[jira] [Commented] (KAFKA-9298) Reuse of a mapped stream causes an Invalid Topology
[ https://issues.apache.org/jira/browse/KAFKA-9298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17085398#comment-17085398 ]

ASF GitHub Bot commented on KAFKA-9298:
---

bbejeck commented on pull request #8504: KAFKA-9298: reuse mapped stream error in joins
URL: https://github.com/apache/kafka/pull/8504

When performing a join with a stream that needs repartitioning, Kafka Streams automatically creates a repartition topic. If the user does not use `StreamJoined` to name the repartition topic, Kafka Streams uses the generated name of the KStream instance for the repartition topic name. If the KStream instance requiring the repartition participates in another join, the second repartition topic is created using the same operator name. This name reuse is what causes the `TopologyException` ("Invalid topology"). The error occurs because the `InternalTopologyBuilder` has already registered a source for that repartition topic name.

For example, this topology will cause an error because Kafka Streams will attempt to create two repartition topics (which is correct behavior) but using the _**same name**_ each time, which causes the error.

```java
KStream<String, String> newStream = stream1.map((k, v) -> new KeyValue<>(v, k));
newStream.join(stream2, (value1, value2) -> value1 + value2, JoinWindows.of(ofMillis(100))).to("out-one");
newStream.join(stream3, (value1, value2) -> value1 + value2, JoinWindows.of(ofMillis(100))).to("out-two");
```

However, this topology, which is the same except that the user has provided repartition topic names, is fine. Note the use of `StreamJoined.withName` here:

```java
KStream<String, String> newStream = stream1.map((k, v) -> new KeyValue<>(v, k));
final StreamJoined<String, String, String> streamJoined = StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String());
newStream.join(stream2, (value1, value2) -> value1 + value2, JoinWindows.of(ofMillis(100)), streamJoined.withName("first-join")).to("out-one");
newStream.join(stream3, (value1, value2) -> value1 + value2, JoinWindows.of(ofMillis(100)), streamJoined.withName("second-join")).to("out-two");
```

This bug has been present for some time, as I tested this out on `2.0`, before we added the optimization layer.

Ideally, the fix would be to generate a new repartition topic name each time to avoid such issues. But IMHO that ship has already sailed, because introducing a new name generation scheme would cause compatibility issues for existing topologies. So generating new names is out, for now at least.

The proposed fix is:

1. For KStream objects needing repartitioning _**and using generated names**_, reuse the repartition topic node in any additional joins.
2. For KStream instances needing repartitioning _**using user-provided names**_, always create a new repartition topic node for each join, as each one will have a unique name.

I've added tests confirming the expected behavior.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
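The name collision and the two rules of the proposed fix can be illustrated with a small toy model. This is only a sketch under stated assumptions: `TopicRegistry`, `joinBeforeFix`, and `FixedJoiner` are hypothetical names invented here, not Kafka Streams classes, and the `<name>-repartition` naming is a simplification rather than the exact format the library uses.

```java
import java.util.HashSet;
import java.util.Set;

/** Toy model only: none of these types are Kafka Streams classes. */
public class RepartitionNamingSketch {

    /** Hypothetical stand-in for the "topic already registered" validation. */
    static final class TopicRegistry {
        private final Set<String> sourceTopics = new HashSet<>();

        void addSource(final String topic) {
            if (!sourceTopics.add(topic)) {
                throw new IllegalStateException(
                    "Topic " + topic + " has already been registered by another source.");
            }
        }

        int size() {
            return sourceTopics.size();
        }
    }

    /** Behaviour described as the bug: every join registers the same generated name again. */
    static void joinBeforeFix(final TopicRegistry registry, final String generatedName) {
        registry.addSource(generatedName + "-repartition");
    }

    /** Behaviour described as the proposed fix. */
    static final class FixedJoiner {
        private final TopicRegistry registry;
        private final String generatedName;
        private boolean generatedTopicRegistered;

        FixedJoiner(final TopicRegistry registry, final String generatedName) {
            this.registry = registry;
            this.generatedName = generatedName;
        }

        /** @param userProvidedName join name, or null when the user supplied none */
        void join(final String userProvidedName) {
            if (userProvidedName != null) {
                // Rule 2: a user-provided name always gets its own repartition topic.
                registry.addSource(userProvidedName + "-repartition");
            } else if (!generatedTopicRegistered) {
                // Rule 1: the first unnamed join registers the generated name once.
                registry.addSource(generatedName + "-repartition");
                generatedTopicRegistered = true;
            }
            // Later unnamed joins fall through and reuse the topic registered above.
        }
    }

    public static void main(final String[] args) {
        final TopicRegistry before = new TopicRegistry();
        joinBeforeFix(before, "KSTREAM-MAP-03");
        try {
            joinBeforeFix(before, "KSTREAM-MAP-03");
        } catch (final IllegalStateException e) {
            System.out.println("before fix: " + e.getMessage());
        }

        final TopicRegistry after = new TopicRegistry();
        final FixedJoiner joiner = new FixedJoiner(after, "KSTREAM-MAP-03");
        joiner.join(null);          // registers the generated name
        joiner.join(null);          // reuses it
        joiner.join("first-join");  // user-provided name: new topic
        joiner.join("second-join"); // user-provided name: new topic
        System.out.println("after fix: " + after.size() + " repartition topics");
    }
}
```

The design point the sketch tries to capture is that the fix leaves generated names untouched, which is why existing topologies would keep their topic names.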
[jira] [Commented] (KAFKA-9298) Reuse of a mapped stream causes an Invalid Topology
[ https://issues.apache.org/jira/browse/KAFKA-9298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17083736#comment-17083736 ]

Bill Bejeck commented on KAFKA-9298:

Investigating this further to pin down when the bug was introduced, it seems this is a long-standing issue. The following test fails with the same exception in 2.0, which is before the optimization layer was added.

{noformat}
@Test
public void testReuseJoin() {
    final StreamsBuilder builder = new StreamsBuilder();
    KStream<String, String> stream1 = builder.stream("input");
    KStream<String, String> stream2 = builder.stream("input2");
    KStream<String, String> stream3 = builder.stream("input3");

    KStream<String, String> mappedStream = stream1.map(KeyValue::pair);

    KStream<String, String> joinOne = mappedStream.join(stream2, (v1, v2) -> v1 + v2, JoinWindows.of(5000));
    KStream<String, String> joinTwo = mappedStream.join(stream3, (v1, v2) -> v1 + v2, JoinWindows.of(5000));

    System.out.println(builder.build().describe().toString());
}
{noformat}
[jira] [Commented] (KAFKA-9298) Reuse of a mapped stream causes an Invalid Topology
[ https://issues.apache.org/jira/browse/KAFKA-9298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17075954#comment-17075954 ]

Bill Bejeck commented on KAFKA-9298:

Correct, I wasn't trying to suggest otherwise. I could be splitting hairs here, but I just wanted to be clear the issue was related to building the "standard" topology vs using the optimization process itself. As I said in my previous comment, it's still a bug that needs fixing.
[jira] [Commented] (KAFKA-9298) Reuse of a mapped stream causes an Invalid Topology
[ https://issues.apache.org/jira/browse/KAFKA-9298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17075943#comment-17075943 ]

Matthias J. Sax commented on KAFKA-9298:

Interesting finding [~bbejeck] – the error seems to be in the "optimization layer" though (even if it's not related to rewrite rules): even if optimization is turned off, we still build up the GraphNodes DAG to create the `Topology` lazily at the end and the bug seems to be in this part of the code.
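The point about lazy construction can be sketched with a toy model: DSL-style calls only record graph nodes, and validation runs when the nodes are written out at build time, which matches the stack trace failing under `StreamsBuilder.build` rather than under `join`. The names `GraphNode`, `RepartitionNode`, and `Builder` below are hypothetical stand-ins chosen for illustration, not the real Kafka Streams classes.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Toy model only: a recorded node graph that is validated at build time. */
public class LazyBuildSketch {

    /** Hypothetical stand-in for a node that can write itself into a topology. */
    interface GraphNode {
        void writeToTopology(Set<String> registeredTopics);
    }

    static final class RepartitionNode implements GraphNode {
        private final String topic;

        RepartitionNode(final String topic) {
            this.topic = topic;
        }

        @Override
        public void writeToTopology(final Set<String> registeredTopics) {
            // Validation happens here, long after the node was recorded.
            if (!registeredTopics.add(topic)) {
                throw new IllegalStateException(
                    "Invalid topology: Topic " + topic
                        + " has already been registered by another source.");
            }
        }
    }

    static final class Builder {
        private final List<GraphNode> nodes = new ArrayList<>();

        /** Recording a node never fails, even when the name is a duplicate. */
        void addRepartition(final String topic) {
            nodes.add(new RepartitionNode(topic));
        }

        /** Walks the recorded nodes; duplicates surface only at this point. */
        int build() {
            final Set<String> registeredTopics = new HashSet<>();
            for (final GraphNode node : nodes) {
                node.writeToTopology(registeredTopics);
            }
            return registeredTopics.size();
        }
    }

    public static void main(final String[] args) {
        final Builder builder = new Builder();
        builder.addRepartition("KSTREAM-MAP-03-repartition");
        builder.addRepartition("KSTREAM-MAP-03-repartition"); // accepted silently
        try {
            builder.build();
        } catch (final IllegalStateException e) {
            System.out.println("failed at build time: " + e.getMessage());
        }
    }
}
```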
[jira] [Commented] (KAFKA-9298) Reuse of a mapped stream causes an Invalid Topology
[ https://issues.apache.org/jira/browse/KAFKA-9298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17075592#comment-17075592 ]

Bill Bejeck commented on KAFKA-9298:

The test method should not be named {{optimizerIsEager}}, as it's not using the optimizer. The default setting in the configs is to not optimize. With the test above updated to use optimization, no error occurs. There's still a bug, but it's not related to optimization.

{noformat}
final StreamsBuilder builder = new StreamsBuilder();
final Properties props = new Properties();
props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
final KStream<String, String> stream1 = builder.stream("topic", Consumed.with(Serdes.String(), Serdes.String()));
final KStream<String, String> stream2 = builder.stream("topic2", Consumed.with(Serdes.String(), Serdes.String()));
final KStream<String, String> stream3 = builder.stream("topic3", Consumed.with(Serdes.String(), Serdes.String()));
final KStream<String, String> newStream = stream1.map((k, v) -> new KeyValue<>(v, k));
newStream.join(stream2, (value1, value2) -> value1 + value2, JoinWindows.of(ofMillis(100)), StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String()));
newStream.join(stream3, (value1, value2) -> value1 + value2, JoinWindows.of(ofMillis(100)), StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String()));
System.out.println(builder.build(props).describe().toString());
{noformat}
[jira] [Commented] (KAFKA-9298) Reuse of a mapped stream causes an Invalid Topology
[ https://issues.apache.org/jira/browse/KAFKA-9298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16995970#comment-16995970 ]

Walker Carlson commented on KAFKA-9298:
---

We solved a similar bug for cogroup in https://github.com/apache/kafka/pull/7792 #CogroupedStreamAggregateBuilder.java