cadonna commented on a change in pull request #11241: URL: https://github.com/apache/kafka/pull/11241#discussion_r695498527
########## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java ########## @@ -41,6 +43,7 @@ public void process(final Record<KIn, VIn> record) { final Iterable<? extends KeyValue<? extends KOut, ? extends VOut>> newKeyValues = mapper.apply(record.key(), record.value()); + Objects.requireNonNull(newKeyValues, String.format("KeyValueMapper can't return null from mapping the record: %s", record)); Review comment: Just a suggestion: ```suggestion Objects.requireNonNull(newKeyValues, "The provided KeyValueMapper returned null which is not allowed."); ``` BTW, we should not output records since they might contain sensitive data. ########## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java ########## @@ -42,6 +44,7 @@ public KStreamMap(final KeyValueMapper<? super KIn, ? super VIn, ? extends KeyVa public void process(final Record<KIn, VIn> record) { final KeyValue<? extends KOut, ? extends VOut> newPair = mapper.apply(record.key(), record.value()); + Objects.requireNonNull(newPair, String.format("KeyValueMapper can't return null from mapping the record: %s", record)); Review comment: Just a suggestion: ```suggestion Objects.requireNonNull(newPair, "The provided KeyValueMapper returned null which is not allowed."); ``` BTW, we should not output records since they might contain sensitive data. ########## File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java ########## @@ -68,6 +70,14 @@ public void testMap() { } } + @Test + public void testKeyValueMapperResultNotNull() { + final KStreamMap<String, Integer, String, Integer> supplier = new KStreamMap<>((key, value) -> null); + final Record<String, Integer> record = new Record<>("K", 0, 0L); + final Throwable throwable = assertThrows(NullPointerException.class, () -> supplier.get().process(record)); + assertEquals(throwable.getMessage(), String.format("KeyValueMapper can't return null from mapping the record: %s", record)); Review comment: See my comment above. ########## File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java ########## @@ -86,4 +88,12 @@ public void testFlatMap() { assertEquals(expected[i], supplier.theCapturedProcessor().processed().get(i)); } } + + @Test + public void testKeyValueMapperResultNotNull() { + final KStreamFlatMap<String, Integer, String, Integer> supplier = new KStreamFlatMap<>((key, value) -> null); + final Record<String, Integer> record = new Record<>("K", 0, 0L); + final Throwable throwable = assertThrows(NullPointerException.class, () -> supplier.get().process(record)); + assertEquals(throwable.getMessage(), String.format("KeyValueMapper can't return null from mapping the record: %s", record)); Review comment: We prefer to use `assertThat()`: ```suggestion assertThat(throwable.getMessage(), is(...); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org