cadonna commented on a change in pull request #9708:
URL: https://github.com/apache/kafka/pull/9708#discussion_r543222611



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/StreamJoined.java
##########
@@ -36,6 +39,8 @@
     protected final WindowBytesStoreSupplier otherStoreSupplier;
     protected final String name;
     protected final String storeName;
+    protected boolean loggingEnabled;
+    protected Map<String, String> topicConfig;

Review comment:
       I guess both can be declared as `final`, right?

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
##########
@@ -152,6 +155,52 @@ private void 
shouldLogAndMeterOnSkippedRecordsWithNullValue(final String builtIn
         }
     }
 
+    @Test
+    public void shouldDisableLoggingOnStreamJoined() {
+
+        final JoinWindows joinWindows = 
JoinWindows.of(ofMillis(100)).grace(Duration.ofMillis(50));
+        final StreamJoined<String, Integer, Integer> streamJoined = 
StreamJoined.with(Serdes.String(), Serdes.Integer(), 
Serdes.Integer()).withStoreName("store").withLoggingDisabled();
+
+        final StreamsBuilder builder = new StreamsBuilder();
+        final KStream<String, Integer> left = builder.stream("left", 
Consumed.with(Serdes.String(), Serdes.Integer()));
+        final KStream<String, Integer> right = builder.stream("right", 
Consumed.with(Serdes.String(), Serdes.Integer()));
+
+        left.join(
+            right,
+            (value1, value2) -> value1 + value2,
+            joinWindows,
+            streamJoined);

Review comment:
       ```suggestion
           left.join(
               right,
               (value1, value2) -> value1 + value2,
               joinWindows,
               streamJoined
           );
   ```

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
##########
@@ -152,6 +155,52 @@ private void 
shouldLogAndMeterOnSkippedRecordsWithNullValue(final String builtIn
         }
     }
 
+    @Test
+    public void shouldDisableLoggingOnStreamJoined() {
+
+        final JoinWindows joinWindows = 
JoinWindows.of(ofMillis(100)).grace(Duration.ofMillis(50));
+        final StreamJoined<String, Integer, Integer> streamJoined = 
StreamJoined.with(Serdes.String(), Serdes.Integer(), 
Serdes.Integer()).withStoreName("store").withLoggingDisabled();

Review comment:
       Suggestion:
   ```suggestion
           final StreamJoined<String, Integer, Integer> streamJoined = 
StreamJoined
               .with(Serdes.String(), Serdes.Integer(), Serdes.Integer())
               .withStoreName("store")
               .withLoggingDisabled();
   ```

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
##########
@@ -152,6 +155,52 @@ private void 
shouldLogAndMeterOnSkippedRecordsWithNullValue(final String builtIn
         }
     }
 
