I believe you can solve this issue with:

.setDeserializer(KafkaRecordDeserializationSchema.of(new JSONKeyValueDeserializationSchema(false)))
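In case it helps to see why the wrapper is needed: as far as I understand it, KafkaRecordDeserializationSchema.of(...) acts as an adapter from the older KafkaDeserializationSchema type to the type that setDeserializer accepts. Below is a rough, dependency-free sketch of that idea. LegacySchema, RecordSchema and readOne are hypothetical stand-ins I made up for illustration, not the actual Flink interfaces, and the real wrapper may differ in detail.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class AdapterSketch {

    // Hypothetical stand-in for KafkaDeserializationSchema: returns one value per record.
    interface LegacySchema<T> {
        T deserialize(byte[] key, byte[] value);
    }

    // Hypothetical stand-in for KafkaRecordDeserializationSchema: emits values to a collector.
    interface RecordSchema<T> {
        void deserialize(byte[] key, byte[] value, List<T> out);

        // Plays the role of KafkaRecordDeserializationSchema.of(...): wraps the older type.
        static <T> RecordSchema<T> of(LegacySchema<T> legacy) {
            return (key, value, out) -> {
                T record = legacy.deserialize(key, value);
                if (record != null) {
                    out.add(record);
                }
            };
        }
    }

    // Stand-in for the builder's setDeserializer: it only accepts the newer type.
    static <T> List<T> readOne(RecordSchema<T> schema, byte[] key, byte[] value) {
        List<T> out = new ArrayList<>();
        schema.deserialize(key, value, out);
        return out;
    }

    public static List<String> demo() {
        LegacySchema<String> legacy = (key, value) ->
                new String(key, StandardCharsets.UTF_8) + "=" + new String(value, StandardCharsets.UTF_8);

        // readOne(legacy, ...) would not compile: LegacySchema is not a RecordSchema.
        return readOne(
                RecordSchema.of(legacy),
                "k1".getBytes(StandardCharsets.UTF_8),
                "v1".getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [k1=v1]
    }
}
```

Passing the legacy schema directly fails with the same kind of "incompatible types" error quoted below, while wrapping it with of(...) compiles.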

On Thu, Mar 3, 2022 at 8:07 AM Kamil ty <kamilt...@gmail.com> wrote:
>
> Hello,
>
> Sorry for the late reply. I have checked the issue and it seems to be a type
> issue, as the exception suggests. What happens is that the
> JSONKeyValueDeserializationSchema included in Flink implements a
> KafkaDeserializationSchema. The .setDeserializer method expects a
> Deserialization schema though. The JSONKeyValueDeserializationSchema might
> have been left over from an older Flink version.
>
> My recommendation would be to implement your own
> JSONKeyValueDeserializationSchema that implements a Deserialization schema.
> You should even be able to copy the implementation from the
> JSONKeyValueDeserializationSchema included in Flink and change
> KafkaDeserializationSchema to DeserializationSchema.
>
> If you have issues with implementing this, please let me know and I will
> provide you with the code.
>
> Best regards
> Kamil
>
> On Mon, 7 Feb 2022, 15:15 HG, <hanspeter.sl...@gmail.com> wrote:
>>
>> Hello Kamil et al.,
>>
>> When I build this code:
>>
>> KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder()
>>         .setProperties(kafkaProps)
>>         .setProperty("ssl.truststore.type", trustStoreType)
>>         .setProperty("ssl.truststore.password", trustStorePassword)
>>         .setProperty("ssl.truststore.location", trustStoreLocation)
>>         .setProperty("security.protocol", securityProtocol)
>>         .setProperty("partition.discovery.interval.ms", partitionDiscoveryIntervalMs)
>>         .setProperty("commit.offsets.on.checkpoint", commitOffsetsOnCheckpoint)
>>         .setGroupId(groupId)
>>         .setTopics(kafkaInputTopic)
>>         .setDeserializer(new JSONKeyValueDeserializationSchema(false))
>>         .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
>>         .build();
>>
>> I get this error:
>>
>> error: incompatible types:
>> org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema
>> cannot be converted to
>> org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema<org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode>
>>         .setDeserializer(new JSONKeyValueDeserializationSchema(false))
>>
>> What am I doing wrong?
>> As per the documentation, JSONKeyValueDeserializationSchema returns an
>> ObjectNode.
>>
>> Regards Hans-Peter
>>
>> On Fri, 14 Jan 2022 at 20:32, Kamil ty <kamilt...@gmail.com> wrote:
>>>
>>> Hello Hans,
>>>
>>> As far as I know, the JSONKeyValueDeserializationSchema returns a Jackson
>>> ObjectNode. Below I have included an example based on the Flink stable
>>> documentation.
>>>
>>> KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder()
>>>         .setBootstrapServers(brokers)
>>>         .setTopics("input-topic")
>>>         .setGroupId("my-group")
>>>         .setStartingOffsets(OffsetsInitializer.earliest())
>>>         .setDeserializer(new JSONKeyValueDeserializationSchema(false))
>>>         .build();
>>>
>>> DataStream<ObjectNode> ds = env.fromSource(source,
>>>         WatermarkStrategy.noWatermarks(), "Kafka Source");
>>> // Below we access the JSON field stored in the ObjectNode.
>>> DataStream<String> processedDs = ds.map(record ->
>>>         record.get("value").get("my-field").asText());
>>>
>>> It is also possible to implement your own deserialization schema that,
>>> e.g., could turn each record into a POJO. You can do this by implementing
>>> the KafkaDeserializationSchema (Flink : 1.14-SNAPSHOT API) (apache.org). If
>>> you are only interested in the value of the Kafka record, you can also
>>> extend the AbstractDeserializationSchema (Flink : 1.14-SNAPSHOT API)
>>> (apache.org) and use .setValueOnlyDeserializer(new CustomDeserializer()).
>>> There is also a different API that you could use for this, which is
>>> specified here: KafkaSourceBuilder (Flink : 1.14-SNAPSHOT API) (apache.org).
>>> Although the customDeserializer will be the same for older Flink versions,
>>> the Kafka Source has appeared only recently. To learn about the previous
>>> Kafka source (FlinkKafkaConsumer), see: Kafka | Apache Flink.
>>>
>>> Best Regards
>>> Kamil
>>>
>>> On Fri, 14 Jan 2022 at 18:37, HG <hanspeter.sl...@gmail.com> wrote:
>>>>
>>>> Hi,
>>>>
>>>> Before I start programming myself, I'd like to know whether there are
>>>> good examples of JSON deserialization that I can borrow.
>>>> The structure of the JSON is nested with multiple levels.
>>>>
>>>> Any references?
>>>>
>>>> 'better well stolen than badly invented myself' we'd say in Dutch😁
>>>>
>>>> Regards Hans

--
Cheers,
Aeden

GitHub: https://github.com/aedenj
Linked In: http://www.linkedin.com/in/aedenjameson
Blah Blah Blah: http://www.twitter.com/daliful