[jira] [Commented] (KAFKA-9298) Reuse of a mapped stream causes an Invalid Topology

2020-05-27 Thread Bill Bejeck (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17118162#comment-17118162
 ] 

Bill Bejeck commented on KAFKA-9298:


cherry-picked to 2.5

> Reuse of a mapped stream causes an Invalid Topology
> ---
>
> Key: KAFKA-9298
> URL: https://issues.apache.org/jira/browse/KAFKA-9298
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Reporter: Walker Carlson
> Assignee: Bill Bejeck
> Priority: Minor
> Labels: join, streams
> Fix For: 2.6.0, 2.5.1
>
>
> Can be found within KStreamKStreamJoinTest.java:
> {code:java}
> @Test
> public void optimizerIsEager() {
>   final StreamsBuilder builder = new StreamsBuilder();
>   final KStream<String, String> stream1 = builder.stream("topic",
>       Consumed.with(Serdes.String(), Serdes.String()));
>   final KStream<String, String> stream2 = builder.stream("topic2",
>       Consumed.with(Serdes.String(), Serdes.String()));
>   final KStream<String, String> stream3 = builder.stream("topic3",
>       Consumed.with(Serdes.String(), Serdes.String()));
>   final KStream<String, String> newStream = stream1.map((k, v) -> new KeyValue<>(v, k));
>   newStream.join(stream2,
>       (value1, value2) -> value1 + value2,
>       JoinWindows.of(ofMillis(100)),
>       StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String()));
>   newStream.join(stream3,
>       (value1, value2) -> value1 + value2,
>       JoinWindows.of(ofMillis(100)),
>       StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String()));
>   System.err.println(builder.build().describe().toString());
> }
> {code}
> results in:
>
> Invalid topology: Topic KSTREAM-MAP-03-repartition has already been registered by another source.
> org.apache.kafka.streams.errors.TopologyException: Invalid topology: Topic KSTREAM-MAP-03-repartition has already been registered by another source.
>  at org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.validateTopicNotAlreadyRegistered(InternalTopologyBuilder.java:578)
>  at org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.addSource(InternalTopologyBuilder.java:378)
>  at org.apache.kafka.streams.kstream.internals.graph.OptimizableRepartitionNode.writeToTopology(OptimizableRepartitionNode.java:100)
>  at org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder.buildAndOptimizeTopology(InternalStreamsBuilder.java:303)
>  at org.apache.kafka.streams.StreamsBuilder.build(StreamsBuilder.java:562)
>  at org.apache.kafka.streams.StreamsBuilder.build(StreamsBuilder.java:551)
>  at org.apache.kafka.streams.kstream.internals.KStreamKStreamJoinTest.optimizerIsEager(KStreamKStreamJoinTest.java:136)
>  at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>  at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>  at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>  at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>  at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>  at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>  at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>  at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305)
>  at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>  at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:365)
>  at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>  at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>  at org.junit.runners.ParentRunner$4.run(ParentRunner.java:330)
>  at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:78)
>  at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:328)
>  at org.junit.runners.ParentRunner.access$100(ParentRunner.java:65)
>  at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:292)
>  at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305)
>  at org.junit.runners.ParentRunner.run(ParentRunner.java:412)
>  at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
>  at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
>  at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
>  at org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:

[jira] [Commented] (KAFKA-9298) Reuse of a mapped stream causes an Invalid Topology

2020-04-16 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17085398#comment-17085398
 ] 

ASF GitHub Bot commented on KAFKA-9298:
---

bbejeck commented on pull request #8504: KAFKA-9298: reuse mapped stream error 
in joins
URL: https://github.com/apache/kafka/pull/8504
 
 
   When performing a join with a stream that needs repartitioning, Kafka Streams automatically creates a repartition topic. If the user does not use `StreamJoined` to name the repartition topic, Kafka Streams uses the generated name of the KStream instance for the repartition topic name.
   
   If the KStream instance requiring repartitioning participates in another join, the second repartition topic is created with the same generated name. This name reuse is what causes the `TopologyException` ("Invalid topology: ... has already been registered by another source"). The error occurs because the `InternalTopologyBuilder` has already registered a source for that repartition topic.
   
   For example, this topology causes an error because Kafka Streams attempts to create two repartition topics (which is correct behavior) but uses the _**same name**_ each time.
   ```java
   KStream<String, String> newStream = stream1.map((k, v) -> new KeyValue<>(v, k));
   newStream.join(stream2, (value1, value2) -> value1 + value2, JoinWindows.of(ofMillis(100))).to("out-one");
   newStream.join(stream3, (value1, value2) -> value1 + value2, JoinWindows.of(ofMillis(100))).to("out-two");
   ```
   However, this topology, which is identical except that the user provides repartition topic names, works fine. Note the use of `StreamJoined.withName` here:
   ```java
   KStream<String, String> newStream = stream1.map((k, v) -> new KeyValue<>(v, k));
   final StreamJoined<String, String, String> streamJoined =
       StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String());
   newStream.join(stream2, (value1, value2) -> value1 + value2, JoinWindows.of(ofMillis(100)),
       streamJoined.withName("first-join")).to("out-one");
   newStream.join(stream3, (value1, value2) -> value1 + value2, JoinWindows.of(ofMillis(100)),
       streamJoined.withName("second-join")).to("out-two");
   ```
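To make the naming clash concrete, here is a minimal, stdlib-only Java sketch. `SourceRegistry` and `NaiveJoinModel` are hypothetical classes that model the behavior described above; they are not Kafka Streams internals. The sketch assumes each join derives its repartition topic only from the upstream stream's generated name (here `KSTREAM-MAP-03`, taken from the error message in the issue) and registers a new source for it.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical, simplified stand-in for the duplicate-source check that
// InternalTopologyBuilder performs. This is NOT Kafka Streams code.
class SourceRegistry {
    private final Set<String> registeredTopics = new HashSet<>();

    void addSource(final String topic) {
        // Set.add returns false if the topic was already registered.
        if (!registeredTopics.add(topic)) {
            throw new IllegalStateException("Invalid topology: Topic " + topic
                    + " has already been registered by another source.");
        }
    }
}

class NaiveJoinModel {
    // Assumption of this sketch: the repartition topic is derived only from
    // the upstream stream's generated name, with "-repartition" appended.
    static String repartitionTopicFor(final String upstreamName) {
        return upstreamName + "-repartition";
    }

    // Each join derives its repartition topic independently and registers a
    // new source for it, so the second join collides with the first.
    static void joinTwice(final SourceRegistry registry, final String upstreamName) {
        registry.addSource(repartitionTopicFor(upstreamName)); // first join: ok
        registry.addSource(repartitionTopicFor(upstreamName)); // second join: throws
    }

    public static void main(final String[] args) {
        try {
            joinTwice(new SourceRegistry(), "KSTREAM-MAP-03");
        } catch (final IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In this model, giving each join a distinct name (which is what `StreamJoined.withName` provides) means the two `addSource` calls receive different topics and no collision occurs.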
   This bug has been present for some time: I reproduced it on `2.0`, before we added the optimization layer.
   
   Ideally, the fix would be to generate a new repartition topic name each time to avoid such issues. But IMHO that ship has already sailed, because introducing new name generation would cause compatibility issues for existing topologies. So generating new names is out, at least for now.
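One way to see the compatibility concern, sketched under stated assumptions rather than taken from the Kafka source: generated operator names carry a counter-based index, and internal topic and state store names are derived from those names. In the hypothetical model below (`NameGenerator` and `NameShiftDemo` are illustrative, as are the prefixes and number format), allocating even one extra name before a join shifts the index of every operator created afterwards, so an upgraded application would compute internal names that differ from the ones its existing deployment already uses.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of counter-based operator naming (not Kafka's code):
// each new operator takes the next value of one shared counter.
class NameGenerator {
    private int index = 0;

    String next(final String prefix) {
        return prefix + index++;
    }
}

class NameShiftDemo {
    // Names for source -> map -> join. If extraName is true, one additional
    // name is allocated before the join (e.g. a hypothetical fresh
    // repartition name), which shifts every name generated after it.
    static List<String> names(final boolean extraName) {
        final NameGenerator generator = new NameGenerator();
        final List<String> names = new ArrayList<>();
        names.add(generator.next("KSTREAM-SOURCE-"));
        names.add(generator.next("KSTREAM-MAP-"));
        if (extraName) {
            generator.next("KSTREAM-REPARTITION-");
        }
        names.add(generator.next("KSTREAM-JOIN-"));
        return names;
    }

    public static void main(final String[] args) {
        System.out.println(names(false)); // [KSTREAM-SOURCE-0, KSTREAM-MAP-1, KSTREAM-JOIN-2]
        System.out.println(names(true));  // [KSTREAM-SOURCE-0, KSTREAM-MAP-1, KSTREAM-JOIN-3]
    }
}
```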
   
   The proposed fix is:
   
   1. For KStream objects needing repartitioning _**and using generated names**_, reuse the repartition topic node in any additional joins.
   2. For KStream instances needing repartitioning _**using user-provided names**_, always create a new repartition topic node for each join, as each one will have a unique name.
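The two rules above can be modeled in a few lines. This is a hypothetical sketch, not the code from PR #8504: `FixedJoinModel` caches one repartition topic per upstream stream when the name is generated, and creates a new one per join when the user supplies a name. Appending `-repartition` to the user-provided name is a simplification; the real topic names may differ.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical model of the proposed fix, not the actual patch:
// - generated names: one repartition node per upstream stream, reused by later joins;
// - user-provided names: a new repartition node for every join.
class FixedJoinModel {
    private final Set<String> registeredSources = new HashSet<>();
    private final Map<String, String> reusableRepartitionTopics = new HashMap<>();

    private void addSource(final String topic) {
        if (!registeredSources.add(topic)) {
            throw new IllegalStateException("Invalid topology: Topic " + topic
                    + " has already been registered by another source.");
        }
    }

    // Returns the repartition topic the join reads from.
    // A null userProvidedName means the generated upstream name is used.
    String join(final String upstreamGeneratedName, final String userProvidedName) {
        if (userProvidedName == null) {
            // Register the repartition source only the first time this upstream
            // stream is joined; later joins reuse the same topic.
            return reusableRepartitionTopics.computeIfAbsent(upstreamGeneratedName, name -> {
                final String topic = name + "-repartition";
                addSource(topic);
                return topic;
            });
        }
        // User-provided names are unique per join, so a new node is always safe.
        final String topic = userProvidedName + "-repartition";
        addSource(topic);
        return topic;
    }

    int sourceCount() {
        return registeredSources.size();
    }

    public static void main(final String[] args) {
        final FixedJoinModel generated = new FixedJoinModel();
        generated.join("KSTREAM-MAP-03", null);
        generated.join("KSTREAM-MAP-03", null);
        System.out.println(generated.sourceCount()); // 1

        final FixedJoinModel named = new FixedJoinModel();
        named.join("KSTREAM-MAP-03", "first-join");
        named.join("KSTREAM-MAP-03", "second-join");
        System.out.println(named.sourceCount()); // 2
    }
}
```

Caching by upstream name is what makes the generated-name case safe in this model: the second join reuses the topic instead of registering the same source a second time.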
   
   
   
   I've added tests confirming the expected behavior.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
 




[jira] [Commented] (KAFKA-9298) Reuse of a mapped stream causes an Invalid Topology

2020-04-14 Thread Bill Bejeck (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17083736#comment-17083736
 ] 

Bill Bejeck commented on KAFKA-9298:


In investigating this further to pin down when this bug was introduced, it seems it has been an existing issue. The following test fails with the same exception in 2.0, which is before the optimization layer was added.
{noformat}
@Test
public void testReuseJoin() {
    final StreamsBuilder builder = new StreamsBuilder();
    KStream<String, String> stream1 = builder.stream("input");
    KStream<String, String> stream2 = builder.stream("input2");
    KStream<String, String> stream3 = builder.stream("input3");

    KStream<String, String> mappedStream = stream1.map(KeyValue::pair);
    KStream<String, String> joinOne = mappedStream.join(stream2, (v1, v2) -> v1 + v2, JoinWindows.of(5000));
    KStream<String, String> joinTwo = mappedStream.join(stream3, (v1, v2) -> v1 + v2, JoinWindows.of(5000));

    System.out.println(builder.build().describe().toString());
}{noformat}


[jira] [Commented] (KAFKA-9298) Reuse of a mapped stream causes an Invalid Topology

2020-04-05 Thread Bill Bejeck (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17075954#comment-17075954
 ] 

Bill Bejeck commented on KAFKA-9298:


Correct, I wasn't trying to suggest otherwise. 

I could be splitting hairs here, but I just wanted to make clear that the issue is related to building the "standard" topology rather than to the optimization process itself. As I said in my previous comment, it's still a bug that needs fixing.


[jira] [Commented] (KAFKA-9298) Reuse of a mapped stream causes an Invalid Topology

2020-04-05 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17075943#comment-17075943
 ] 

Matthias J. Sax commented on KAFKA-9298:


Interesting finding [~bbejeck] – the error seems to be in the "optimization 
layer" though (even if it's not related to rewrite rules): even if optimization 
is turned off, we still build up the GraphNodes DAG to create the `Topology` 
lazily at the end and the bug seems to be in this part of the code.
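The point that the failure comes from the lazily built graph, not from the rewrite rules, can be illustrated with a hypothetical model (`LazyBuilder` is an illustrative name, not one of Kafka's `GraphNode` classes): DSL-style calls only record nodes, and a duplicate source is detected only when `build()` writes the nodes out. This is consistent with the stack trace in the issue, which fails inside `StreamsBuilder.build` rather than at the `join` call sites.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model of lazy topology construction (not Kafka's classes):
// DSL-style calls only record graph nodes; sources are registered, and
// duplicates detected, when build() walks the recorded nodes.
class LazyBuilder {
    // For this sketch, a recorded node is just the repartition topic it will register.
    private final List<String> pendingRepartitionTopics = new ArrayList<>();

    void recordJoinNeedingRepartition(final String upstreamName) {
        // No validation happens here; the node is only remembered.
        pendingRepartitionTopics.add(upstreamName + "-repartition");
    }

    // Writes the recorded nodes out, failing on the first duplicate source topic.
    Set<String> build() {
        final Set<String> registered = new HashSet<>();
        for (final String topic : pendingRepartitionTopics) {
            if (!registered.add(topic)) {
                throw new IllegalStateException("Invalid topology: Topic " + topic
                        + " has already been registered by another source.");
            }
        }
        return registered;
    }

    public static void main(final String[] args) {
        final LazyBuilder builder = new LazyBuilder();
        builder.recordJoinNeedingRepartition("KSTREAM-MAP-03");
        builder.recordJoinNeedingRepartition("KSTREAM-MAP-03"); // no error yet
        try {
            builder.build();
        } catch (final IllegalStateException e) {
            System.out.println("build() failed: " + e.getMessage());
        }
    }
}
```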


[jira] [Commented] (KAFKA-9298) Reuse of a mapped stream causes an Invalid Topology

2020-04-04 Thread Bill Bejeck (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17075592#comment-17075592
 ] 

Bill Bejeck commented on KAFKA-9298:


The test method should not be named {{optimizerIsEager}}, as it's not using the optimizer. The default config setting is not to optimize. When the test above is updated to use optimization, no error occurs. There's still a bug, but it's not related to optimization.
{noformat}
final StreamsBuilder builder = new StreamsBuilder();
final Properties props = new Properties();
props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
final KStream<String, String> stream1 = builder.stream("topic",
    Consumed.with(Serdes.String(), Serdes.String()));
final KStream<String, String> stream2 = builder.stream("topic2",
    Consumed.with(Serdes.String(), Serdes.String()));
final KStream<String, String> stream3 = builder.stream("topic3",
    Consumed.with(Serdes.String(), Serdes.String()));
final KStream<String, String> newStream = stream1.map((k, v) -> new KeyValue<>(v, k));
newStream.join(stream2, (value1, value2) -> value1 + value2,
    JoinWindows.of(ofMillis(100)), StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String()));
newStream.join(stream3, (value1, value2) -> value1 + value2,
    JoinWindows.of(ofMillis(100)), StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String()));
System.out.println(builder.build(props).describe().toString());{noformat}
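A hypothetical model of why enabling optimization makes the error disappear. It assumes, as a simplification rather than a description of Kafka's optimizer internals, that the optimizer merges repartition nodes hanging off the same key-changing parent into a single node before any sources are registered. `OptimizerModel` is an illustrative name, not a Kafka class.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model, not Kafka's optimizer: with optimization on, repartition
// nodes that share the same key-changing parent are merged into one node
// before any source topics are registered.
class OptimizerModel {
    static List<String> repartitionTopics(final List<String> parentOfEachRepartitionNode,
                                          final boolean optimize) {
        final Collection<String> parents;
        if (optimize) {
            // Merge: keep one repartition node per distinct parent, in order.
            parents = new LinkedHashSet<>(parentOfEachRepartitionNode);
        } else {
            // No merging: every join keeps its own repartition node.
            parents = parentOfEachRepartitionNode;
        }
        final Set<String> registered = new HashSet<>();
        final List<String> topics = new ArrayList<>();
        for (final String parent : parents) {
            final String topic = parent + "-repartition";
            if (!registered.add(topic)) {
                throw new IllegalStateException("Invalid topology: Topic " + topic
                        + " has already been registered by another source.");
            }
            topics.add(topic);
        }
        return topics;
    }

    public static void main(final String[] args) {
        // Two joins whose repartition nodes both hang off the same map() node.
        final List<String> parents = Arrays.asList("KSTREAM-MAP-03", "KSTREAM-MAP-03");
        System.out.println(repartitionTopics(parents, true)); // [KSTREAM-MAP-03-repartition]
        try {
            repartitionTopics(parents, false);
        } catch (final IllegalStateException e) {
            System.out.println("unoptimized: " + e.getMessage());
        }
    }
}
```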
 


[jira] [Commented] (KAFKA-9298) Reuse of a mapped stream causes an Invalid Topology

2019-12-13 Thread Walker Carlson (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16995970#comment-16995970
 ] 

Walker Carlson commented on KAFKA-9298:
---

We solved a similar bug for cogroup in https://github.com/apache/kafka/pull/7792 (see CogroupedStreamAggregateBuilder.java).
