Hi, setDeserializer() expects KafkaRecordDeserializationSchema; JSONKeyValueDeserializationSchema you provided is not compatible with it. You can convert it using [1]
[1] https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchema.html#of-org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema- Regards, Roman On Tue, Feb 8, 2022 at 5:43 PM HG <hanspeter.sl...@gmail.com> wrote: > > Hi all, > > When I build this code: > > KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder() > .setProperties(kafkaProps) > .setProperty("ssl.truststore.type",trustStoreType) > .setProperty("ssl.truststore.password",trustStorePassword) > .setProperty("ssl.truststore.location",trustStoreLocation) > .setProperty("security.protocol",securityProtocol) > .setProperty("partition.discovery.interval.ms", > partitionDiscoveryIntervalMs) > .setProperty("commit.offsets.on.checkpoint", > commitOffsetsOnCheckpoint) > .setGroupId(groupId) > .setTopics(kafkaInputTopic) > .setDeserializer(new JSONKeyValueDeserializationSchema(false)) > > .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST)) > .build(); > > > I get: > This error: > > error: incompatible types: > org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema > cannot be converted to > org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema<org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode> > .setDeserializer(new JSONKeyValueDeserializationSchema(false)) > > > What am I doing wrong? > As per the documentation JSONKeyValueDeserializationSchema returns an > ObjectNode. > > Regards Hans >