Hi,

setDeserializer() expects a KafkaRecordDeserializationSchema; the
JSONKeyValueDeserializationSchema you are passing does not implement
that interface, hence the compile error. You can wrap it with
KafkaRecordDeserializationSchema.of() [1].
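For example, keeping the rest of your builder as-is (a sketch, not
compiled here; kafkaProps, groupId, etc. are the variables from your
snippet):

```java
// Sketch: adapt the legacy schema to the new KafkaSource API.
// JSONKeyValueDeserializationSchema implements KafkaDeserializationSchema,
// which KafkaRecordDeserializationSchema.of(...) wraps into a
// KafkaRecordDeserializationSchema<ObjectNode>.
KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder()
        .setProperties(kafkaProps)
        .setGroupId(groupId)
        .setTopics(kafkaInputTopic)
        .setDeserializer(KafkaRecordDeserializationSchema.of(
                new JSONKeyValueDeserializationSchema(false)))
        .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
        .build();
```

The only change relative to your code is the
KafkaRecordDeserializationSchema.of(...) wrapper around the schema.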

[1]
https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchema.html#of-org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema-


Regards,
Roman

On Tue, Feb 8, 2022 at 5:43 PM HG <hanspeter.sl...@gmail.com> wrote:
>
> Hi all,
>
> When I build this code:
>
> KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder()
>         .setProperties(kafkaProps)
>         .setProperty("ssl.truststore.type",trustStoreType)
>         .setProperty("ssl.truststore.password",trustStorePassword)
>         .setProperty("ssl.truststore.location",trustStoreLocation)
>         .setProperty("security.protocol",securityProtocol)
>         .setProperty("partition.discovery.interval.ms", partitionDiscoveryIntervalMs)
>         .setProperty("commit.offsets.on.checkpoint", commitOffsetsOnCheckpoint)
>         .setGroupId(groupId)
>         .setTopics(kafkaInputTopic)
>         .setDeserializer(new JSONKeyValueDeserializationSchema(false))
>         .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
>         .build();
>
>
> I get this error:
>
> error: incompatible types: org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema cannot be converted to org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema<org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode>
>                 .setDeserializer(new JSONKeyValueDeserializationSchema(false))
>
>
> What am I doing wrong?
> As per the documentation JSONKeyValueDeserializationSchema returns an 
> ObjectNode.
>
> Regards Hans
>