[ https://issues.apache.org/jira/browse/FLINK-10039?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17328590#comment-17328590 ]
Flink Jira Bot commented on FLINK-10039:
----------------------------------------

This major issue is unassigned, and neither it nor any of its Sub-Tasks has been updated for 30 days. So, it has been labeled "stale-major". If this ticket is indeed "major", please either assign yourself or give an update. Afterwards, please remove the label. In 7 days the issue will be deprioritized.

> FlinkKafkaProducer - Serializer Error
> -------------------------------------
>
>                 Key: FLINK-10039
>                 URL: https://issues.apache.org/jira/browse/FLINK-10039
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.4.2
>            Reporter: Akshay Nagpal
>            Priority: Major
>              Labels: stale-major
>
> I am working on a use case where I input data using Kafka's console
> producer, read the same data in my program using FlinkKafkaConsumer, and
> write it back to another Kafka topic using FlinkKafkaProducer.
> I am using version 1.4.2 of the following dependencies:
> flink-java
> flink-streaming-java_2.11
> flink-connector-kafka-0.10_2.11
>
> The code is as follows:
> KafkaConsoleProducer:
> {code:java}
> ./bin/kafka-console-producer --broker-list xxx:9092 --topic test1 \
>   --property "parse.key=true" --property "key.separator=:" \
>   --key-serializer org.apache.kafka.common.serialization.StringSerializer \
>   --value-serializer org.apache.kafka.common.serialization.StringSerializer
> {code}
> KafkaFlinkConsumer:
> {code:java}
> Properties properties = new Properties();
> properties.setProperty("bootstrap.servers", "xxx:9092");
> properties.setProperty("zookeeper.connect", "xxx:2181");
> properties.setProperty("group.id", "test");
> properties.setProperty("key.deserializer",
>     "org.apache.kafka.common.serialization.StringDeserializer");
> properties.setProperty("value.deserializer",
>     "org.apache.kafka.common.serialization.StringDeserializer");
> FlinkKafkaConsumer010<String> myConsumer =
>     new FlinkKafkaConsumer010<String>("test1", new SimpleStringSchema(), properties);
> DataStream<String> stream = env.addSource(myConsumer);
> {code}
> KafkaFlinkProducer:
> {code:java}
> Properties properties = new Properties();
> properties.setProperty("bootstrap.servers", "xxx:9092");
> properties.setProperty("zookeeper.connect", "xxx:2181");
> properties.setProperty("group.id", "test");
> properties.setProperty("key.serializer",
>     "org.apache.kafka.common.serialization.StringSerializer");
> properties.setProperty("value.serializer",
>     "org.apache.kafka.common.serialization.StringSerializer");
> FlinkKafkaProducer010<String> myProducer =
>     new FlinkKafkaProducer010<String>("my-topic", new SimpleStringSchema(), properties);
> stream.addSink(myProducer);
> {code}
> When I specify StringSerializer as the key and value serializer in
> FlinkKafkaProducer, I get the following error in the logs:
>
> {code:java}
> org.apache.kafka.common.errors.SerializationException: Can't convert value of
> class [B to class org.apache.kafka.common.serialization.StringSerializer
> specified in value.serializer
> {code}
> Despite this error, the data is still produced to the topic.
> When I use ByteArraySerializer with the producer instead, the error does not
> appear in the logs, and the output is also produced.
> Moreover, DataStream's print method does not print the data to the console.

--
This message was sent by Atlassian Jira (v8.3.4#803005)