guozhangwang commented on a change in pull request #10613: URL: https://github.com/apache/kafka/pull/10613#discussion_r624104819
########## File path: streams/src/test/java/org/apache/kafka/streams/TopologyTest.java ########## @@ -863,6 +920,60 @@ public void streamStreamLeftJoinTopologyWithCustomStoresNames() { describe.toString()); } + @Test + public void streamStreamLeftJoinTopologyWithCustomStoresSuppliers() { + final StreamsBuilder builder = new StreamsBuilder(); + final KStream<Integer, String> stream1; + final KStream<Integer, String> stream2; + + stream1 = builder.stream("input-topic1"); + stream2 = builder.stream("input-topic2"); + + final JoinWindows joinWindows = JoinWindows.of(ofMillis(100)); + + final WindowBytesStoreSupplier thisStoreSupplier = Stores.inMemoryWindowStore("in-memory-join-store", + Duration.ofMillis(joinWindows.size() + joinWindows.gracePeriodMs()), + Duration.ofMillis(joinWindows.size()), true); + + final WindowBytesStoreSupplier otherStoreSupplier = Stores.inMemoryWindowStore("in-memory-join-store-other", + Duration.ofMillis(joinWindows.size() + joinWindows.gracePeriodMs()), + Duration.ofMillis(joinWindows.size()), true); + + stream1.leftJoin( + stream2, + MockValueJoiner.TOSTRING_JOINER, + joinWindows, + StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String()) + .withThisStoreSupplier(thisStoreSupplier) + .withOtherStoreSupplier(otherStoreSupplier)); + + final TopologyDescription describe = builder.build().describe(); + + assertEquals( + "Topologies:\n" + + " Sub-topology: 0\n" + + " Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic1])\n" + + " --> KSTREAM-WINDOWED-0000000002\n" + + " Source: KSTREAM-SOURCE-0000000001 (topics: [input-topic2])\n" + + " --> KSTREAM-WINDOWED-0000000003\n" + + " Processor: KSTREAM-WINDOWED-0000000002 (stores: [in-memory-join-store])\n" + + " --> KSTREAM-JOINTHIS-0000000004\n" + + " <-- KSTREAM-SOURCE-0000000000\n" + + " Processor: KSTREAM-WINDOWED-0000000003 (stores: [in-memory-join-store-other])\n" + + " --> KSTREAM-OUTEROTHER-0000000005\n" + + " <-- KSTREAM-SOURCE-0000000001\n" + + " Processor: 
KSTREAM-JOINTHIS-0000000004 (stores: [in-memory-join-store-other, KSTREAM-OUTERSHARED-0000000004-memory-store])\n" + Review comment: Hmm.. this makes me think: is it worth "working around" it by moving the naming mechanism of the shared store to `sharedOuterJoinWindowStoreBuilder` above, such that it always goes along with the other two stores' naming patterns? As you can see here, if the store names are not provided but just the store suppliers, the existing stores would use customized names but the shared store would still use system-provided names. ########## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java ########## @@ -155,7 +156,7 @@ public long get() { final String outerJoinStoreName = userProvidedBaseStoreName == null ? outerJoinStoreGeneratedName : userProvidedBaseStoreName + outerJoinSuffix; - outerJoinWindowStore = Optional.of(sharedOuterJoinWindowStoreBuilder(outerJoinStoreName, windows, streamJoinedInternal)); + outerJoinWindowStore = Optional.of(sharedOuterJoinWindowStoreBuilder(outerJoinStoreName, windows, streamJoinedInternal, persistent)); Review comment: Just curious: is there any rationale for relying only on the left store's persistence to determine the shared store's persistence? 
########## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java ########## @@ -266,17 +267,36 @@ private void assertUniqueStoreNames(final WindowBytesStoreSupplier supplier, @SuppressWarnings("unchecked") private static <K, V1, V2> StoreBuilder<WindowStore<KeyAndJoinSide<K>, LeftOrRightValue<V1, V2>>> sharedOuterJoinWindowStoreBuilder(final String storeName, final JoinWindows windows, - final StreamJoinedInternal<K, V1, V2> streamJoinedInternal) { - final StoreBuilder<WindowStore<KeyAndJoinSide<K>, LeftOrRightValue<V1, V2>>> builder = new TimeOrderedWindowStoreBuilder<KeyAndJoinSide<K>, LeftOrRightValue<V1, V2>>( - persistentTimeOrderedWindowStore( - storeName + "-store", - Duration.ofMillis(windows.size() + windows.gracePeriodMs()), - Duration.ofMillis(windows.size()) - ), - new KeyAndJoinSideSerde<>(streamJoinedInternal.keySerde()), - new LeftOrRightValueSerde(streamJoinedInternal.valueSerde(), streamJoinedInternal.otherValueSerde()), - Time.SYSTEM - ); + final StreamJoinedInternal<K, V1, V2> streamJoinedInternal, + final boolean persistent) { + final KeyAndJoinSideSerde keyAndJoinSideSerde = new KeyAndJoinSideSerde<>(streamJoinedInternal.keySerde()); + final LeftOrRightValueSerde leftOrRightValueSerde = new LeftOrRightValueSerde(streamJoinedInternal.valueSerde(), streamJoinedInternal.otherValueSerde()); + + final StoreBuilder<WindowStore<KeyAndJoinSide<K>, LeftOrRightValue<V1, V2>>> builder; + if (persistent) { + builder = new TimeOrderedWindowStoreBuilder<KeyAndJoinSide<K>, LeftOrRightValue<V1, V2>>( + persistentTimeOrderedWindowStore( + storeName + "-store", + Duration.ofMillis(windows.size() + windows.gracePeriodMs()), + Duration.ofMillis(windows.size()) + ), + keyAndJoinSideSerde, + leftOrRightValueSerde, + Time.SYSTEM + ); + } else { + builder = Stores.windowStoreBuilder( + Stores.inMemoryWindowStore( + storeName + "-memory-store", Review comment: I think we should use the same name here; the metrics-scope would 
differentiate rocksdb from in-memory, which is sufficient. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org