[ 
https://issues.apache.org/jira/browse/KAFKA-9298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17085398#comment-17085398
 ] 

ASF GitHub Bot commented on KAFKA-9298:
---------------------------------------

bbejeck commented on pull request #8504: KAFKA-9298: reuse mapped stream error 
in joins
URL: https://github.com/apache/kafka/pull/8504
 
 
   When performing a join with a stream that needs repartitioning, Kafka 
Streams automatically creates a repartition topic.  If the user does not use 
`StreamJoined` to name the repartition topic, Kafka Streams uses the generated 
name of the KStream instance for the repartition topic name.  
   
   If the KStream instance requiring the repartition participates in another 
join, the second repartition topic is created using the same generated 
operator name.  This name reuse is what causes the `TopologyException` 
("Invalid topology").  The error occurs because the `InternalTopologyBuilder` 
has already registered a source under that repartition topic name.
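
The failure mode can be modelled without Kafka. The sketch below is a simplified stand-in, not the Kafka Streams implementation: the class `SourceTopicRegistry` and its `addSource` method are hypothetical names chosen for illustration, and only the error text is taken from the stack trace in the ticket.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical stand-in for the duplicate-topic check; not Kafka Streams code. */
public class SourceTopicRegistry {

    // Topic names that already have a source registered.
    private final Set<String> registeredTopics = new HashSet<>();

    /** Registers a source topic, rejecting a name that is already registered. */
    public void addSource(String topic) {
        // Set.add returns false when the element is already present.
        if (!registeredTopics.add(topic)) {
            throw new IllegalStateException(
                "Invalid topology: Topic " + topic
                    + " has already been registered by another source.");
        }
    }

    public static void main(String[] args) {
        SourceTopicRegistry registry = new SourceTopicRegistry();
        String generated = "KSTREAM-MAP-0000000003-repartition";
        registry.addSource(generated);     // first join: accepted
        try {
            registry.addSource(generated); // second join reuses the name: rejected
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```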
   
   For example, this topology fails: Kafka Streams attempts to create two 
repartition topics (which is correct behavior) but uses the _**same name**_ 
for both.
   ``` java
   KStream<String, String> newStream = stream1.map((k, v) -> new KeyValue<>(v, k));
   // Both joins repartition newStream under the same generated name.
   newStream.join(stream2, (value1, value2) -> value1 + value2,
       JoinWindows.of(ofMillis(100))).to("out-one");
   newStream.join(stream3, (value1, value2) -> value1 + value2,
       JoinWindows.of(ofMillis(100))).to("out-two");
   ```
   However, this topology, which is identical except that the user has provided 
repartition topic names, is fine.  Note the use of `StreamJoined.withName` here:
   ```java
   KStream<String, String> newStream = stream1.map((k, v) -> new KeyValue<>(v, k));
   final StreamJoined<String, String, String> streamJoined =
       StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String());
   newStream.join(stream2, (value1, value2) -> value1 + value2,
       JoinWindows.of(ofMillis(100)), streamJoined.withName("first-join")).to("out-one");
   newStream.join(stream3, (value1, value2) -> value1 + value2,
       JoinWindows.of(ofMillis(100)), streamJoined.withName("second-join")).to("out-two");
   ```
   This bug has been present for some time: I reproduced it on `2.0`, which 
predates the optimization layer.
   
   Ideally, the fix would be to generate a new repartition topic name each time 
to avoid such issues.  But IMHO that ship has already sailed, because 
introducing new name generation would cause compatibility issues for existing 
topologies.  So generating new names is out, for now at least.
   
   The proposed fix is:
   
   1. For KStream instances needing repartitioning _**and using generated 
names**_, reuse the repartition topic node in any additional joins.
   2. For KStream instances needing repartitioning _**and using user-provided 
names**_, always create a new repartition topic node for each join, as each one 
will have a unique name.
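
The two rules above can be sketched as a small cache keyed by the generated name. This is a minimal model under stated assumptions, not the code in the pull request: `RepartitionNodeRegistry`, `repartitionTopicFor`, and `nodesCreated` are hypothetical names, a node is represented only by its topic name, and the `-repartition` suffix on user-provided names is a simplification.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, simplified model of the proposed fix; not Kafka Streams code. */
public class RepartitionNodeRegistry {

    // Repartition topic already created for each generated operator name.
    private final Map<String, String> nodesByGeneratedName = new HashMap<>();
    private int nodesCreated = 0;

    /**
     * Returns the repartition topic to use for one join.
     *
     * @param generatedName name generated for the upstream operator
     * @param userName      name supplied by the user, or null if none was given
     */
    public String repartitionTopicFor(String generatedName, String userName) {
        if (userName != null) {
            // Rule 2: user-provided names are unique, so always create a new node.
            nodesCreated++;
            return userName + "-repartition";
        }
        // Rule 1: with a generated name, create the node once and reuse it afterwards.
        return nodesByGeneratedName.computeIfAbsent(generatedName, name -> {
            nodesCreated++;
            return name + "-repartition";
        });
    }

    /** Number of distinct repartition nodes created so far. */
    public int nodesCreated() {
        return nodesCreated;
    }

    public static void main(String[] args) {
        RepartitionNodeRegistry registry = new RepartitionNodeRegistry();
        String op = "KSTREAM-MAP-0000000003";
        // Two joins without user names share a single node.
        System.out.println(registry.repartitionTopicFor(op, null));
        System.out.println(registry.repartitionTopicFor(op, null));
        // Two joins with user names each get their own node.
        System.out.println(registry.repartitionTopicFor(op, "first-join"));
        System.out.println(registry.repartitionTopicFor(op, "second-join"));
        System.out.println("nodes created: " + registry.nodesCreated());
    }
}
```

Because generated names stay unchanged in this model, existing topologies keep their topic names, which is the compatibility constraint described above.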
   
   
   
   I've added tests confirming the expected behavior.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Reuse of a mapped stream causes an Invalid Topology
> ---------------------------------------------------
>
>                 Key: KAFKA-9298
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9298
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Walker Carlson
>            Assignee: Bill Bejeck
>            Priority: Minor
>              Labels: join, streams
>
> Can be found within KStreamKStreamJoinTest.java
> {code:java}
> @Test
> public void optimizerIsEager() {
>   final StreamsBuilder builder = new StreamsBuilder();
>   final KStream<String, String> stream1 = builder.stream("topic", 
> Consumed.with(Serdes.String(), Serdes.String()));
>   final KStream<String, String> stream2 = builder.stream("topic2", 
> Consumed.with(Serdes.String(), Serdes.String()));
>   final KStream<String, String> stream3 = builder.stream("topic3", 
> Consumed.with(Serdes.String(), Serdes.String()));
>   final KStream<String, String> newStream = stream1.map((k, v) -> new 
> KeyValue<>(v, k));
>   newStream.join(stream2, (value1, value2) -> value1 + value2, 
> JoinWindows.of(ofMillis(100)), StreamJoined.with(Serdes.String(), 
> Serdes.String(), Serdes.String()));
>   newStream.join(stream3, (value1, value2) -> value1 + value2, 
> JoinWindows.of(ofMillis(100)), StreamJoined.with(Serdes.String(), 
> Serdes.String(), Serdes.String()));
>   System.err.println(builder.build().describe().toString());
> }
>  
> {code}
> **results in**
>  Invalid topology: Topic KSTREAM-MAP-0000000003-repartition has already been 
> registered by another source.
>  org.apache.kafka.streams.errors.TopologyException: Invalid topology: Topic 
> KSTREAM-MAP-0000000003-repartition has already been registered by another 
> source.
>  at 
> org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.validateTopicNotAlreadyRegistered(InternalTopologyBuilder.java:578)
>  at 
> org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.addSource(InternalTopologyBuilder.java:378)
>  at 
> org.apache.kafka.streams.kstream.internals.graph.OptimizableRepartitionNode.writeToTopology(OptimizableRepartitionNode.java:100)
>  at 
> org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder.buildAndOptimizeTopology(InternalStreamsBuilder.java:303)
>  at org.apache.kafka.streams.StreamsBuilder.build(StreamsBuilder.java:562)
>  at org.apache.kafka.streams.StreamsBuilder.build(StreamsBuilder.java:551)
>  at 
> org.apache.kafka.streams.kstream.internals.KStreamKStreamJoinTest.optimizerIsEager(KStreamKStreamJoinTest.java:136)
>  at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
>  at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>  at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>  at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>  at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>  at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>  at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>  at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305)
>  at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>  at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:365)
>  at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>  at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>  at org.junit.runners.ParentRunner$4.run(ParentRunner.java:330)
>  at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:78)
>  at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:328)
>  at org.junit.runners.ParentRunner.access$100(ParentRunner.java:65)
>  at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:292)
>  at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305)
>  at org.junit.runners.ParentRunner.run(ParentRunner.java:412)
>  at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
>  at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
>  at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
>  at 
> org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
>  at 
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
>  at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
>  at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>  at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>  at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
>  at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>  at 
> org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33)
>  at 
> org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94)
>  at com.sun.proxy.$Proxy5.processTestClass(Unknown Source)
>  at 
> org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:118)
>  at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
>  at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>  at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>  at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
>  at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>  at 
> org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:182)
>  at 
> org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:164)
>  at 
> org.gradle.internal.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:412)
>  at 
> org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:64)
>  at 
> org.gradle.internal.concurrent.ManagedExecutorImpl$1.run(ManagedExecutorImpl.java:48)
>  at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>  at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>  at 
> org.gradle.internal.concurrent.ThreadFactoryImpl$ManagedThreadRunnable.run(ThreadFactoryImpl.java:56)
>  at java.base/java.lang.Thread.run(Thread.java:834)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
