Hi Rss,

> why Flink implements different serialization schemes for keyed and non
keyed messages for Kafka?

The non-keyed serialization schema is a basic schema, which works for most
use cases.
For advanced users who need access to the key, the offset, the partition or
the topic, there's the keyed serialization schema.
The keyed schema is richer and can completely subsume the simple,
non-keyed one.

> As a result I see that a message which are serialized by
TypeInformationKeyValueSerializationSchema may be deserialized by Flink's
SimpleStringSchema() or by Kafka's StringSerializer only with additional
first symbol.

The TypeInformationKeyValueSerializationSchema is only meant to be used for
Flink <--> Flink communication through Kafka, because it depends on Flink's
internal serializers (it might even depend on the exact ExecutionConfig

> The question, is it correct behavior of Flink? And should I implement own
serializer and partitioner for Flink's Kafka sink if I want to use just
simple String serialization which may be read by all other tools without
Flink?

The behavior is correct. If the SimpleStringSchema is not sufficient for
the other systems, you need to implement your own serialization schema.
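
A minimal sketch of what such a serializer could look like, assuming the key
and value should be written as raw UTF-8 bytes with no length prefix, so that
Kafka's StringDeserializer and other tools can read them. KeyedSchemaSketch is
a local stand-in for Flink's KeyedSerializationSchema so that the example
compiles without Flink on the classpath; KeyedMessage and
PlainStringKeyedSchema are hypothetical names. In a real job you would
implement Flink's interface from the Kafka connector instead, which may
require additional methods depending on the Flink version.

```java
import java.nio.charset.StandardCharsets;

// Local stand-in for Flink's KeyedSerializationSchema (assumption: only the
// two serialize methods are mirrored here; the real interface may have more).
interface KeyedSchemaSketch<T> {
    byte[] serializeKey(T element);
    byte[] serializeValue(T element);
}

// Hypothetical element type: a (key, value) pair, comparable to the
// Tuple2<String, String> used in the question.
final class KeyedMessage {
    final String key;
    final String value;

    KeyedMessage(String key, String value) {
        this.key = key;
        this.value = value;
    }
}

// Writes key and value as plain UTF-8 bytes, without any length prefix.
final class PlainStringKeyedSchema implements KeyedSchemaSketch<KeyedMessage> {

    @Override
    public byte[] serializeKey(KeyedMessage element) {
        // A null key is passed through as null (a record without a key).
        return element.key == null
                ? null
                : element.key.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public byte[] serializeValue(KeyedMessage element) {
        return element.value.getBytes(StandardCharsets.UTF_8);
    }
}
```

Because the bytes are exactly the UTF-8 encoding of the strings, a consumer
that decodes them as UTF-8 gets the original text back without an extra
leading character.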

> And second question, why Flink requires to implement a custom partitioner
for serialized byte[] stream instead of using of primary objects as in
Kafka's partitioner? Or instead of just allowing to use Kafka's partitioner
class.

If you do not specify a Flink partitioner, we'll use the configured
Kafka partitioner.
The advantage of using Flink's own partitioner is that you can access
information like the subtaskId and the number of subtasks.
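
To illustrate the idea, here is a rough, self-contained sketch of partitioning
logic that uses both the serialized key and the subtask information. It is not
Flink's partitioner API: the class name, the open(...) and partition(...)
signatures and the choice of hash are all assumptions made for the example.
It assumes the key was written as plain UTF-8 and hashes it with
String.hashCode(); records without a key fall back to a partition derived from
the subtask index.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical partitioner sketch; it does not extend any Flink or Kafka class.
final class SubtaskAwarePartitionerSketch {

    private int subtaskIndex;
    private int numSubtasks;

    // Called once per parallel sink instance with its index and the sink parallelism.
    void open(int subtaskIndex, int numSubtasks) {
        this.subtaskIndex = subtaskIndex;
        this.numSubtasks = numSubtasks;
    }

    // Chooses a partition in the range [0, numPartitions).
    int partition(byte[] serializedKey, int numPartitions) {
        if (serializedKey == null) {
            // No key: each subtask always writes to the same partition.
            return subtaskIndex % numPartitions;
        }
        // Assumption: the key bytes are UTF-8 text. floorMod keeps the
        // result non-negative even when hashCode() is negative.
        String key = new String(serializedKey, StandardCharsets.UTF_8);
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    int getNumSubtasks() {
        return numSubtasks;
    }
}
```

If the producer outside of Flink should send the same keys to the same
partitions, it has to apply the same hash and modulo to the same key bytes.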


On Sun, Aug 28, 2016 at 6:16 PM, rss rss <rssde...@gmail.com> wrote:

> Hello,
>   why Flink implements different serialization schemes for keyed and non
> keyed messages for Kafka?
>   I'm using two ways of loading of messages to Kafka. First way is on-fly
> loading without Flink by Kafka's means only. In this case I'm using
> something like:
> props.put("partitioner.class", KafkaPartitioner.class.getCanonicalName());
> props.put("value.serializer", 
> "org.apache.kafka.common.serialization.StringSerializer");
> props.put("key.serializer", 
> "org.apache.kafka.common.serialization.StringSerializer");
> producer = new KafkaProducer<>(props);
> String key = event.getUserId();
> String  value = DummyEvent.eventToString(event);
> producer.send(new ProducerRecord<>(topic, key, value));
>  And from Flink side I can read it without a key by code like:
> DataStream<String> dataStream = env
>         .addSource(new FlinkKafkaConsumer08<String>(
>                 "topic",
>                 new SimpleStringSchema(), kafkaProps));
> As a result I have pure message without a key. Actually I need a key only
> for partitioning by Kafka and I have an appropriate class
> https://github.com/rssdev10/flink-kafka-streaming/blob/
> master/src/main/java/KafkaPartitioner.java . That is standard java-hash
> for String class.
>   Also I have other case for messages loading from hadoop to Kafka. I'm
> using Flink for this purpose. All is ok when I'm using
> dataStream.addSink(new FlinkKafkaProducer08<>(config.getProperty("topic"),
>         new SimpleStringSchema(),
>         kafkaProps));
> But I need partitioning in Kafka and I changed it to
> TypeInformation<Tuple2<String, String>> stringStringInfo =
>         TypeInfoParser.parse("org.apache.flink.api.java.tuple.Tuple2<String, 
> String>");
> KeyedSerializationSchema<Tuple2<String, String>> schema =
>         new TypeInformationKeyValueSerializationSchema<>(String.class, 
> String.class, env.getConfig());
> dataStream
>         .map(json -> {
>             Event event = gson.fromJson(json, Event.class);
>             return new Tuple2<String, String>(event.getUserId(), json);
>         }).returns(stringStringInfo)
>         .setParallelism(partitions)
>         .addSink(new FlinkKafkaProducer08<>(config.getProperty("topic"),
>                 schema,
>                 kafkaProps));
> As a result I see that a message which are serialized by
> TypeInformationKeyValueSerializationSchema may be deserialized by Flink's
> SimpleStringSchema() or by Kafka's StringSerializer only with additional
> first symbol. I guess this is a size of String which is added by
> org.apache.flink.types.StringValue#writeString. That is the value of a
> message is not more readable by Spark, Storm, Kafka consumer with standard
> deserialization....
>    The question, is it correct behavior of Flink? And should I implement
> own serializer and partitioner for Flink's Kafka sink if I want to use just
> simple String serialization which may be read by all other tools without
> Flink?
>    And second question, why Flink requires to implement a custom
> partitioner for serialized byte[] stream instead of using of primary
> objects as in Kafka's partitioner? Or instead of just allowing to use
> Kafka's partitioner class.
>   PS: I can give a link to sources if you have an access to
> https://github.com/stratosphere/ private repos.
> Thanks,
> best regards
