Usage of KafkaDeserializationSchema and KafkaSerializationSchema

2020-01-22 Thread Jason Kania
Hello, I was looking for documentation in 1.9.1 on how to create implementations of the KafkaSerializationSchema and KafkaDeserializationSchema interfaces. I have created implementations in the past for the SerializationSchema and DeserializationSchema interface. Unfortunately, I can find no exa

Re: Usage of KafkaDeserializationSchema and KafkaSerializationSchema

2020-01-22 Thread David Magalhães
Hi Jason, The topic is used in *FlinkKafkaConsumer*, following the *KafkaDeserializationSchema* and then *Properties*. https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.html new FlinkKafkaConsumer(kafkaTopic, new M

Re: Usage of KafkaDeserializationSchema and KafkaSerializationSchema

2020-01-22 Thread Jason Kania
Thanks for responding. I am aware where the topic is used. What I do not see is how to set the topic within the class that implements the KafkaSerializationSchema.serialize(  T classObject, Long timestamp ) method. The method must create and return a value of type ProducerRecord, but all the con

Re: Usage of KafkaDeserializationSchema and KafkaSerializationSchema

2020-01-23 Thread Chesnay Schepler
That's a fair question; the interface is indeed weird in this regard and does have some issues. From what I can tell you have 2 options: a) have the user pass the topic to the serialization schema constructor, which in practice would be identical to the topic they pass to the producer. b) Addit

Re: Usage of KafkaDeserializationSchema and KafkaSerializationSchema

2020-01-23 Thread Aljoscha Krettek
Hi, the reason the new schema feels a bit weird is that it implements a new paradigm in a FlinkKafkaProducer that still follows a somewhat older paradigm. In the old paradigm, partitioning and topic where configured on the sink, which made it fixed for all produced records. The new schema all