[ https://issues.apache.org/jira/browse/KAFKA-9298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16995970#comment-16995970 ]
Walker Carlson commented on KAFKA-9298: --------------------------------------- We solved a similar bug for cogoup in https://github.com/apache/kafka/pull/7792 #CogroupedStreamAggregateBuilder.java > Reuse of a mapped stream causes an Invalid Topology > --------------------------------------------------- > > Key: KAFKA-9298 > URL: https://issues.apache.org/jira/browse/KAFKA-9298 > Project: Kafka > Issue Type: Bug > Components: streams > Reporter: Walker Carlson > Priority: Minor > Labels: join, streams > > Can be found with in the KStreamKStreamJoinTest.java > @Test > public void optimizerIsEager() { > final StreamsBuilder builder = new StreamsBuilder(); > final KStream<String, String> stream1 = builder.stream("topic", > Consumed.with(Serdes.String(), Serdes.String())); > final KStream<String, String> stream2 = builder.stream("topic2", > Consumed.with(Serdes.String(), Serdes.String())); > final KStream<String, String> stream3 = builder.stream("topic3", > Consumed.with(Serdes.String(), Serdes.String())); > final KStream<String, String> newStream = stream1.map((k, v) -> new > KeyValue<>(v, k)); > newStream.join(stream2, > (value1, value2) -> value1 + value2, > JoinWindows.of(ofMillis(100)), > StreamJoined.with(Serdes.String(), Serdes.String(), > Serdes.String())); > newStream.join(stream3, > (value1, value2) -> value1 + value2, > JoinWindows.of(ofMillis(100)), > StreamJoined.with(Serdes.String(), Serdes.String(), > Serdes.String())); > System.err.println(builder.build().describe().toString()); > } > **results in > ** > Invalid topology: Topic KSTREAM-MAP-0000000003-repartition has already been > registered by another source. > org.apache.kafka.streams.errors.TopologyException: Invalid topology: Topic > KSTREAM-MAP-0000000003-repartition has already been registered by another > source. > at > org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.validateTopicNotAlreadyRegistered(InternalTopologyBuilder.java:578) > at > org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.addSource(InternalTopologyBuilder.java:378) > at > org.apache.kafka.streams.kstream.internals.graph.OptimizableRepartitionNode.writeToTopology(OptimizableRepartitionNode.java:100) > at > org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder.buildAndOptimizeTopology(InternalStreamsBuilder.java:303) > at > org.apache.kafka.streams.StreamsBuilder.build(StreamsBuilder.java:562) > at > org.apache.kafka.streams.StreamsBuilder.build(StreamsBuilder.java:551) > at > org.apache.kafka.streams.kstream.internals.KStreamKStreamJoinTest.optimizerIsEager(KStreamKStreamJoinTest.java:136) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base/java.lang.reflect.Method.invoke(Method.java:566) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305) > at > org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:365) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) > at org.junit.runners.ParentRunner$4.run(ParentRunner.java:330) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:78) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:328) > at org.junit.runners.ParentRunner.access$100(ParentRunner.java:65) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:292) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305) > at org.junit.runners.ParentRunner.run(ParentRunner.java:412) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38) > at > org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62) > at > org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base/java.lang.reflect.Method.invoke(Method.java:566) > at > org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36) > at > org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24) > at > org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33) > at > org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94) > at com.sun.proxy.$Proxy5.processTestClass(Unknown Source) > at > org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:118) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base/java.lang.reflect.Method.invoke(Method.java:566) > at > org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36) > at > org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24) > at > org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:182) > at > org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:164) > at > org.gradle.internal.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:412) > at > org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:64) > at > org.gradle.internal.concurrent.ManagedExecutorImpl$1.run(ManagedExecutorImpl.java:48) > at > java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) > at > java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) > at > org.gradle.internal.concurrent.ThreadFactoryImpl$ManagedThreadRunnable.run(ThreadFactoryImpl.java:56) > at java.base/java.lang.Thread.run(Thread.java:834) -- This message was sent by Atlassian Jira (v8.3.4#803005)