Thanks for sharing the full solution, much appreciated!

On Thu, 10 Feb 2022 at 09:07, HG <hanspeter.sl...@gmail.com> wrote:

> The complete solution for the record ( that others can benefit from it).
>
> KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder()
>         .setProperties(kafkaProps)
>         .setProperty("ssl.truststore.type",trustStoreType)
>         .setProperty("ssl.truststore.password",trustStorePassword)
>         .setProperty("ssl.truststore.location",trustStoreLocation)
>         .setProperty("security.protocol",securityProtocol)
>         .setProperty("partition.discovery.interval.ms", 
> partitionDiscoveryIntervalMs)
>         .setProperty("commit.offsets.on.checkpoint", 
> commitOffsetsOnCheckpoint)
>         .setGroupId(groupId)
>         .setTopics(kafkaInputTopic)
>         .setDeserializer(KafkaRecordDeserializationSchema.of(new 
> JSONKeyValueDeserializationSchema(fetchMetadata)))
>         
> .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
>         .build();
>
>
> Op wo 9 feb. 2022 om 09:46 schreef HG <hanspeter.sl...@gmail.com>:
>
>> Sorry to have bothered everyone.
>>
>> This is the obvious solution:
>>
>> .setDeserializer(KafkaRecordDeserializationSchema.of(new 
>> JSONKeyValueDeserializationSchema(false)))
>>
>>
>> Regards Hans-Peter
>>
>>
>> Op di 8 feb. 2022 om 21:56 schreef Roman Khachatryan <ro...@apache.org>:
>>
>>> Hi,
>>>
>>> setDeserializer() expects KafkaRecordDeserializationSchema;
>>> JSONKeyValueDeserializationSchema you provided is not compatible with
>>> it.
>>> You can convert it using [1]
>>>
>>> [1]
>>>
>>> https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchema.html#of-org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema-
>>>
>>>
>>> Regards,
>>> Roman
>>>
>>> On Tue, Feb 8, 2022 at 5:43 PM HG <hanspeter.sl...@gmail.com> wrote:
>>> >
>>> > Hi all,
>>> >
>>> > When I build this code:
>>> >
>>> > KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder()
>>> >         .setProperties(kafkaProps)
>>> >         .setProperty("ssl.truststore.type",trustStoreType)
>>> >         .setProperty("ssl.truststore.password",trustStorePassword)
>>> >         .setProperty("ssl.truststore.location",trustStoreLocation)
>>> >         .setProperty("security.protocol",securityProtocol)
>>> >         .setProperty("partition.discovery.interval.ms",
>>> partitionDiscoveryIntervalMs)
>>> >         .setProperty("commit.offsets.on.checkpoint",
>>> commitOffsetsOnCheckpoint)
>>> >         .setGroupId(groupId)
>>> >         .setTopics(kafkaInputTopic)
>>> >         .setDeserializer(new JSONKeyValueDeserializationSchema(false))
>>> >
>>>  
>>> .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
>>> >         .build();
>>> >
>>> >
>>> > I get:
>>> > This error:
>>> >
>>> > error: incompatible types:
>>> org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema
>>> cannot be converted to
>>> org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema<org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode>
>>> >                 .setDeserializer(new
>>> JSONKeyValueDeserializationSchema(false))
>>> >
>>> >
>>> > What am I doing wrong?
>>> > As per the documentation JSONKeyValueDeserializationSchema returns an
>>> ObjectNode.
>>> >
>>> > Regards Hans
>>> >
>>>
>>

Reply via email to