cadonna commented on a change in pull request #11241:
URL: https://github.com/apache/kafka/pull/11241#discussion_r695498527



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java
##########
@@ -41,6 +43,7 @@
         public void process(final Record<KIn, VIn> record) {
             final Iterable<? extends KeyValue<? extends KOut, ? extends VOut>> 
newKeyValues =
                 mapper.apply(record.key(), record.value());
+            Objects.requireNonNull(newKeyValues, String.format("KeyValueMapper 
can't return null from mapping the record: %s", record));

Review comment:
       Just a suggestion:
   ```suggestion
               Objects.requireNonNull(newKeyValues, "The provided 
KeyValueMapper returned null which is not allowed.");
   ```
   BTW, we should not output records since they might contain sensitive data.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java
##########
@@ -42,6 +44,7 @@ public KStreamMap(final KeyValueMapper<? super KIn, ? super 
VIn, ? extends KeyVa
         public void process(final Record<KIn, VIn> record) {
             final KeyValue<? extends KOut, ? extends VOut> newPair =
                 mapper.apply(record.key(), record.value());
+            Objects.requireNonNull(newPair, String.format("KeyValueMapper 
can't return null from mapping the record: %s", record));

Review comment:
       Just a suggestion:
   ```suggestion
               Objects.requireNonNull(newPair, "The provided KeyValueMapper 
returned null which is not allowed.");
   ```
   
   BTW, we should not output records since they might contain sensitive data.

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
##########
@@ -68,6 +70,14 @@ public void testMap() {
         }
     }
 
+    @Test
+    public void testKeyValueMapperResultNotNull() {
+        final KStreamMap<String, Integer, String, Integer> supplier = new 
KStreamMap<>((key, value) -> null);
+        final Record<String, Integer> record = new Record<>("K", 0, 0L);
+        final Throwable throwable = assertThrows(NullPointerException.class, 
() -> supplier.get().process(record));
+        assertEquals(throwable.getMessage(), String.format("KeyValueMapper 
can't return null from mapping the record: %s", record));

Review comment:
       See my comment above.

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
##########
@@ -86,4 +88,12 @@ public void testFlatMap() {
             assertEquals(expected[i], 
supplier.theCapturedProcessor().processed().get(i));
         }
     }
+
+    @Test
+    public void testKeyValueMapperResultNotNull() {
+        final KStreamFlatMap<String, Integer, String, Integer> supplier = new 
KStreamFlatMap<>((key, value) -> null);
+        final Record<String, Integer> record = new Record<>("K", 0, 0L);
+        final Throwable throwable = assertThrows(NullPointerException.class, 
() -> supplier.get().process(record));
+        assertEquals(throwable.getMessage(), String.format("KeyValueMapper 
can't return null from mapping the record: %s", record));

Review comment:
       We prefer to use `assertThat()`:
   
   ```suggestion
           assertThat(throwable.getMessage(), is(...);
   ```
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to