I believe you can solve this issue with:

.setDeserializer(KafkaRecordDeserializationSchema.of(new JSONKeyValueDeserializationSchema(false)))
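
For context, the compiler error quoted below says that .setDeserializer wants a KafkaRecordDeserializationSchema, while JSONKeyValueDeserializationSchema is a KafkaDeserializationSchema; the static of(...) helper adapts one to the other. Here is a minimal sketch of how the whole job might look, assuming Flink 1.14 with flink-connector-kafka on the classpath. The broker address, topic, group id, field name, and job name are placeholders of mine (partly taken from Kamil's earlier example), not values from your setup.

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;

public class JsonKafkaJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder()
                .setBootstrapServers("localhost:9092")   // placeholder broker address
                .setTopics("input-topic")                // placeholder topic
                .setGroupId("my-group")                  // placeholder group id
                .setStartingOffsets(OffsetsInitializer.earliest())
                // Wrap the KafkaDeserializationSchema so it matches the type
                // that setDeserializer expects.
                .setDeserializer(KafkaRecordDeserializationSchema.of(
                        new JSONKeyValueDeserializationSchema(false)))
                .build();

        DataStream<ObjectNode> ds =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

        // Each record is an ObjectNode with "key" and "value" children;
        // "my-field" is a placeholder for a field inside the JSON value.
        ds.map(record -> record.get("value").get("my-field").asText()).print();

        env.execute("json-kafka-job");
    }
}
```

Passing true instead of false to the JSONKeyValueDeserializationSchema constructor should additionally add a "metadata" node (topic, partition, offset) to each record.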

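If you only need the record value and would rather work with a POJO, the route Kamil mentions further down (extend AbstractDeserializationSchema and use .setValueOnlyDeserializer) could look roughly like the sketch below. Everything named Event, Payload, EventDeserializer, and buildSource is hypothetical and only illustrates a JSON document with one nested level; I am also assuming the shaded Jackson that ships with Flink, as seen in your error message.

```java
import java.io.IOException;

import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationFeature;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;

public class ValueOnlyJsonExample {

    /** Hypothetical POJO for the top level of the JSON value. */
    public static class Event {
        public String id;
        public Payload payload; // nested object
    }

    /** Hypothetical POJO for a nested level. */
    public static class Payload {
        public String user;
        public long amount;
    }

    /** Turns the raw bytes of the Kafka record value into an Event. */
    public static class EventDeserializer extends AbstractDeserializationSchema<Event> {
        private static final long serialVersionUID = 1L;

        // Created lazily on the task side, so it is not shipped with the job graph.
        private transient ObjectMapper mapper;

        @Override
        public Event deserialize(byte[] message) throws IOException {
            if (mapper == null) {
                mapper = new ObjectMapper()
                        // Ignore JSON fields that the POJO does not declare.
                        .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
            }
            return mapper.readValue(message, Event.class);
        }
    }

    /** Builds a source that emits Event objects; topic and group id are placeholders. */
    public static KafkaSource<Event> buildSource(String brokers) {
        return KafkaSource.<Event>builder()
                .setBootstrapServers(brokers)
                .setTopics("input-topic")
                .setGroupId("my-group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new EventDeserializer())
                .build();
    }
}
```

The trade-off is that the record key and Kafka metadata are dropped, so this only fits if the value alone is enough for your processing.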

On Thu, Mar 3, 2022 at 8:07 AM Kamil ty <kamilt...@gmail.com> wrote:
>
> Hello,
>
> Sorry for the late reply. I have checked the issue and it seems to be a type
> issue, as the exception suggests. What happens is that the
> JSONKeyValueDeserializationSchema included in Flink implements
> KafkaDeserializationSchema. The .setDeserializer method expects a
> DeserializationSchema though. The JSONKeyValueDeserializationSchema might
> have been left over from an older Flink version.
>
> My recommendation would be to implement your own
> JSONKeyValueDeserializationSchema that implements DeserializationSchema.
> You should even be able to copy the implementation from the
> JSONKeyValueDeserializationSchema included in Flink and change
> KafkaDeserializationSchema to DeserializationSchema.
>
> If you have issues implementing this, please let me know and I will
> provide you with the code.
>
> Best regards
> Kamil
>
> On Mon, 7 Feb 2022, 15:15 HG, <hanspeter.sl...@gmail.com> wrote:
>>
>> Hello Kamil et al.,
>>
>> When I build this code:
>>
>> KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder()
>>         .setProperties(kafkaProps)
>>         .setProperty("ssl.truststore.type",trustStoreType)
>>         .setProperty("ssl.truststore.password",trustStorePassword)
>>         .setProperty("ssl.truststore.location",trustStoreLocation)
>>         .setProperty("security.protocol",securityProtocol)
>>         .setProperty("partition.discovery.interval.ms", partitionDiscoveryIntervalMs)
>>         .setProperty("commit.offsets.on.checkpoint", commitOffsetsOnCheckpoint)
>>         .setGroupId(groupId)
>>         .setTopics(kafkaInputTopic)
>>         .setDeserializer(new JSONKeyValueDeserializationSchema(false))
>>         .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
>>         .build();
>>
>>
>> I get this error:
>>
>> error: incompatible types: org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema cannot be converted to org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema<org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode>
>>                 .setDeserializer(new JSONKeyValueDeserializationSchema(false))
>>
>>
>> What am I doing wrong?
>> As per the documentation JSONKeyValueDeserializationSchema returns an 
>> ObjectNode.
>>
>> Regards Hans-Peter
>>
>>
>>
>> Op vr 14 jan. 2022 om 20:32 schreef Kamil ty <kamilt...@gmail.com>:
>>>
>>> Hello Hans,
>>>
>>> As far as I know the JSONKeyValueDeserializationSchema returns a Jackson 
>>> ObjectNode. Below I have included an example based on Flink stable 
>>> documentation.
>>>
>>> KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder()
>>> .setBootstrapServers(brokers)
>>> .setTopics("input-topic")
>>> .setGroupId("my-group")
>>> .setStartingOffsets(OffsetsInitializer.earliest())
>>> .setDeserializer(new JSONKeyValueDeserializationSchema(false))
>>> .build();
>>>
>>> DataStream<ObjectNode> ds = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
>>> // Below we access the JSON field stored in the ObjectNode.
>>> DataStream<String> processedDs = ds.map(record -> record.get("value").get("my-field").asText());
>>>
>>> It is also possible to implement your own deserialization schema that
>>> could, e.g., turn each record into a POJO. You can do this by implementing
>>> KafkaDeserializationSchema (Flink 1.14-SNAPSHOT API, apache.org). If you
>>> are only interested in the value of the Kafka record, you can also extend
>>> AbstractDeserializationSchema (Flink 1.14-SNAPSHOT API, apache.org)
>>> and use .setValueOnlyDeserializer(new CustomDeserializer()). There is also
>>> a different API that you could use for this, which is documented under
>>> KafkaSourceBuilder (Flink 1.14-SNAPSHOT API, apache.org). The custom
>>> deserializer will be the same for older Flink versions, but the KafkaSource
>>> was introduced only recently. To learn about the previous Kafka source
>>> (FlinkKafkaConsumer), see: Kafka | Apache Flink.
>>>
>>> Best Regards
>>> Kamil
>>>
>>> On Fri, 14 Jan 2022 at 18:37, HG <hanspeter.sl...@gmail.com> wrote:
>>>>
>>>> Hi,
>>>>
>>>> Before I start programming myself, I'd like to know whether there are good
>>>> examples of JSON deserialization that I can borrow.
>>>> The structure of the JSON is nested with multiple levels.
>>>>
>>>> Any references?
>>>>
>>>> 'better well stolen than badly invented myself' we'd say in Dutch😁
>>>>
>>>> Regards Hans



-- 
Cheers,
Aeden

GitHub: https://github.com/aedenj
Linked In: http://www.linkedin.com/in/aedenjameson
Blah Blah Blah: http://www.twitter.com/daliful