+    @Test
+    public void shouldDisableLoggingOnStreamJoined() {
+
+        final JoinWindows joinWindows = 
JoinWindows.of(ofMillis(100)).grace(Duration.ofMillis(50));
+        final StreamJoined<String, Integer, Integer> streamJoined = 
StreamJoined.with(Serdes.String(), Serdes.Integer(), 
Serdes.Integer()).withStoreName("store").withLoggingDisabled();
+
+        final StreamsBuilder builder = new StreamsBuilder();
+        final KStream<String, Integer> left = builder.stream("left", 
Consumed.with(Serdes.String(), Serdes.Integer()));
+        final KStream<String, Integer> right = builder.stream("right", 
Consumed.with(Serdes.String(), Serdes.Integer()));
+
+        left.join(
+            right,
+            (value1, value2) -> value1 + value2,
+            joinWindows,
+            streamJoined);
+
+        final Topology topology = builder.build();
+        final InternalTopologyBuilder internalTopologyBuilder = 
TopologyWrapper.getInternalTopologyBuilder(topology);
+
+        
assertThat(internalTopologyBuilder.stateStores().get("store-this-join-store").loggingEnabled(),
 equalTo(false));
+        
assertThat(internalTopologyBuilder.stateStores().get("store-other-join-store").loggingEnabled(),
 equalTo(false));
+    }
+
+    @Test
+    public void shouldEnableLoggingWithCustomConfigOnStreamJoined() {
+
+        final JoinWindows joinWindows = 
JoinWindows.of(ofMillis(100)).grace(Duration.ofMillis(50));
+        final StreamJoined<String, Integer, Integer> streamJoined = 
StreamJoined.with(Serdes.String(), Serdes.Integer(), 
Serdes.Integer()).withStoreName("store").withLoggingEnabled(Collections.emptyMap());
+
+        final StreamsBuilder builder = new StreamsBuilder();
+        final KStream<String, Integer> left = builder.stream("left", 
Consumed.with(Serdes.String(), Serdes.Integer()));
+        final KStream<String, Integer> right = builder.stream("right", 
Consumed.with(Serdes.String(), Serdes.Integer()));
+
+        left.join(
+            right,
+            (value1, value2) -> value1 + value2,
+            joinWindows,
+            streamJoined);
+
+        final Topology topology = builder.build();
+        final InternalTopologyBuilder internalTopologyBuilder = 
TopologyWrapper.getInternalTopologyBuilder(topology);
+
+        
assertThat(internalTopologyBuilder.stateStores().get("store-this-join-store").loggingEnabled(),
 equalTo(true));
+        
assertThat(internalTopologyBuilder.stateStores().get("store-other-join-store").loggingEnabled(),
 equalTo(true));

Review comment:
       Maybe there is an easier way, but I found the following:
   
   Set the config to:
   
   ```
           final StreamJoined<String, Integer, Integer> streamJoined = 
StreamJoined
               .with(Serdes.String(), Serdes.Integer(), Serdes.Integer())
               .withStoreName("store")
               .withLoggingEnabled(Collections.singletonMap("test", 
"property"));
   ```
   and then check it:
   
   ```
           internalTopologyBuilder.buildSubtopology(0);
   
           
assertThat(internalTopologyBuilder.stateStores().get("store-this-join-store").loggingEnabled(),
 equalTo(true));
           
assertThat(internalTopologyBuilder.stateStores().get("store-other-join-store").loggingEnabled(),
 equalTo(true));
           
assertThat(internalTopologyBuilder.topicGroups().get(0).stateChangelogTopics.size(),
 equalTo(2));
           for (final InternalTopicConfig config : 
internalTopologyBuilder.topicGroups().get(0).stateChangelogTopics.values()) {
               assertThat(
                   config.getProperties(Collections.emptyMap(), 0).get("test"),
                   equalTo("property")
               );
           }
   ```
   
   Without
   
   ```
   
assertThat(internalTopologyBuilder.topicGroups().get(0).stateChangelogTopics.size(),
 equalTo(2));
   ```
   
   the test would pass without checking the config if `buildSubtopology()` is 
not called because no changelog topics would be registered in the topology. So 
it basically checks that `buildSubtopology()` is called.

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
##########
@@ -152,6 +155,52 @@ private void 
shouldLogAndMeterOnSkippedRecordsWithNullValue(final String builtIn
         }
     }
 
