[ 
https://issues.apache.org/jira/browse/KAFKA-6728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16423455#comment-16423455
 ] 

ASF GitHub Bot commented on KAFKA-6728:
---------------------------------------

tedyu closed pull request #4800: KAFKA-6728 Kafka Connect Header Null Pointer Exception
URL: https://github.com/apache/kafka/pull/4800
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index 2ba785c4668..fe103af4a07 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -498,6 +498,7 @@ private Headers convertHeadersFor(ConsumerRecord<byte[], byte[]> record) {
         if (recordHeaders != null) {
             String topic = record.topic();
             for (org.apache.kafka.common.header.Header recordHeader : recordHeaders) {
+                if (recordHeader == null) continue;
                 SchemaAndValue schemaAndValue = headerConverter.toConnectHeader(topic, recordHeader.key(), recordHeader.value());
                 result.add(recordHeader.key(), schemaAndValue);
             }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index 6ef5ae3201c..fb712c7aeda 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -292,6 +292,7 @@ private RecordHeaders convertHeaderFor(SourceRecord record) {
         if (headers != null) {
             String topic = record.topic();
             for (Header header : headers) {
+                if (header == null) continue;
                 String key = header.key();
                 byte[] rawHeader = headerConverter.fromConnectHeader(topic, key, header.schema(), header.value());
                 result.add(key, rawHeader);
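
Both hunks above add the same guard: skip a null element while iterating over
the headers, rather than dereferencing it. The pattern can be sketched in
isolation as follows. This is only an illustration under stated assumptions:
RawHeader is a hypothetical stand-in for Kafka's Header type, and the
"key=value" string conversion stands in for the real HeaderConverter call. It
does not reproduce the actual Connect runtime code.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class NullHeaderGuardSketch {

    // Hypothetical stand-in for org.apache.kafka.common.header.Header.
    static final class RawHeader {
        final String key;
        final byte[] value;

        RawHeader(String key, byte[] value) {
            this.key = key;
            this.value = value;
        }
    }

    // Converts each header to a "key=value" string. The string conversion is
    // an assumption made for this sketch; the patched code calls a
    // HeaderConverter instead.
    static List<String> convertHeaders(Iterable<RawHeader> recordHeaders) {
        List<String> result = new ArrayList<>();
        if (recordHeaders != null) {
            for (RawHeader recordHeader : recordHeaders) {
                // Same guard as in the patch: without it, the field accesses
                // below would throw a NullPointerException on a null element.
                if (recordHeader == null) continue;
                String value = recordHeader.value == null
                        ? "null"
                        : new String(recordHeader.value, StandardCharsets.UTF_8);
                result.add(recordHeader.key + "=" + value);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<RawHeader> headers = Arrays.asList(
                new RawHeader("trace-id", "abc".getBytes(StandardCharsets.UTF_8)),
                null, // a null element, as guarded against in the patch
                new RawHeader("source", "test".getBytes(StandardCharsets.UTF_8)));
        // prints [trace-id=abc, source=test]
        System.out.println(convertHeaders(headers));
    }
}
```

Skipping the null element silently is the choice the patch makes; a stricter
variant could log or reject such a record instead.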


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Kafka Connect Header Null Pointer Exception
> -------------------------------------------
>
>                 Key: KAFKA-6728
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6728
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 1.1.0
>         Environment: Linux Mint
>            Reporter: Philippe Hong
>            Priority: Critical
>
> I am trying to use the newly released Kafka Connect that supports headers by 
> using the standalone connector to write to a text file (so in this case I am 
> only using the sink component).
> I am sadly greeted by a NullPointerException:
> {noformat}
> ERROR WorkerSinkTask{id=local-file-sink-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:172)
> java.lang.NullPointerException
>     at org.apache.kafka.connect.runtime.WorkerSinkTask.convertHeadersFor(WorkerSinkTask.java:501)
>     at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:469)
>     at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:301)
>     at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:205)
>     at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:173)
>     at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
>     at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> {noformat}
> I launched ZooKeeper and Kafka 1.1.0 locally and sent a 
> ProducerRecord[String, Array[Byte]] using a KafkaProducer[String, 
> Array[Byte]] with a header that has a key and a value.
> I can read the record with a console consumer as well as with a 
> KafkaConsumer (in which case I can see the content of the header of the 
> record I sent previously), so there is no problem there.
> I only made two changes to the Kafka configuration:
>      - I used the StringConverter for the key and the ByteArrayConverter for 
> the value.
>      - I also changed the topic that the sink connects to.
> If I forgot something, please tell me, as this is the first time I am 
> creating an issue on Jira.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
