lct45 commented on a change in pull request #9708: URL: https://github.com/apache/kafka/pull/9708#discussion_r540431368
########## File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java ########## @@ -152,6 +155,52 @@ private void shouldLogAndMeterOnSkippedRecordsWithNullValue(final String builtIn } } + @Test + public void shouldDisableLoggingOnStreamJoined() { + + final JoinWindows joinWindows = JoinWindows.of(ofMillis(100)).grace(Duration.ofMillis(50)); + final StreamJoined<String, Integer, Integer> streamJoined = StreamJoined.with(Serdes.String(), Serdes.Integer(), Serdes.Integer()).withStoreName("store").withLoggingDisabled(); + + final StreamsBuilder builder = new StreamsBuilder(); + final KStream<String, Integer> left = builder.stream("left", Consumed.with(Serdes.String(), Serdes.Integer())); + final KStream<String, Integer> right = builder.stream("right", Consumed.with(Serdes.String(), Serdes.Integer())); + + left.join( + right, + (value1, value2) -> value1 + value2, + joinWindows, + streamJoined); + + final Topology topology = builder.build(); + final InternalTopologyBuilder internalTopologyBuilder = TopologyWrapper.getInternalTopologyBuilder(topology); + + assertThat(internalTopologyBuilder.stateStores().get("store-this-join-store").loggingEnabled(), equalTo(false)); + assertThat(internalTopologyBuilder.stateStores().get("store-other-join-store").loggingEnabled(), equalTo(false)); + } + + @Test + public void shouldEnableLoggingWithCustomConfigOnStreamJoined() { + + final JoinWindows joinWindows = JoinWindows.of(ofMillis(100)).grace(Duration.ofMillis(50)); + final StreamJoined<String, Integer, Integer> streamJoined = StreamJoined.with(Serdes.String(), Serdes.Integer(), Serdes.Integer()).withStoreName("store").withLoggingEnabled(Collections.emptyMap()); + + final StreamsBuilder builder = new StreamsBuilder(); + final KStream<String, Integer> left = builder.stream("left", Consumed.with(Serdes.String(), Serdes.Integer())); + final KStream<String, Integer> right = builder.stream("right", Consumed.with(Serdes.String(), 
Serdes.Integer())); + + left.join( + right, + (value1, value2) -> value1 + value2, + joinWindows, + streamJoined); + + final Topology topology = builder.build(); + final InternalTopologyBuilder internalTopologyBuilder = TopologyWrapper.getInternalTopologyBuilder(topology); + + assertThat(internalTopologyBuilder.stateStores().get("store-this-join-store").loggingEnabled(), equalTo(true)); + assertThat(internalTopologyBuilder.stateStores().get("store-other-join-store").loggingEnabled(), equalTo(true)); Review comment: Yeah, it does pass by default. I experimented with passing the logging config in, but I wasn't able to find a good way to confirm that the config I was passing was actually getting set anywhere. Do you know where it's exposed so that I can check? Materialized doesn't seem to check this either. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org