+    @Test
+    public void shouldDisableLoggingOnStreamJoined() {
+
+        final JoinWindows joinWindows = 
JoinWindows.of(ofMillis(100)).grace(Duration.ofMillis(50));
+        final StreamJoined<String, Integer, Integer> streamJoined = 
StreamJoined.with(Serdes.String(), Serdes.Integer(), 
Serdes.Integer()).withStoreName("store").withLoggingDisabled();
+
+        final StreamsBuilder builder = new StreamsBuilder();
+        final KStream<String, Integer> left = builder.stream("left", 
Consumed.with(Serdes.String(), Serdes.Integer()));
+        final KStream<String, Integer> right = builder.stream("right", 
Consumed.with(Serdes.String(), Serdes.Integer()));
+
+        left.join(
+            right,
+            (value1, value2) -> value1 + value2,
+            joinWindows,
+            streamJoined);
+
+        final Topology topology = builder.build();
+        final InternalTopologyBuilder internalTopologyBuilder = 
TopologyWrapper.getInternalTopologyBuilder(topology);
+
+        
assertThat(internalTopologyBuilder.stateStores().get("store-this-join-store").loggingEnabled(),
 equalTo(false));
+        
assertThat(internalTopologyBuilder.stateStores().get("store-other-join-store").loggingEnabled(),
 equalTo(false));
+    }
+
+    @Test
+    public void shouldEnableLoggingWithCustomConfigOnStreamJoined() {
+
+        final JoinWindows joinWindows = 
JoinWindows.of(ofMillis(100)).grace(Duration.ofMillis(50));
+        final StreamJoined<String, Integer, Integer> streamJoined = 
StreamJoined.with(Serdes.String(), Serdes.Integer(), 
Serdes.Integer()).withStoreName("store").withLoggingEnabled(Collections.emptyMap());

Review comment:
       ```suggestion
           final StreamJoined<String, Integer, Integer> streamJoined = 
StreamJoined
               .with(Serdes.String(), Serdes.Integer(), Serdes.Integer())
               .withStoreName("store")
               .withLoggingEnabled(Collections.emptyMap());
   ```

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
##########
@@ -152,6 +155,52 @@ private void 
shouldLogAndMeterOnSkippedRecordsWithNullValue(final String builtIn
         }
     }
 
+    @Test
+    public void shouldDisableLoggingOnStreamJoined() {
+
+        final JoinWindows joinWindows = 
JoinWindows.of(ofMillis(100)).grace(Duration.ofMillis(50));
+        final StreamJoined<String, Integer, Integer> streamJoined = 
StreamJoined.with(Serdes.String(), Serdes.Integer(), 
Serdes.Integer()).withStoreName("store").withLoggingDisabled();
+
+        final StreamsBuilder builder = new StreamsBuilder();
+        final KStream<String, Integer> left = builder.stream("left", 
Consumed.with(Serdes.String(), Serdes.Integer()));
+        final KStream<String, Integer> right = builder.stream("right", 
Consumed.with(Serdes.String(), Serdes.Integer()));
+
+        left.join(
+            right,
+            (value1, value2) -> value1 + value2,
+            joinWindows,
+            streamJoined);
+
+        final Topology topology = builder.build();
+        final InternalTopologyBuilder internalTopologyBuilder = 
TopologyWrapper.getInternalTopologyBuilder(topology);
+
+        
assertThat(internalTopologyBuilder.stateStores().get("store-this-join-store").loggingEnabled(),
 equalTo(false));
+        
assertThat(internalTopologyBuilder.stateStores().get("store-other-join-store").loggingEnabled(),
 equalTo(false));
+    }
+
+    @Test
+    public void shouldEnableLoggingWithCustomConfigOnStreamJoined() {
+
+        final JoinWindows joinWindows = 
JoinWindows.of(ofMillis(100)).grace(Duration.ofMillis(50));
+        final StreamJoined<String, Integer, Integer> streamJoined = 
StreamJoined.with(Serdes.String(), Serdes.Integer(), 
Serdes.Integer()).withStoreName("store").withLoggingEnabled(Collections.emptyMap());
+
+        final StreamsBuilder builder = new StreamsBuilder();
+        final KStream<String, Integer> left = builder.stream("left", 
Consumed.with(Serdes.String(), Serdes.Integer()));
+        final KStream<String, Integer> right = builder.stream("right", 
Consumed.with(Serdes.String(), Serdes.Integer()));
+
+        left.join(
+            right,
+            (value1, value2) -> value1 + value2,
+            joinWindows,
+            streamJoined);

Review comment:
       ```suggestion
           left.join(
               right,
               (value1, value2) -> value1 + value2,
               joinWindows,
               streamJoined
           );
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to