Hi,

You can use a DeserializationSchema with the KafkaSource as well; you can pass it to the KafkaSource.builder():

    KafkaSource.<...>builder()
            .setDeserializer(...)

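For your Long,Timestamp,String,String records a value-only deserializer should be enough. Note that setDeserializer() takes a KafkaRecordDeserializationSchema; a plain DeserializationSchema can either be wrapped with KafkaRecordDeserializationSchema.valueOnly(...) or passed to setValueOnlyDeserializer(), as in the sketch below. The topic name, broker address, group id and the Tuple4 target type are placeholders of mine (a POJO would work just as well), and I'm assuming the payload is a UTF-8 comma-separated string with an epoch-millis timestamp:

    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.serialization.DeserializationSchema;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.tuple.Tuple4;
    import org.apache.flink.connector.kafka.source.KafkaSource;
    import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    import java.nio.charset.StandardCharsets;

    public class KafkaCsvExample {

        // Turns an "id,timestamp,str1,str2" payload into a Tuple4.
        public static class EventDeserializationSchema
                implements DeserializationSchema<Tuple4<Long, Long, String, String>> {

            @Override
            public Tuple4<Long, Long, String, String> deserialize(byte[] message) {
                String[] f = new String(message, StandardCharsets.UTF_8).split(",", 4);
                return Tuple4.of(Long.parseLong(f[0]), Long.parseLong(f[1]), f[2], f[3]);
            }

            @Override
            public boolean isEndOfStream(Tuple4<Long, Long, String, String> nextElement) {
                return false; // the Kafka stream is unbounded
            }

            @Override
            public TypeInformation<Tuple4<Long, Long, String, String>> getProducedType() {
                return Types.TUPLE(Types.LONG, Types.LONG, Types.STRING, Types.STRING);
            }
        }

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            KafkaSource<Tuple4<Long, Long, String, String>> source =
                    KafkaSource.<Tuple4<Long, Long, String, String>>builder()
                            .setBootstrapServers("broker:9092")   // placeholder
                            .setTopics("events")                  // placeholder topic
                            .setGroupId("my-group")               // placeholder group id
                            .setStartingOffsets(OffsetsInitializer.earliest())
                            .setValueOnlyDeserializer(new EventDeserializationSchema())
                            .build();

            DataStream<Tuple4<Long, Long, String, String>> stream =
                    env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

            stream.print();
            env.execute("Kafka CSV example");
        }
    }

The type parameter of the KafkaSource is simply whatever your deserializer produces, so it does not have to be <String>; <String> only makes sense if you want to keep the raw payload as one string and parse it further downstream.
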
You can also take a look at the StateMachineExample[1], where KafkaSource is used.

BTW, have you tried looking at the Table API? It would abstract quite a few things for you, e.g. the parsing of what I presume is a CSV format [2] in your case.
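
Just as a sketch of what that could look like (topic, broker address, group id and column names are again made up; and whether the timestamp column can be TIMESTAMP(3) or should rather be a BIGINT depends on how it is actually encoded in the messages):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class KafkaCsvTableExample {

        public static void main(String[] args) {
            TableEnvironment tEnv =
                    TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

            // The CSV parsing is done by the 'csv' format; no hand-written deserializer needed.
            tEnv.executeSql(
                    "CREATE TABLE events (\n"
                            + "  id BIGINT,\n"
                            + "  event_time TIMESTAMP(3),\n"
                            + "  field_a STRING,\n"
                            + "  field_b STRING\n"
                            + ") WITH (\n"
                            + "  'connector' = 'kafka',\n"
                            + "  'topic' = 'events',\n"
                            + "  'properties.bootstrap.servers' = 'broker:9092',\n"
                            + "  'properties.group.id' = 'my-group',\n"
                            + "  'scan.startup.mode' = 'earliest-offset',\n"
                            + "  'format' = 'csv'\n"
                            + ")");

            tEnv.executeSql("SELECT * FROM events").print();
        }
    }

You'd need the Kafka connector and the flink-csv format on the classpath for this to run.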

Best,

Dawid

[1] https://github.com/apache/flink/blob/5846d8d61b4b2aa10f925e9f63885cb7f6686303/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java#L104

[2] https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/csv/

On 03/02/2022 16:56, HG wrote:
Hello

Most examples available still use the FlinkKafkaConsumer unfortunately.
I need to consume events from Kafka.
The format is Long,Timestamp,String,String.

Do I need to create a custom deserializer?

What also confuses me is

KafkaSource<String> source = KafkaSource....

How does it relate to the deserializer?
Is there a kind of <Row> type, or is <String> fine even if the message is a composite of Long,String...?

Regards Hans
