Hi, I'm trying to do a join between a stream and a KTable with a grace period, which requires a versioned table. However even if I specify a serializer everywhere, it seems this doesn't quite make it through to the RocksDBTimeOrderedKeyValueBuffer and I get an error when building/starting the topology.
I've put a minimal reproduction below (against kafka-streams 3.7.0, please excuse the Scala). This fails with: org.apache.kafka.streams.errors.StreamsException: failed to initialize processor KSTREAM-LEFTJOIN-0000000003 at org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:110) [...] Caused by: org.apache.kafka.common.config.ConfigException: Please specify a key serde or set one through StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG at org.apache.kafka.streams.StreamsConfig.defaultKeySerde(StreamsConfig.java:1857) at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.keySerde(AbstractProcessorContext.java:92) at org.apache.kafka.streams.processor.internals.SerdeGetter.keySerde(SerdeGetter.java:47) at org.apache.kafka.streams.state.internals.RocksDBTimeOrderedKeyValueBuffer.setSerdesIfNull(RocksDBTimeOrderedKeyValueBuffer.java:159) at org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.init(KStreamKTableJoinProcessor.java:86) at org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:102) Am I missing some other place where the serde should be specified, or is this a bug? (In the real application I need to do this multiple times for different types, so setting the default serde class would be difficult) Many thanks, Mickey Code: import org.apache.kafka.streams.kstream._ import org.apache.kafka.streams.scala.serialization.Serdes import org.apache.kafka.streams.state.Stores import org.apache.kafka.streams.{StreamsBuilder, StreamsConfig, TopologyTestDriver} import java.time.Duration import java.util.Properties val builder = new StreamsBuilder val table = builder.table( "table", Materialized .as[String, String](Stores.persistentVersionedKeyValueStore("table-store", Duration.ofMinutes(20))) .withKeySerde(Serdes.stringSerde) .withValueSerde(Serdes.stringSerde) ) val stream = builder.stream("stream", Consumed.`with`(Serdes.stringSerde, Serdes.stringSerde)) stream .leftJoin( table, (_ + _): ValueJoiner[String, String, String], Joined .`with`(Serdes.stringSerde, Serdes.stringSerde, Serdes.stringSerde) .withGracePeriod(Duration.ofMinutes(10)) ) .to("output", Produced.`with`(Serdes.stringSerde, Serdes.stringSerde)) val topology = builder.build() val props = new Properties() props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test") props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234") val testDriver = new TopologyTestDriver(topology, props)