[ https://issues.apache.org/jira/browse/BEAM-2257?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Raghu Angadi updated BEAM-2257:
-------------------------------
    Fix Version/s:     (was: 2.2.0)

> KafkaIO write without key requires a producer fn
> ------------------------------------------------
>
>                 Key: BEAM-2257
>                 URL: https://issues.apache.org/jira/browse/BEAM-2257
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-java-extensions
>            Reporter: Jean-Baptiste Onofré
>            Assignee: Jean-Baptiste Onofré
>
> The {{KafkaIO}} javadoc says that it is possible to write {{String}} values 
> directly to a topic without a key:
> {code}
>    PCollection<String> strings = ...;
>    strings.apply(KafkaIO.<Void, String>write()
>        .withBootstrapServers("broker_1:9092,broker_2:9092")
>        .withTopic("results")
>        .withValueSerializer(new StringSerializer()) // just need serializer for value
>        .values()
>      );
> {code}
> This is not fully correct:
> 1. {{withValueSerializer()}} requires a serializer class, not an instance, 
> so it should be {{withValueSerializer(StringSerializer.class)}}.
> 2. As the key serializer is not provided, a Kafka producer fn is required; 
> otherwise, the user will get:
> {code}
> Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka producer
>       at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:321)
>       at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:156)
>       at org.apache.beam.sdk.io.kafka.KafkaIO$KafkaWriter.setup(KafkaIO.java:1494)
> Caused by: java.lang.NullPointerException
>       at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:300)
>       at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:156)
>       at org.apache.beam.sdk.io.kafka.KafkaIO$KafkaWriter.setup(KafkaIO.java:1494)
> {code}
> A possible workaround is to create a {{VoidSerializer}} and pass it via 
> {{withKeySerializer()}} or provide the producer fn.
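
The {{VoidSerializer}} workaround mentioned above could look roughly like the sketch below. This is an illustration under assumptions, not code from KafkaIO: the class name {{VoidSerializer}} is hypothetical, and the local {{Serializer}} interface is only a stand-in that mirrors the three methods of Kafka's {{org.apache.kafka.common.serialization.Serializer}} so that the block compiles on its own. In a real pipeline the stand-in would be dropped and Kafka's interface imported instead.

```java
import java.util.Map;

// Stand-in (assumption) mirroring the shape of Kafka's
// org.apache.kafka.common.serialization.Serializer so this sketch is
// self-contained. In a real pipeline, delete this interface and import
// Kafka's own Serializer instead.
interface Serializer<T> {
  void configure(Map<String, ?> configs, boolean isKey);

  byte[] serialize(String topic, T data);

  void close();
}

// Hypothetical key serializer for records that carry no key.
// The only possible Void value is null, so serialize() always returns null.
class VoidSerializer implements Serializer<Void> {
  @Override
  public void configure(Map<String, ?> configs, boolean isKey) {
    // Nothing to configure.
  }

  @Override
  public byte[] serialize(String topic, Void data) {
    // A null byte array corresponds to a record without a key.
    return null;
  }

  @Override
  public void close() {
    // No resources to release.
  }
}
```

With such a class available, the write from the description would add {{.withKeySerializer(VoidSerializer.class)}} next to {{.withValueSerializer(StringSerializer.class)}}, assuming {{withKeySerializer()}} takes a class in the same way as {{withValueSerializer()}}.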



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
