Hi,

You can use other (de)serialization methods besides ByteBuffer as
well. Endianness is set explicitly to have the same byte order during
serialization and deserialization.

Regards,
Roman

On Fri, Feb 4, 2022 at 2:43 PM Dawid Wysakowicz <dwysakow...@apache.org> wrote:
>
> Hi,
>
> You can use DeserializationSchema with KafkaSource as well. You can pass it 
> to the KafkaSource.builder():
>
>                     KafkaSource.<...>builder()
>
>                             .setDeserializer(...)
>
> You can also take a look at the StateMachineExample[1], where KafkaSource is 
> used.
>
> BTW, have you tried looking at Table API? It would abstract quite a few 
> things for you, e.g. translation of what I presume is a CSV format[2] in your 
> case.
>
> Best,
>
> Dawid
>
> [1] 
> https://github.com/apache/flink/blob/5846d8d61b4b2aa10f925e9f63885cb7f6686303/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java#L104
>
> [2] 
> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/csv/
>
> On 03/02/2022 16:56, HG wrote:
>
> Hello
>
> Most examples available still use the FlinkKafkaConsumer unfortunately.
> I need to consume events from Kafka.
> The format is Long,Timestamp,String,String.
>
> Do I need to create a custom deserializer?
>
> What also confuses me is
>
> KafkaSource<String> source = KafkaSource....
>
> How does it relate to the deserializer?
> Is there a kind of <Row> type or is <String> fine even if the message is a 
> composite of Long,String...?
>
> Regards Hans
>

Reply via email to