Hi,

I'm trying to do a join between a stream and a KTable with a grace period,
which requires a versioned table. However even if I specify a serializer
everywhere, it seems this doesn't quite make it through to
the RocksDBTimeOrderedKeyValueBuffer and I get an error when
building/starting the topology.

I've put a minimal reproduction below (against kafka-streams 3.7.0, please
excuse the Scala). This fails with:
org.apache.kafka.streams.errors.StreamsException: failed to initialize
processor KSTREAM-LEFTJOIN-0000000003
 at
org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:110)
[...]
Caused by: org.apache.kafka.common.config.ConfigException: Please specify a
key serde or set one through StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG
at
org.apache.kafka.streams.StreamsConfig.defaultKeySerde(StreamsConfig.java:1857)
at
org.apache.kafka.streams.processor.internals.AbstractProcessorContext.keySerde(AbstractProcessorContext.java:92)
at
org.apache.kafka.streams.processor.internals.SerdeGetter.keySerde(SerdeGetter.java:47)
at
org.apache.kafka.streams.state.internals.RocksDBTimeOrderedKeyValueBuffer.setSerdesIfNull(RocksDBTimeOrderedKeyValueBuffer.java:159)
at
org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.init(KStreamKTableJoinProcessor.java:86)
at
org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:102)

Am I missing some other place where the serde should be specified, or is
this a bug? (In the real application I need to do this multiple times for
different types, so setting the default serde class would be difficult)

Many thanks,
Mickey

Code:

import org.apache.kafka.streams.kstream._
import org.apache.kafka.streams.scala.serialization.Serdes
import org.apache.kafka.streams.state.Stores
import org.apache.kafka.streams.{StreamsBuilder, StreamsConfig,
TopologyTestDriver}

import java.time.Duration
import java.util.Properties

val builder = new StreamsBuilder
val table   = builder.table(
  "table",
  Materialized
    .as[String,
String](Stores.persistentVersionedKeyValueStore("table-store",
Duration.ofMinutes(20)))
    .withKeySerde(Serdes.stringSerde)
    .withValueSerde(Serdes.stringSerde)
)
val stream  = builder.stream("stream", Consumed.`with`(Serdes.stringSerde,
Serdes.stringSerde))
stream
  .leftJoin(
    table,
    (_ + _): ValueJoiner[String, String, String],
    Joined
      .`with`(Serdes.stringSerde, Serdes.stringSerde, Serdes.stringSerde)
      .withGracePeriod(Duration.ofMinutes(10))
  )
  .to("output", Produced.`with`(Serdes.stringSerde, Serdes.stringSerde))

val topology   = builder.build()
val props      = new Properties()
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test")
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234")
val testDriver = new TopologyTestDriver(topology, props)

Reply via email to