Florens Pauwels created KAFKA-19602:
---------------------------------------
Summary: Kafka Streams join after unmaterialized transformValues
on KTable with extra store fails
Key: KAFKA-19602
URL: https://issues.apache.org/jira/browse/KAFKA-19602
Project: Kafka
Issue Type: Bug
Affects Versions: 3.9.1, 3.6.1
Reporter: Florens Pauwels
I believe the following conditions are all needed for this to occur:
# transformValues on a KTable, followed by a KTable join or leftJoin
# The transformValues is not materialized (no store name given)
# The transformValues accesses at least one extra state store
Tested on 3.6.1 and 3.9.1
Example code:
{code:java}
/**
 * Minimal reproduction for this report: a KTable built from the "input" topic is
 * passed through transformValues (serdes only, no store name) that is connected to
 * an extra state store, and the resulting table is then joined with itself.
 */
@Component
class TestCase {
// Builder for the extra persistent timestamped key-value store, named
// "transformer-store", that MyTransformer looks up in init().
private static final StoreBuilder<TimestampedKeyValueStore<String, String>>
TRANSFORMER_STORE =
Stores.timestampedKeyValueStoreBuilder(
Stores.persistentTimestampedKeyValueStore("transformer-store"),
Serdes.String(),
Serdes.String()
);
private final StreamsBuilder streamsBuilder;
TestCase(StreamsBuilder streamsBuilder) {
this.streamsBuilder = streamsBuilder;
}
/**
 * Builds the topology: input stream -> table -> transformValues -> self-join -> output topic,
 * then prints the topology description.
 */
@PostConstruct
void configure() {
// Register the extra store with the builder before wiring the operators.
streamsBuilder.addStateStore(TRANSFORMER_STORE);
// Materialized.with(...) below supplies serdes only (no store name); the extra
// store's name is passed as the trailing argument of transformValues.
var aggregateTable = streamsBuilder
.stream("input", Consumed.with(Serdes.String(),
Serdes.String()).withName("input-to-stream"))
.toTable(Named.as("to-table"),
MaterializedAs.keyValue("aggregate-store",
Serdes.String(), Serdes.String()))
.transformValues(MyTransformer::new,
Materialized.with(Serdes.String(), Serdes.String()),
Named.as("my-transformer"), TRANSFORMER_STORE.name());
// Join the transformed table with itself; the join result is materialized as
// "after-transformer-store" and written to the "output" topic.
aggregateTable
.join(aggregateTable,
(value, _) -> value,
Named.as("after-transformer"),
Materialized.<String, String, KeyValueStore<Bytes,
byte[]>>as("after-transformer-store")
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.String()))
.toStream(Named.as("aggregate-to-stream"))
.to("output", Produced.with(Serdes.String(),
Serdes.String()).withName("output-to-topic"));
// Dump the built topology to stdout.
System.out.println(streamsBuilder.build().describe().toString());
}
/**
 * Pass-through value transformer: transform() returns the value unchanged.
 * Its only notable action is looking up the extra store in init().
 */
private static class MyTransformer implements
ValueTransformerWithKey<String, String , String> {
@Override
public void init(ProcessorContext context) {
// Per the stack trace in this report, this lookup is where the StreamsException
// is raised when init() is reached through the join processor's value getter.
context.getStateStore(TRANSFORMER_STORE.name());
}
@Override
public String transform(String readOnlyKey, String value) {
return value;
}
@Override
public void close() {
}
}
}
{code}
Result of the above code:
{noformat}
org.apache.kafka.streams.errors.StreamsException: failed to initialize
processor after-transformer-join-this
at
org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:131)
~[kafka-streams-3.9.1.jar:na]
at
org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:140)
~[kafka-streams-3.9.1.jar:na]
at
org.apache.kafka.streams.processor.internals.StreamTask.initializeTopology(StreamTask.java:1089)
~[kafka-streams-3.9.1.jar:na]
at
org.apache.kafka.streams.processor.internals.StreamTask.completeRestoration(StreamTask.java:295)
~[kafka-streams-3.9.1.jar:na]
at
org.apache.kafka.streams.processor.internals.TaskManager.transitRestoredTaskToRunning(TaskManager.java:980)
~[kafka-streams-3.9.1.jar:na]
at
org.apache.kafka.streams.processor.internals.TaskManager.handleRestoredTasksFromStateUpdater(TaskManager.java:1055)
~[kafka-streams-3.9.1.jar:na]
at
org.apache.kafka.streams.processor.internals.TaskManager.checkStateUpdater(TaskManager.java:920)
~[kafka-streams-3.9.1.jar:na]
at
org.apache.kafka.streams.processor.internals.StreamThread.checkStateUpdater(StreamThread.java:1191)
~[kafka-streams-3.9.1.jar:na]
at
org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:999)
~[kafka-streams-3.9.1.jar:na]
at
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:713)
~[kafka-streams-3.9.1.jar:na]
at
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:672)
~[kafka-streams-3.9.1.jar:na]
Caused by: org.apache.kafka.streams.errors.StreamsException: Processor
after-transformer-join-this has no access to StateStore transformer-store as
the store is not connected to the processor. If you add stores manually via
'.addStateStore()' make sure to connect the added store to the processor by
providing the processor name to '.addStateStore()' or connect them via
'.connectProcessorAndStateStores()'. DSL users need to provide the store name
to '.process()', '.transform()', or '.transformValues()' to connect the store
to the corresponding operator, or they can provide a StoreBuilder by
implementing the stores() method on the Supplier itself. If you do not add
stores manually, please file a bug report at
https://issues.apache.org/jira/projects/KAFKA.
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.getStateStore(ProcessorContextImpl.java:174)
~[kafka-streams-3.9.1.jar:na]
at
org.apache.kafka.streams.processor.internals.ForwardingDisabledProcessorContext.getStateStore(ForwardingDisabledProcessorContext.java:90)
~[kafka-streams-3.9.1.jar:na]
at
be.florens.kafkaspringtest.selfjoin.TestCase$MyTransformer.init(TestCase.java:63)
~[main/:na]
at
org.apache.kafka.streams.kstream.internals.KTableTransformValues$KTableTransformValuesGetter.init(KTableTransformValues.java:156)
~[kafka-streams-3.9.1.jar:na]
at
org.apache.kafka.streams.kstream.internals.KTableKTableInnerJoin$KTableKTableJoinProcessor.init(KTableKTableInnerJoin.java:83)
~[kafka-streams-3.9.1.jar:na]
at
org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:123)
~[kafka-streams-3.9.1.jar:na]
... 10 common frames omitted{noformat}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)