[ https://issues.apache.org/jira/browse/KAFKA-9298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17085398#comment-17085398 ]
ASF GitHub Bot commented on KAFKA-9298:
---------------------------------------

bbejeck commented on pull request #8504: KAFKA-9298: reuse mapped stream error in joins
URL: https://github.com/apache/kafka/pull/8504

When performing a join with a stream that needs repartitioning, Kafka Streams automatically creates a repartition topic. If the user does not use `StreamJoined` to name the repartition topic, Kafka Streams uses the generated name of the KStream instance for the repartition topic name. If the KStream instance requiring the repartition participates in another join, the second repartition topic is again created using the name of that operator. This name reuse is what causes the `TopologyException` ("Invalid topology"). The error occurs because the `InternalTopologyBuilder` has already registered the repartition source name.

For example, this topology will cause an error because Kafka Streams will attempt to create two repartition topics (which is correct behavior) but using the _**same name**_ each time, which causes the error.

```java
KStream<String, String> newStream = stream1.map((k, v) -> new KeyValue<>(v, k));
newStream.join(stream2, (value1, value2) -> value1 + value2, JoinWindows.of(ofMillis(100))).to("out-one");
newStream.join(stream3, (value1, value2) -> value1 + value2, JoinWindows.of(ofMillis(100))).to("out-two");
```

However, this topology, which is the same except that the user has provided repartition topic names, is fine.
Note the use of `StreamJoined.withName` here:

```java
KStream<String, String> newStream = stream1.map((k, v) -> new KeyValue<>(v, k));
final StreamJoined<String, String, String> streamJoined = StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String());
newStream.join(stream2, (value1, value2) -> value1 + value2, JoinWindows.of(ofMillis(100)), streamJoined.withName("first-join")).to("out-one");
newStream.join(stream3, (value1, value2) -> value1 + value2, JoinWindows.of(ofMillis(100)), streamJoined.withName("second-join")).to("out-two");
```

This bug has been present for some time; I reproduced it on `2.0`, which predates the optimization layer.

Ideally, the fix would be to generate a new repartition topic name each time to avoid such issues. But IMHO that ship has already sailed, because introducing a new name generation scheme would cause compatibility issues for existing topologies. So generating new names is out, for now at least.

The proposed fix is:

1. For KStream objects needing repartitioning _**and using generated names**_, reuse the repartition topic node in any additional joins.
2. For KStream instances needing repartitioning _**using user-provided names**_, always create a new repartition topic node for each join, as each one will have a unique name.

I've added tests confirming the expected behavior.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

> Reuse of a mapped stream causes an Invalid Topology
> ---------------------------------------------------
>
>                 Key: KAFKA-9298
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9298
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Walker Carlson
>            Assignee: Bill Bejeck
>            Priority: Minor
>              Labels: join, streams
>
> Can be found within KStreamKStreamJoinTest.java:
> {code:java}
> @Test
> public void optimizerIsEager() {
>     final StreamsBuilder builder = new StreamsBuilder();
>     final KStream<String, String> stream1 = builder.stream("topic", Consumed.with(Serdes.String(), Serdes.String()));
>     final KStream<String, String> stream2 = builder.stream("topic2", Consumed.with(Serdes.String(), Serdes.String()));
>     final KStream<String, String> stream3 = builder.stream("topic3", Consumed.with(Serdes.String(), Serdes.String()));
>     final KStream<String, String> newStream = stream1.map((k, v) -> new KeyValue<>(v, k));
>     newStream.join(stream2, (value1, value2) -> value1 + value2, JoinWindows.of(ofMillis(100)), StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String()));
>     newStream.join(stream3, (value1, value2) -> value1 + value2, JoinWindows.of(ofMillis(100)), StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String()));
>     System.err.println(builder.build().describe().toString());
> }
> {code}
> **results in**
> Invalid topology: Topic KSTREAM-MAP-0000000003-repartition has already been registered by another source.
> org.apache.kafka.streams.errors.TopologyException: Invalid topology: Topic KSTREAM-MAP-0000000003-repartition has already been registered by another source.
>     at org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.validateTopicNotAlreadyRegistered(InternalTopologyBuilder.java:578)
>     at org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.addSource(InternalTopologyBuilder.java:378)
>     at org.apache.kafka.streams.kstream.internals.graph.OptimizableRepartitionNode.writeToTopology(OptimizableRepartitionNode.java:100)
>     at org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder.buildAndOptimizeTopology(InternalStreamsBuilder.java:303)
>     at org.apache.kafka.streams.StreamsBuilder.build(StreamsBuilder.java:562)
>     at org.apache.kafka.streams.StreamsBuilder.build(StreamsBuilder.java:551)
>     at org.apache.kafka.streams.kstream.internals.KStreamKStreamJoinTest.optimizerIsEager(KStreamKStreamJoinTest.java:136)
>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>     at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>     at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>     at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>     at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>     at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305)
>     at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>     at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:365)
>     at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>     at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>     at org.junit.runners.ParentRunner$4.run(ParentRunner.java:330)
>     at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:78)
>     at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:328)
>     at org.junit.runners.ParentRunner.access$100(ParentRunner.java:65)
>     at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:292)
>     at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305)
>     at org.junit.runners.ParentRunner.run(ParentRunner.java:412)
>     at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
>     at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
>     at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
>     at org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
>     at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>     at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
>     at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>     at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33)
>     at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94)
>     at com.sun.proxy.$Proxy5.processTestClass(Unknown Source)
>     at org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:118)
>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>     at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
>     at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>     at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:182)
>     at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:164)
>     at org.gradle.internal.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:412)
>     at org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:64)
>     at org.gradle.internal.concurrent.ManagedExecutorImpl$1.run(ManagedExecutorImpl.java:48)
>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>     at org.gradle.internal.concurrent.ThreadFactoryImpl$ManagedThreadRunnable.run(ThreadFactoryImpl.java:56)
>     at java.base/java.lang.Thread.run(Thread.java:834)

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
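
For readers who want to see the two rules of the proposed fix side by side, here is a minimal toy model in plain Java. It is only a sketch and not the code from the pull request: `RepartitionReuseSketch`, `TopicRegistry`, and `MappedStream` are hypothetical names that do not exist in Kafka Streams, and the `-repartition` suffix applied to user-provided names is an assumption made for illustration. The registry imitates the duplicate-source check that produces the "has already been registered by another source" message quoted above.

```java
import java.util.HashSet;
import java.util.Set;

/** Toy model only: these types are hypothetical and are not part of Kafka Streams. */
final class RepartitionReuseSketch {

    /** Stands in for the builder's duplicate-source check described in the PR. */
    static final class TopicRegistry {
        private final Set<String> sourceTopics = new HashSet<>();

        void addSource(final String topic) {
            // Set.add returns false when the topic was already present.
            if (!sourceTopics.add(topic)) {
                throw new IllegalStateException(
                    "Topic " + topic + " has already been registered by another source.");
            }
        }

        int size() {
            return sourceTopics.size();
        }
    }

    /** Stands in for a key-changing stream that must be repartitioned before each join. */
    static final class MappedStream {
        private final String generatedName;
        private final TopicRegistry registry;
        private boolean generatedTopicRegistered = false;

        MappedStream(final String generatedName, final TopicRegistry registry) {
            this.generatedName = generatedName;
            this.registry = registry;
        }

        /** Returns the repartition topic for one join; pass null when the user gave no name. */
        String repartitionTopicForJoin(final String userProvidedName) {
            if (userProvidedName != null) {
                // Rule 2: a user-provided name is unique per join, so register a new topic.
                // The suffix used for user-provided names is an assumption for illustration.
                final String topic = userProvidedName + "-repartition";
                registry.addSource(topic);
                return topic;
            }
            // Rule 1: with a generated name, register the topic once and reuse it afterwards.
            final String topic = generatedName + "-repartition";
            if (!generatedTopicRegistered) {
                registry.addSource(topic);
                generatedTopicRegistered = true;
            }
            return topic;
        }
    }

    public static void main(final String[] args) {
        final TopicRegistry registry = new TopicRegistry();
        final MappedStream newStream = new MappedStream("KSTREAM-MAP-0000000003", registry);

        // Two joins without names share one repartition topic instead of colliding.
        System.out.println(newStream.repartitionTopicForJoin(null));
        System.out.println(newStream.repartitionTopicForJoin(null));
        // Two joins with distinct user-provided names each get their own topic.
        System.out.println(newStream.repartitionTopicForJoin("first-join"));
        System.out.println(newStream.repartitionTopicForJoin("second-join"));
        System.out.println("registered topics: " + registry.size());
    }
}
```

Without the `generatedTopicRegistered` guard, the second unnamed join would call `addSource` with the same name again and fail, which corresponds to the behavior reported in the issue.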