This is an automated email from the ASF dual-hosted git repository.
lucasbru pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.0 by this push:
new 9ef4d5ab867 KAFKA-18917: TransformValues throws NPE (#19089)
9ef4d5ab867 is described below
commit 9ef4d5ab867ca0dc2a9010770ea483e9724a40ba
Author: Lucas Brutschy <[email protected]>
AuthorDate: Tue Mar 4 17:49:47 2025 +0100
KAFKA-18917: TransformValues throws NPE (#19089)
When `transformValues` is used with a `Materialized` instance that has
no queryable name, a `NullPointerException` is thrown. To preserve the
semantics present in 3.9, we need to avoid materialization when a
queryable name is not present.
Reviewers: Bruno Cadonna <[email protected]>
---
.../streams/kstream/internals/KTableImpl.java | 8 +++++--
.../internals/KTableTransformValuesTest.java | 26 ++++++++++++++++++++++
2 files changed, 32 insertions(+), 2 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 06a6043ffd3..8a0e647de93 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -450,8 +450,12 @@ public class KTableImpl<K, S, V> extends AbstractStream<K,
V> implements KTable<
valueSerde = materializedInternal.valueSerde();
queryableStoreName = materializedInternal.queryableStoreName();
// only materialize if materialized is specified and it has
queryable name
- final StoreFactory storeFactory = queryableStoreName != null ?
(new KeyValueStoreMaterializer<>(materializedInternal)) : null;
- storeBuilder = Collections.singleton(new
FactoryWrappingStoreBuilder<>(storeFactory));
+ if (queryableStoreName != null) {
+ final StoreFactory storeFactory = new
KeyValueStoreMaterializer<>(materializedInternal);
+ storeBuilder = Collections.singleton(new
FactoryWrappingStoreBuilder<>(storeFactory));
+ } else {
+ storeBuilder = null;
+ }
} else {
keySerde = this.keySerde;
valueSerde = null;
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
index 402a27e461a..d03bf99da93 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
@@ -436,6 +436,32 @@ public class KTableTransformValuesTest {
new KeyValueTimestamp<>("A", "3", 15))));
}
+ @Test
+ public void
shouldCalculateCorrectOldValuesIfNotStatefulEvenNotMaterializedNoQueryableName()
{
+ builder
+ .table(INPUT_TOPIC, CONSUMED)
+ .transformValues(new StatelessTransformerSupplier(),
+ Materialized.with(Serdes.String(), Serdes.Integer())
+ )
+ .groupBy(toForceSendingOfOldValues(),
Grouped.with(Serdes.String(), Serdes.Integer()))
+ .reduce(MockReducer.INTEGER_ADDER, MockReducer.INTEGER_SUBTRACTOR)
+ .mapValues(mapBackToStrings())
+ .toStream()
+ .process(capture);
+
+ driver = new TopologyTestDriver(builder.build(), props());
+ final TestInputTopic<String, String> inputTopic =
+ driver.createInputTopic(INPUT_TOPIC, new StringSerializer(), new
StringSerializer());
+
+ inputTopic.pipeInput("A", "a", 5L);
+ inputTopic.pipeInput("A", "aa", 15L);
+ inputTopic.pipeInput("A", "aaa", 10);
+
+ assertThat(output(), equalTo(Arrays.asList(new
KeyValueTimestamp<>("A", "1", 5),
+ new KeyValueTimestamp<>("A", "2", 15),
+ new KeyValueTimestamp<>("A", "3", 15))));
+ }
+
private ArrayList<KeyValueTimestamp<String, String>> output() {
return capture.capturedProcessors(1).get(0).processed();
}