ableegoldman commented on code in PR #18048:
URL: https://github.com/apache/kafka/pull/18048#discussion_r1879285311
##########
streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java:
##########
@@ -1816,6 +1816,102 @@ public void
shouldWrapProcessorsForStreamTableJoinWithGracePeriod() {
assertThat(counter.numConnectedStateStores(), CoreMatchers.is(2));
}
+ @Test
+ public void shouldWrapProcessorsForTableTableInnerJoin() {
+ final Map<Object, Object> props = dummyStreamsConfigMap();
+ props.put(PROCESSOR_WRAPPER_CLASS_CONFIG,
RecordingProcessorWrapper.class);
+
+ final WrapperRecorder counter = new WrapperRecorder();
+ props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, counter);
+
+ final StreamsBuilder builder = new StreamsBuilder(new
TopologyConfig(new StreamsConfig(props)));
+
+ final KTable<String, String> t1 = builder.table("input1",
Consumed.as("input1")); // 1
+ final KTable<String, String> t2 = builder.table("input2",
Consumed.as("input2")); // 2
+
+ t1.join(t2, (v1, v2) -> v1 + v2, Named.as("join-processor")) // 3
(this), 4 (other), 5 (merger)
Review Comment:
ok while investigating
[this](https://github.com/apache/kafka/pull/18048/files#r1878665564) my whole
understanding of table-table joins unraveled and came back together, which lead
me to realize that we're actually not testing the new `#stores` methods being
implemented in this PR because we aren't materializing the join. Apparently the
two stores here are just the original upstream KTable stores being
materialized, and the join is performed via a "value getter" on the upstream
KTable state store rather than on a state store belonging to the join itself
(it's an optimization)
The stores we're returning in the new `#stores` implementations are not
what's used to perform the join itself, but essentially contain the output of
the join. If you don't materialize the join, this output just gets forwarded
but not persisted -- all this is to say, we should pass a `Materialized` into
these joins
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]