[ 
https://issues.apache.org/jira/browse/KAFKA-9298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16995970#comment-16995970
 ] 

Walker Carlson commented on KAFKA-9298:
---------------------------------------

 We solved a similar bug for cogoup in 
https://github.com/apache/kafka/pull/7792 #CogroupedStreamAggregateBuilder.java

> Reuse of a mapped stream causes an Invalid Topology
> ---------------------------------------------------
>
>                 Key: KAFKA-9298
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9298
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Walker Carlson
>            Priority: Minor
>              Labels: join, streams
>
> Can be found with in the KStreamKStreamJoinTest.java
>     @Test
>     public void optimizerIsEager() {
>         final StreamsBuilder builder = new StreamsBuilder();
>         final KStream<String, String> stream1 = builder.stream("topic", 
> Consumed.with(Serdes.String(), Serdes.String()));
>         final KStream<String, String> stream2 = builder.stream("topic2", 
> Consumed.with(Serdes.String(), Serdes.String()));
>         final KStream<String, String> stream3 = builder.stream("topic3", 
> Consumed.with(Serdes.String(), Serdes.String()));
>         final KStream<String, String> newStream = stream1.map((k, v) -> new 
> KeyValue<>(v, k));
>         newStream.join(stream2,
>             (value1, value2) -> value1 + value2,
>             JoinWindows.of(ofMillis(100)),
>             StreamJoined.with(Serdes.String(), Serdes.String(), 
> Serdes.String()));
>         newStream.join(stream3,
>             (value1, value2) -> value1 + value2,
>             JoinWindows.of(ofMillis(100)),
>             StreamJoined.with(Serdes.String(), Serdes.String(), 
> Serdes.String()));
>         System.err.println(builder.build().describe().toString());
>     }
> **results in 
> **
> Invalid topology: Topic KSTREAM-MAP-0000000003-repartition has already been 
> registered by another source.
> org.apache.kafka.streams.errors.TopologyException: Invalid topology: Topic 
> KSTREAM-MAP-0000000003-repartition has already been registered by another 
> source.
>       at 
> org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.validateTopicNotAlreadyRegistered(InternalTopologyBuilder.java:578)
>       at 
> org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.addSource(InternalTopologyBuilder.java:378)
>       at 
> org.apache.kafka.streams.kstream.internals.graph.OptimizableRepartitionNode.writeToTopology(OptimizableRepartitionNode.java:100)
>       at 
> org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder.buildAndOptimizeTopology(InternalStreamsBuilder.java:303)
>       at 
> org.apache.kafka.streams.StreamsBuilder.build(StreamsBuilder.java:562)
>       at 
> org.apache.kafka.streams.StreamsBuilder.build(StreamsBuilder.java:551)
>       at 
> org.apache.kafka.streams.kstream.internals.KStreamKStreamJoinTest.optimizerIsEager(KStreamKStreamJoinTest.java:136)
>       at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>       at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>       at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>       at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>       at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>       at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305)
>       at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>       at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:365)
>       at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>       at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>       at org.junit.runners.ParentRunner$4.run(ParentRunner.java:330)
>       at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:78)
>       at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:328)
>       at org.junit.runners.ParentRunner.access$100(ParentRunner.java:65)
>       at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:292)
>       at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305)
>       at org.junit.runners.ParentRunner.run(ParentRunner.java:412)
>       at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
>       at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
>       at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
>       at 
> org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
>       at 
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
>       at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>       at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
>       at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>       at 
> org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33)
>       at 
> org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94)
>       at com.sun.proxy.$Proxy5.processTestClass(Unknown Source)
>       at 
> org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:118)
>       at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>       at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
>       at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>       at 
> org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:182)
>       at 
> org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:164)
>       at 
> org.gradle.internal.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:412)
>       at 
> org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:64)
>       at 
> org.gradle.internal.concurrent.ManagedExecutorImpl$1.run(ManagedExecutorImpl.java:48)
>       at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>       at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>       at 
> org.gradle.internal.concurrent.ThreadFactoryImpl$ManagedThreadRunnable.run(ThreadFactoryImpl.java:56)
>       at java.base/java.lang.Thread.run(Thread.java:834)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to