Hi Dawid,

I am a little worried about the ByteBuffer and the endianness in the code
below. Do I really need to worry about the byte order and determine it myself?

Or was ByteBuffer with an explicit endianness just a convenient choice here?

Regards,
Hans

public Event deserialize(byte[] message) throws IOException {
    // Interpret the raw bytes as two little-endian ints:
    // the address at offset 0 and the event-type ordinal at offset 4.
    ByteBuffer buffer = ByteBuffer.wrap(message).order(ByteOrder.LITTLE_ENDIAN);
    int address = buffer.getInt(0);
    int typeOrdinal = buffer.getInt(4);
    return new Event(EventType.values()[typeOrdinal], address);
}
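
To illustrate my confusion: the same four bytes decode to different ints
depending on the byte order (the byte values here are just an example, not
taken from the actual messages):

import java.nio.ByteBuffer;
import java.nio.ByteOrder;

public class EndiannessDemo {
    public static void main(String[] args) {
        byte[] bytes = {0x01, 0x00, 0x00, 0x00};
        // Little-endian reads the least significant byte first -> 1
        int little = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN).getInt();
        // Big-endian (the ByteBuffer default) reads it last -> 16777216
        int big = ByteBuffer.wrap(bytes).order(ByteOrder.BIG_ENDIAN).getInt();
        System.out.println(little + " vs " + big);
    }
}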

On Fri, Feb 4, 2022 at 14:42, Dawid Wysakowicz <dwysakow...@apache.org> wrote:

> Hi,
>
> You can use a DeserializationSchema with KafkaSource as well, by passing
> it to KafkaSource.builder():
>
>     KafkaSource.<...>builder()
>             .setDeserializer(...)
>
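> For illustration, a minimal sketch of the wiring (the broker address,
> topic, and EventSchema class are placeholders; EventSchema is assumed to
> implement DeserializationSchema<Event>):
>
> import org.apache.flink.connector.kafka.source.KafkaSource;
> import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
> import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
>
> KafkaSource<Event> source = KafkaSource.<Event>builder()
>         .setBootstrapServers("localhost:9092")   // placeholder broker
>         .setTopics("events")                     // placeholder topic
>         .setStartingOffsets(OffsetsInitializer.earliest())
>         // valueOnly(...) adapts a plain DeserializationSchema to the
>         // record-level interface the builder expects.
>         .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(new EventSchema()))
>         .build();
>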
> You can also take a look at the StateMachineExample[1], where KafkaSource
> is used.
>
> BTW, have you tried looking at the Table API? It would abstract quite a few
> things for you, e.g. the translation of what I presume is a CSV format[2] in
> your case.
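>
> As a rough sketch of that route (the topic, column names, and broker
> address are placeholders, following the CSV format options in [2]):
>
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.TableEnvironment;
>
> TableEnvironment tEnv =
>         TableEnvironment.create(EnvironmentSettings.inStreamingMode());
> tEnv.executeSql(
>         "CREATE TABLE events (" +
>         "  id BIGINT, ts TIMESTAMP(3), a STRING, b STRING" +
>         ") WITH (" +
>         "  'connector' = 'kafka'," +
>         "  'topic' = 'events'," +
>         "  'properties.bootstrap.servers' = 'localhost:9092'," +
>         "  'format' = 'csv')");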
>
> Best,
>
> Dawid
>
> [1]
> https://github.com/apache/flink/blob/5846d8d61b4b2aa10f925e9f63885cb7f6686303/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java#L104
>
> [2]
> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/csv/
> On 03/02/2022 16:56, HG wrote:
>
> Hello
>
> Unfortunately, most of the examples available still use the FlinkKafkaConsumer.
> I need to consume events from Kafka.
> The format is Long,Timestamp,String,String.
>
> Do I need to create a custom deserializer?
>
> What also confuses me is:
>
> KafkaSource<String> source = KafkaSource....
>
> How does it relate to the deserializer?
> Is there a kind of <Row> type, or is <String> fine even if the message is a
> composite of Long,String,...?
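>
> Would something like this sketch be the right direction? (MyRecord and its
> field names are placeholders; it assumes each message is one UTF-8 CSV line.)
>
> import java.io.IOException;
> import java.nio.charset.StandardCharsets;
> import org.apache.flink.api.common.serialization.DeserializationSchema;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
>
> // Placeholder POJO for the Long,Timestamp,String,String payload.
> class MyRecord {
>     final long id;
>     final String timestamp, first, second;
>     MyRecord(long id, String timestamp, String first, String second) {
>         this.id = id; this.timestamp = timestamp;
>         this.first = first; this.second = second;
>     }
> }
>
> public class MyRecordSchema implements DeserializationSchema<MyRecord> {
>     @Override
>     public MyRecord deserialize(byte[] message) throws IOException {
>         // Split the CSV line into its four fields.
>         String[] parts = new String(message, StandardCharsets.UTF_8).split(",", 4);
>         return new MyRecord(Long.parseLong(parts[0]), parts[1], parts[2], parts[3]);
>     }
>
>     @Override
>     public boolean isEndOfStream(MyRecord next) {
>         return false; // the Kafka stream is unbounded
>     }
>
>     @Override
>     public TypeInformation<MyRecord> getProducedType() {
>         return TypeInformation.of(MyRecord.class);
>     }
> }
>
> Then the source would be KafkaSource<MyRecord> rather than KafkaSource<String>,
> so the type parameter matches what the deserializer produces.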
>
> Regards Hans
>
>
