mjsax commented on a change in pull request #8504:
URL: https://github.com/apache/kafka/pull/8504#discussion_r424060697



##########
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
##########
@@ -77,6 +79,38 @@ public void 
shouldLogAndMeterOnSkippedRecordsWithNullValueWithBuiltInMetricsVers
         
shouldLogAndMeterOnSkippedRecordsWithNullValue(StreamsConfig.METRICS_LATEST);
     }
 
+
+    @Test
+    public void shouldReuseRepartitionTopicWithGeneratedName() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final Properties props = new Properties();
+        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, 
StreamsConfig.NO_OPTIMIZATION);
+        final KStream<String, String> stream1 = builder.stream("topic", 
Consumed.with(Serdes.String(), Serdes.String()));
+        final KStream<String, String> stream2 = builder.stream("topic2", 
Consumed.with(Serdes.String(), Serdes.String()));
+        final KStream<String, String> stream3 = builder.stream("topic3", 
Consumed.with(Serdes.String(), Serdes.String()));
+        final KStream<String, String> newStream = stream1.map((k, v) -> new 
KeyValue<>(v, k));
+        newStream.join(stream2, (value1, value2) -> value1 + value2, 
JoinWindows.of(ofMillis(100))).to("out-one");
+        newStream.join(stream3, (value1, value2) -> value1 + value2, 
JoinWindows.of(ofMillis(100))).to("out-to");
+        assertEquals(expectedTopologyWithGeneratedRepartitionTopic, 
builder.build(props).describe().toString());
+    }
+
+    @Test
+    public void shouldCreateRepartitionTopicsWithUserProvidedName() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final Properties props = new Properties();
+        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, 
StreamsConfig.NO_OPTIMIZATION);
+        final KStream<String, String> stream1 = builder.stream("topic", 
Consumed.with(Serdes.String(), Serdes.String()));
+        final KStream<String, String> stream2 = builder.stream("topic2", 
Consumed.with(Serdes.String(), Serdes.String()));
+        final KStream<String, String> stream3 = builder.stream("topic3", 
Consumed.with(Serdes.String(), Serdes.String()));
+        final KStream<String, String> newStream = stream1.map((k, v) -> new 
KeyValue<>(v, k));
+        final StreamJoined<String, String, String> streamJoined = 
StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String());
+        newStream.join(stream2, (value1, value2) -> value1 + value2, 
JoinWindows.of(ofMillis(100)), 
streamJoined.withName("first-join")).to("out-one");
+        newStream.join(stream3, (value1, value2) -> value1 + value2, 
JoinWindows.of(ofMillis(100)), 
streamJoined.withName("second-join")).to("out-two");
+        final Topology topology =  builder.build(props);
+        System.out.println(topology.describe().toString());
+        assertEquals(expectedTopologyWithUserNamedRepartitionTopics, 
topology.describe().toString());

Review comment:
       Sorry for being undecided... Reading the code now, I am wondering if 
this behavior may become problematic with regard to topology upgrade. Assume, 
the first join is removed. Technically, the new topology is compatible, but we 
would now generate a new repartition topic name, and thus it's not compatible. 
This could be fixed by inserting a `repartition()` in the new code enforcing 
the old name -- however, this make me wonder if we might want to throw a 
"naming conflict" (ie, cannot pick a name) exception based on the original 
topology for this case when both operators are named, and tell people to insert 
`repartition()` right away? For this case, if they later remove a join it's 
clear what is happening to them.
   
   Ie, we should still not create two repartition topics what would be "bad" 
(user could still enforce if by calling `repartition()` twice), but just throw 
with an informative error message? -- Curious what @vvcephei thinks?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to