Hi
Convert it? How does that work?
Could you spare a couple of lines to show that?

Regards Hans
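
A minimal sketch of what "convert" means here, for illustration only. The static method linked as [1] below, KafkaRecordDeserializationSchema.of(...), accepts a KafkaDeserializationSchema (which JSONKeyValueDeserializationSchema implements) and returns the KafkaRecordDeserializationSchema that setDeserializer() expects. In the builder quoted below, the call would presumably become:

    .setDeserializer(KafkaRecordDeserializationSchema.of(new JSONKeyValueDeserializationSchema(false)))

The plain-Java block that follows only illustrates the wrapping idea behind such a conversion. LegacySchema, RecordSchema and the byte[] key/value parameters are hypothetical stand-ins invented for this sketch, not Flink classes, and the real wrapper may differ in detail.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class DeserializerAdapterSketch {

    // Hypothetical stand-in for the older style of schema: one record in, one value out.
    interface LegacySchema<T> {
        T deserialize(byte[] key, byte[] value);
    }

    // Hypothetical stand-in for the newer style: values are pushed to a collector.
    interface RecordSchema<T> {
        void deserialize(byte[] key, byte[] value, Consumer<T> out);

        // Wraps an older schema so it can be used where the newer type is required.
        // This is the role a static of(...) factory plays; the details are assumed.
        static <T> RecordSchema<T> of(LegacySchema<T> legacy) {
            return (key, value, out) -> {
                T result = legacy.deserialize(key, value);
                if (result != null) {
                    out.accept(result); // forward the single value to the collector
                }
            };
        }
    }

    // Runs one record through an adapted schema and returns what was collected.
    static List<String> demo() {
        LegacySchema<String> legacy = (key, value) ->
                new String(key, StandardCharsets.UTF_8) + "="
                        + new String(value, StandardCharsets.UTF_8);

        RecordSchema<String> adapted = RecordSchema.of(legacy);

        List<String> collected = new ArrayList<>();
        adapted.deserialize(
                "k".getBytes(StandardCharsets.UTF_8),
                "v".getBytes(StandardCharsets.UTF_8),
                collected::add);
        return collected;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The point of the sketch is that nothing about the JSON parsing changes: the existing schema is kept as-is and only wrapped, so the source receives the interface type it asks for.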

On Tue, Feb 8, 2022 at 21:56, Roman Khachatryan <ro...@apache.org> wrote:

> Hi,
>
> setDeserializer() expects a KafkaRecordDeserializationSchema; the
> JSONKeyValueDeserializationSchema you provided is not compatible with
> it.
> You can convert it using the static of() method, see [1].
>
> [1] https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchema.html#of-org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema-
>
>
> Regards,
> Roman
>
> On Tue, Feb 8, 2022 at 5:43 PM HG <hanspeter.sl...@gmail.com> wrote:
> >
> > Hi all,
> >
> > When I build this code:
> >
> > KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder()
> >         .setProperties(kafkaProps)
> >         .setProperty("ssl.truststore.type",trustStoreType)
> >         .setProperty("ssl.truststore.password",trustStorePassword)
> >         .setProperty("ssl.truststore.location",trustStoreLocation)
> >         .setProperty("security.protocol",securityProtocol)
> >         .setProperty("partition.discovery.interval.ms", partitionDiscoveryIntervalMs)
> >         .setProperty("commit.offsets.on.checkpoint", commitOffsetsOnCheckpoint)
> >         .setGroupId(groupId)
> >         .setTopics(kafkaInputTopic)
> >         .setDeserializer(new JSONKeyValueDeserializationSchema(false))
> >         .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
> >         .build();
> >
> >
> > I get this error:
> >
> > error: incompatible types: org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema cannot be converted to org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema<org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode>
> >                 .setDeserializer(new JSONKeyValueDeserializationSchema(false))
> >
> >
> > What am I doing wrong?
> > As per the documentation, JSONKeyValueDeserializationSchema returns an
> > ObjectNode.
> >
> > Regards Hans
> >
>
