guozhangwang commented on a change in pull request #10613:
URL: https://github.com/apache/kafka/pull/10613#discussion_r624104819



##########
File path: streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
##########
@@ -863,6 +920,60 @@ public void streamStreamLeftJoinTopologyWithCustomStoresNames() {
             describe.toString());
     }
 
+    @Test
+    public void streamStreamLeftJoinTopologyWithCustomStoresSuppliers() {
+        final StreamsBuilder builder  = new StreamsBuilder();
+        final KStream<Integer, String> stream1;
+        final KStream<Integer, String> stream2;
+
+        stream1 = builder.stream("input-topic1");
+        stream2 = builder.stream("input-topic2");
+
+        final JoinWindows joinWindows = JoinWindows.of(ofMillis(100));
+
+        final WindowBytesStoreSupplier thisStoreSupplier = Stores.inMemoryWindowStore("in-memory-join-store",
+            Duration.ofMillis(joinWindows.size() + joinWindows.gracePeriodMs()),
+            Duration.ofMillis(joinWindows.size()), true);
+
+        final WindowBytesStoreSupplier otherStoreSupplier = Stores.inMemoryWindowStore("in-memory-join-store-other",
+            Duration.ofMillis(joinWindows.size() + joinWindows.gracePeriodMs()),
+            Duration.ofMillis(joinWindows.size()), true);
+
+        stream1.leftJoin(
+            stream2,
+            MockValueJoiner.TOSTRING_JOINER,
+            joinWindows,
+            StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String())
+                .withThisStoreSupplier(thisStoreSupplier)
+                .withOtherStoreSupplier(otherStoreSupplier));
+
+        final TopologyDescription describe = builder.build().describe();
+
+        assertEquals(
+            "Topologies:\n" +
+                "   Sub-topology: 0\n" +
+                "    Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic1])\n" +
+                "      --> KSTREAM-WINDOWED-0000000002\n" +
+                "    Source: KSTREAM-SOURCE-0000000001 (topics: [input-topic2])\n" +
+                "      --> KSTREAM-WINDOWED-0000000003\n" +
+                "    Processor: KSTREAM-WINDOWED-0000000002 (stores: [in-memory-join-store])\n" +
+                "      --> KSTREAM-JOINTHIS-0000000004\n" +
+                "      <-- KSTREAM-SOURCE-0000000000\n" +
+                "    Processor: KSTREAM-WINDOWED-0000000003 (stores: [in-memory-join-store-other])\n" +
+                "      --> KSTREAM-OUTEROTHER-0000000005\n" +
+                "      <-- KSTREAM-SOURCE-0000000001\n" +
+                "    Processor: KSTREAM-JOINTHIS-0000000004 (stores: [in-memory-join-store-other, KSTREAM-OUTERSHARED-0000000004-memory-store])\n" +

Review comment:
       Hmm, this makes me think: is it worth "working around" this by moving
the naming mechanism of the shared store into `sharedOuterJoinWindowStoreBuilder`
above, so that it always follows the same naming pattern as the other two
stores? As you can see here, if only the store suppliers are provided and not
the store names, the existing stores use the customized names but the shared
store still uses a system-generated name.
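
To make the question concrete, here is a minimal sketch of one possible naming
rule in plain Java. `SharedStoreNaming`, `sharedStoreName`, and the fallback
order are hypothetical and only illustrate the idea; this is not the actual
Kafka Streams code, and the suffix and generated name passed in `main` are
placeholder values.

```java
public class SharedStoreNaming {

    // Hypothetical helper (not Kafka Streams API): derives the shared outer-join
    // store name so that it follows the same base name as the two join stores.
    static String sharedStoreName(final String userProvidedBaseStoreName,
                                  final String thisSupplierName,
                                  final String generatedName,
                                  final String outerJoinSuffix) {
        // 1. An explicitly provided base store name wins.
        if (userProvidedBaseStoreName != null) {
            return userProvidedBaseStoreName + outerJoinSuffix + "-store";
        }
        // 2. Assumption: otherwise reuse the name of the user-supplied "this" store.
        if (thisSupplierName != null) {
            return thisSupplierName + outerJoinSuffix + "-store";
        }
        // 3. Otherwise fall back to the system-generated name.
        return generatedName + "-store";
    }

    public static void main(final String[] args) {
        // Only suppliers were given, so the shared store picks up the supplier's name.
        System.out.println(sharedStoreName(
            null, "in-memory-join-store", "KSTREAM-OUTERSHARED-0000000004", "-shared"));
    }
}
```

With a rule like this, the test above would see a shared store name derived
from `in-memory-join-store` rather than from `KSTREAM-OUTERSHARED-0000000004`.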

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
##########
@@ -155,7 +156,7 @@ public long get() {
 
             final String outerJoinStoreName = userProvidedBaseStoreName == null ? outerJoinStoreGeneratedName : userProvidedBaseStoreName + outerJoinSuffix;
 
-            outerJoinWindowStore = Optional.of(sharedOuterJoinWindowStoreBuilder(outerJoinStoreName, windows, streamJoinedInternal));
+            outerJoinWindowStore = Optional.of(sharedOuterJoinWindowStoreBuilder(outerJoinStoreName, windows, streamJoinedInternal, persistent));

Review comment:
       Just curious: is there a rationale for relying only on the left store's
persistence to determine the shared store's persistence?

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
##########
@@ -266,17 +267,36 @@ private void assertUniqueStoreNames(final WindowBytesStoreSupplier supplier,
     @SuppressWarnings("unchecked")
     private static <K, V1, V2> StoreBuilder<WindowStore<KeyAndJoinSide<K>, LeftOrRightValue<V1, V2>>> sharedOuterJoinWindowStoreBuilder(final String storeName,
                                                                                                                                         final JoinWindows windows,
-                                                                                                                                        final StreamJoinedInternal<K, V1, V2> streamJoinedInternal) {
-        final StoreBuilder<WindowStore<KeyAndJoinSide<K>, LeftOrRightValue<V1, V2>>> builder = new TimeOrderedWindowStoreBuilder<KeyAndJoinSide<K>, LeftOrRightValue<V1, V2>>(
-            persistentTimeOrderedWindowStore(
-                storeName + "-store",
-                Duration.ofMillis(windows.size() + windows.gracePeriodMs()),
-                Duration.ofMillis(windows.size())
-            ),
-            new KeyAndJoinSideSerde<>(streamJoinedInternal.keySerde()),
-            new LeftOrRightValueSerde(streamJoinedInternal.valueSerde(), streamJoinedInternal.otherValueSerde()),
-            Time.SYSTEM
-        );
+                                                                                                                                        final StreamJoinedInternal<K, V1, V2> streamJoinedInternal,
+                                                                                                                                        final boolean persistent) {
+        final KeyAndJoinSideSerde keyAndJoinSideSerde = new KeyAndJoinSideSerde<>(streamJoinedInternal.keySerde());
+        final LeftOrRightValueSerde leftOrRightValueSerde = new LeftOrRightValueSerde(streamJoinedInternal.valueSerde(), streamJoinedInternal.otherValueSerde());
+
+        final StoreBuilder<WindowStore<KeyAndJoinSide<K>, LeftOrRightValue<V1, V2>>> builder;
+        if (persistent) {
+            builder = new TimeOrderedWindowStoreBuilder<KeyAndJoinSide<K>, LeftOrRightValue<V1, V2>>(
+                persistentTimeOrderedWindowStore(
+                    storeName + "-store",
+                    Duration.ofMillis(windows.size() + windows.gracePeriodMs()),
+                    Duration.ofMillis(windows.size())
+                ),
+                keyAndJoinSideSerde,
+                leftOrRightValueSerde,
+                Time.SYSTEM
+            );
+        } else {
+            builder = Stores.windowStoreBuilder(
+                Stores.inMemoryWindowStore(
+                    storeName + "-memory-store",

Review comment:
       I think we should use the same name here; the metrics scope already
differentiates RocksDB from in-memory stores, which is sufficient.
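
As a rough illustration of the suggestion, the sketch below keeps one store
name for both branches and lets only the metrics scope vary. `SharedStoreSpec`
and its fields are hypothetical stand-ins written in plain Java, not Kafka
Streams classes, and the scope strings are illustrative values.

```java
public class SharedStoreSpec {
    final String name;
    final String metricsScope;

    SharedStoreSpec(final String name, final String metricsScope) {
        this.name = name;
        this.metricsScope = metricsScope;
    }

    // Hypothetical factory: the name does not depend on `persistent`;
    // only the metrics scope tells the two store types apart.
    static SharedStoreSpec of(final String storeName, final boolean persistent) {
        final String name = storeName + "-store";
        // Illustrative scope values, assumed for this sketch.
        final String scope = persistent ? "rocksdb-window" : "in-memory-window";
        return new SharedStoreSpec(name, scope);
    }

    public static void main(final String[] args) {
        final SharedStoreSpec onDisk = SharedStoreSpec.of("join-shared", true);
        final SharedStoreSpec inMemory = SharedStoreSpec.of("join-shared", false);
        System.out.println(onDisk.name + " / " + onDisk.metricsScope);
        System.out.println(inMemory.name + " / " + inMemory.metricsScope);
    }
}
```

Both specs share the name `join-shared-store`, so switching store types would
not change the store name.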



