lct45 commented on a change in pull request #9708:
URL: https://github.com/apache/kafka/pull/9708#discussion_r540431368



##########
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
##########
@@ -152,6 +155,52 @@ private void 
shouldLogAndMeterOnSkippedRecordsWithNullValue(final String builtIn
         }
     }
 
+    @Test
+    public void shouldDisableLoggingOnStreamJoined() {
+
+        final JoinWindows joinWindows = 
JoinWindows.of(ofMillis(100)).grace(Duration.ofMillis(50));
+        final StreamJoined<String, Integer, Integer> streamJoined = 
StreamJoined.with(Serdes.String(), Serdes.Integer(), 
Serdes.Integer()).withStoreName("store").withLoggingDisabled();
+
+        final StreamsBuilder builder = new StreamsBuilder();
+        final KStream<String, Integer> left = builder.stream("left", 
Consumed.with(Serdes.String(), Serdes.Integer()));
+        final KStream<String, Integer> right = builder.stream("right", 
Consumed.with(Serdes.String(), Serdes.Integer()));
+
+        left.join(
+            right,
+            (value1, value2) -> value1 + value2,
+            joinWindows,
+            streamJoined);
+
+        final Topology topology = builder.build();
+        final InternalTopologyBuilder internalTopologyBuilder = 
TopologyWrapper.getInternalTopologyBuilder(topology);
+
+        
assertThat(internalTopologyBuilder.stateStores().get("store-this-join-store").loggingEnabled(),
 equalTo(false));
+        
assertThat(internalTopologyBuilder.stateStores().get("store-other-join-store").loggingEnabled(),
 equalTo(false));
+    }
+
+    @Test
+    public void shouldEnableLoggingWithCustomConfigOnStreamJoined() {
+
+        final JoinWindows joinWindows = 
JoinWindows.of(ofMillis(100)).grace(Duration.ofMillis(50));
+        final StreamJoined<String, Integer, Integer> streamJoined = 
StreamJoined.with(Serdes.String(), Serdes.Integer(), 
Serdes.Integer()).withStoreName("store").withLoggingEnabled(Collections.emptyMap());
+
+        final StreamsBuilder builder = new StreamsBuilder();
+        final KStream<String, Integer> left = builder.stream("left", 
Consumed.with(Serdes.String(), Serdes.Integer()));
+        final KStream<String, Integer> right = builder.stream("right", 
Consumed.with(Serdes.String(), Serdes.Integer()));
+
+        left.join(
+            right,
+            (value1, value2) -> value1 + value2,
+            joinWindows,
+            streamJoined);
+
+        final Topology topology = builder.build();
+        final InternalTopologyBuilder internalTopologyBuilder = 
TopologyWrapper.getInternalTopologyBuilder(topology);
+
+        
assertThat(internalTopologyBuilder.stateStores().get("store-this-join-store").loggingEnabled(),
 equalTo(true));
+        
assertThat(internalTopologyBuilder.stateStores().get("store-other-join-store").loggingEnabled(),
 equalTo(true));

Review comment:
       Yeah it does pass by default. I experimented with passing the logs in 
but I wasn't able to find a good way to confirm that the logs I was passing 
were getting set somewhere. Do you know where they're exposed for me to check? 
Materialized doesn't seem to check this either




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to