[ 
https://issues.apache.org/jira/browse/BEAM-4038?focusedWorklogId=92039&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92039
 ]

ASF GitHub Bot logged work on BEAM-4038:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 18/Apr/18 03:25
            Start Date: 18/Apr/18 03:25
    Worklog Time Spent: 10m 
      Work Description: rangadi commented on a change in pull request #5111: 
[BEAM-4038] Support Kafka Headers in KafkaIO
URL: https://github.com/apache/beam/pull/5111#discussion_r182299393
 
 

 ##########
 File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecord.java
 ##########
 @@ -33,6 +34,7 @@
   private final String topic;
   private final int partition;
   private final long offset;
+  private final Headers headers;
 
 Review comment:
   Thanks for the update. I missed later part of your comment above.
   
   > and I package this class and use it as a dependency in another application 
which simply instantiates this class. Even though the downstream application 
did not call getHeaders, it will still throw a runtime exception.
   
   Is that true? Can you try it with a small simple java class? I tested with a 
small test app and it worked ok . 
   Run it with different versions of kafka, and with '-Dis.present=True|False':
   
   ````
   package org.example;
   
   import org.apache.kafka.clients.consumer.ConsumerRecord;
   import org.apache.kafka.common.header.Headers;
   
   public class TestRuntimeLoading {
   
     public static class A {
   
       Headers kafkaHeaders = null;
   
       Headers getHeaders(boolean isPresent) {
         if (isPresent && kafkaHeaders == null) {
           return new ConsumerRecord<>("test", 0, 0L, "key", "value").headers();
         } else {
           throw new RuntimeException("no headers");
         }
       }
   
     }
   
     public static void main(String[] args) {
   
       boolean isPresent = Boolean.valueOf(System.getProperty("is.present", 
"True"));
       Object headers = new A().getHeaders(isPresent);
       System.out.println("Headers : " + headers.toString());
     }
   }
   ````
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 92039)
    Time Spent: 6h  (was: 5h 50m)

> Support Kafka Headers in KafkaIO
> --------------------------------
>
>                 Key: BEAM-4038
>                 URL: https://issues.apache.org/jira/browse/BEAM-4038
>             Project: Beam
>          Issue Type: New Feature
>          Components: io-java-kafka
>            Reporter: Geet Kumar
>            Assignee: Raghu Angadi
>            Priority: Minor
>          Time Spent: 6h
>  Remaining Estimate: 0h
>
> Headers have been added to Kafka Consumer/Producer records (KAFKA-4208). The 
> purpose of this JIRA is to support this feature in KafkaIO.  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to