[jira] [Commented] (FLINK-8354) Flink Kafka connector ignores Kafka message headers

2018-10-22 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16659449#comment-16659449
 ] 

ASF GitHub Bot commented on FLINK-8354:
---

alexeyt820 commented on issue #6615: [FLINK-8354] [flink-connectors] Add 
ability to access and provider Kafka headers
URL: https://github.com/apache/flink/pull/6615#issuecomment-431860212
 
 
   Rebased on top of #6703 and implemented KeyedDeserializationSchema.Record 
for Kafka 2.0 (had to keep KeyedDeserializationSchema.Record because 
KeyedDeserializationSchema is used by legacy connectors as well.
   I will create PR for 
[FLINK-8500](https://issues.apache.org/jira/browse/FLINK-8500)(Timestamp)  on 
top of this PR.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Flink Kafka connector ignores Kafka message  headers 
> -
>
> Key: FLINK-8354
> URL: https://issues.apache.org/jira/browse/FLINK-8354
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kafka Connector
> Environment: Kafka 0.11.0.0
> Flink 1.4.0
> flink-connector-kafka-0.11_2.11 
>Reporter: Mohammad Abareghi
>Assignee: Aegeaner
>Priority: Major
>  Labels: pull-request-available
>
> Kafka has introduced notion of Header for messages in version 0.11.0.0  
> https://issues.apache.org/jira/browse/KAFKA-4208.
> But flink-connector-kafka-0.11_2.11 which supports kafka 0.11.0.0 ignores 
> headers when consuming kafka messages. 
> It would be useful in some scenarios, such as distributed log tracing, to 
> support message headers to FlinkKafkaConsumer011 and FlinkKafkaProducer011. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8354) Flink Kafka connector ignores Kafka message headers

2018-10-22 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16659472#comment-16659472
 ] 

ASF GitHub Bot commented on FLINK-8354:
---

alexeyt820 edited a comment on issue #6615: [FLINK-8354] [flink-connectors] Add 
ability to access and provider Kafka headers
URL: https://github.com/apache/flink/pull/6615#issuecomment-431860212
 
 
   Rebased on top of #6703 and implemented KeyedDeserializationSchema.Record 
for Kafka 2.0 (had to keep KeyedDeserializationSchema.Record because 
KeyedDeserializationSchema is used by legacy connectors as well.
   I will create PR for 
[FLINK-8500](https://issues.apache.org/jira/browse/FLINK-8500)(Timestamp)  on 
top of this PR.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Flink Kafka connector ignores Kafka message  headers 
> -
>
> Key: FLINK-8354
> URL: https://issues.apache.org/jira/browse/FLINK-8354
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kafka Connector
> Environment: Kafka 0.11.0.0
> Flink 1.4.0
> flink-connector-kafka-0.11_2.11 
>Reporter: Mohammad Abareghi
>Assignee: Aegeaner
>Priority: Major
>  Labels: pull-request-available
>
> Kafka has introduced notion of Header for messages in version 0.11.0.0  
> https://issues.apache.org/jira/browse/KAFKA-4208.
> But flink-connector-kafka-0.11_2.11 which supports kafka 0.11.0.0 ignores 
> headers when consuming kafka messages. 
> It would be useful in some scenarios, such as distributed log tracing, to 
> support message headers to FlinkKafkaConsumer011 and FlinkKafkaProducer011. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8354) Flink Kafka connector ignores Kafka message headers

2018-10-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16662923#comment-16662923
 ] 

ASF GitHub Bot commented on FLINK-8354:
---

alexeyt820 commented on issue #6615: [FLINK-8354] [flink-connectors] Add 
ability to access and provider Kafka headers
URL: https://github.com/apache/flink/pull/6615#issuecomment-432846236
 
 
   @yanghua, @aljoscha is anything else need to be done with this PR ?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Flink Kafka connector ignores Kafka message  headers 
> -
>
> Key: FLINK-8354
> URL: https://issues.apache.org/jira/browse/FLINK-8354
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kafka Connector
> Environment: Kafka 0.11.0.0
> Flink 1.4.0
> flink-connector-kafka-0.11_2.11 
>Reporter: Mohammad Abareghi
>Assignee: Aegeaner
>Priority: Major
>  Labels: pull-request-available
>
> Kafka has introduced notion of Header for messages in version 0.11.0.0  
> https://issues.apache.org/jira/browse/KAFKA-4208.
> But flink-connector-kafka-0.11_2.11 which supports kafka 0.11.0.0 ignores 
> headers when consuming kafka messages. 
> It would be useful in some scenarios, such as distributed log tracing, to 
> support message headers to FlinkKafkaConsumer011 and FlinkKafkaProducer011. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8354) Flink Kafka connector ignores Kafka message headers

2018-08-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16593020#comment-16593020
 ] 

ASF GitHub Bot commented on FLINK-8354:
---

alexeyt820 opened a new pull request #6615: [FLINK-8354] [flink-connectors] Add 
ability to access and provider Kafka headers
URL: https://github.com/apache/flink/pull/6615
 
 
   
   ## What is the purpose of the change
   This changes allows to access Kafka headers in KeyedDeserializationSchema, 
and to provide Kafka headers via KeyedSerializationSchema.headers method
   
   ## Brief change log
   - Add default _headers_ method to KeyedSerializationSchema interface
   - Add default deserialize method with _headers_ argument to 
KeyedDeserializationSchema
   - Add Kafka011Fetcher to get headers from Kafka ConsumerRecord and provide 
them as parameter KeyedDeserializationSchema
   - Modify FlinkKafkaProducer011 to request _headers_ from 
KeyedSerializationSchema and add them to ProducerRecord.
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
 - *Added integration test *Kafka011ITCase.testHeaders* to verify that we 
can produce headers and consume them
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (yes)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes)
 - If yes, how is the feature documented? (JavaDocs)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Flink Kafka connector ignores Kafka message  headers 
> -
>
> Key: FLINK-8354
> URL: https://issues.apache.org/jira/browse/FLINK-8354
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
> Environment: Kafka 0.11.0.0
> Flink 1.4.0
> flink-connector-kafka-0.11_2.11 
>Reporter: Mohammad Abareghi
>Assignee: Aegeaner
>Priority: Major
>  Labels: pull-request-available
>
> Kafka has introduced notion of Header for messages in version 0.11.0.0  
> https://issues.apache.org/jira/browse/KAFKA-4208.
> But flink-connector-kafka-0.11_2.11 which supports kafka 0.11.0.0 ignores 
> headers when consuming kafka messages. 
> It would be useful in some scenarios, such as distributed log tracing, to 
> support message headers to FlinkKafkaConsumer011 and FlinkKafkaProducer011. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8354) Flink Kafka connector ignores Kafka message headers

2018-08-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16593905#comment-16593905
 ] 

ASF GitHub Bot commented on FLINK-8354:
---

yanghua commented on a change in pull request #6615: [FLINK-8354] 
[flink-connectors] Add ability to access and provider Kafka headers
URL: https://github.com/apache/flink/pull/6615#discussion_r213034632
 
 

 ##
 File path: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java
 ##
 @@ -54,4 +55,20 @@
 * @return True, if the element signals end of stream, false otherwise.
 */
boolean isEndOfStream(T nextElement);
+
+   /**
+* Deserializes the byte message.
+*
+* @param messageKey the key as a byte array (null if no key has been 
set).
+* @param message The message, as a byte array (null if the message was 
empty or deleted).
 
 Review comment:
   missed topic parameter description.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Flink Kafka connector ignores Kafka message  headers 
> -
>
> Key: FLINK-8354
> URL: https://issues.apache.org/jira/browse/FLINK-8354
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
> Environment: Kafka 0.11.0.0
> Flink 1.4.0
> flink-connector-kafka-0.11_2.11 
>Reporter: Mohammad Abareghi
>Assignee: Aegeaner
>Priority: Major
>  Labels: pull-request-available
>
> Kafka has introduced notion of Header for messages in version 0.11.0.0  
> https://issues.apache.org/jira/browse/KAFKA-4208.
> But flink-connector-kafka-0.11_2.11 which supports kafka 0.11.0.0 ignores 
> headers when consuming kafka messages. 
> It would be useful in some scenarios, such as distributed log tracing, to 
> support message headers to FlinkKafkaConsumer011 and FlinkKafkaProducer011. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8354) Flink Kafka connector ignores Kafka message headers

2018-08-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16593903#comment-16593903
 ] 

ASF GitHub Bot commented on FLINK-8354:
---

yanghua commented on a change in pull request #6615: [FLINK-8354] 
[flink-connectors] Add ability to access and provider Kafka headers
URL: https://github.com/apache/flink/pull/6615#discussion_r213034601
 
 

 ##
 File path: 
flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
 ##
 @@ -206,6 +208,9 @@ protected KafkaConsumerCallBridge createCallBridge() {
return new KafkaConsumerCallBridge();
}
 
+   protected Iterable> 
headersOf(ConsumerRecord record) {
+   return Collections.emptyList();
+   }
 
 Review comment:
   insert a new line after this looks better


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Flink Kafka connector ignores Kafka message  headers 
> -
>
> Key: FLINK-8354
> URL: https://issues.apache.org/jira/browse/FLINK-8354
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
> Environment: Kafka 0.11.0.0
> Flink 1.4.0
> flink-connector-kafka-0.11_2.11 
>Reporter: Mohammad Abareghi
>Assignee: Aegeaner
>Priority: Major
>  Labels: pull-request-available
>
> Kafka has introduced notion of Header for messages in version 0.11.0.0  
> https://issues.apache.org/jira/browse/KAFKA-4208.
> But flink-connector-kafka-0.11_2.11 which supports kafka 0.11.0.0 ignores 
> headers when consuming kafka messages. 
> It would be useful in some scenarios, such as distributed log tracing, to 
> support message headers to FlinkKafkaConsumer011 and FlinkKafkaProducer011. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8354) Flink Kafka connector ignores Kafka message headers

2018-08-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16593904#comment-16593904
 ] 

ASF GitHub Bot commented on FLINK-8354:
---

yanghua commented on a change in pull request #6615: [FLINK-8354] 
[flink-connectors] Add ability to access and provider Kafka headers
URL: https://github.com/apache/flink/pull/6615#discussion_r213034555
 
 

 ##
 File path: 
flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ITCase.java
 ##
 @@ -350,4 +363,201 @@ public boolean isEndOfStream(Long nextElement) {
}
}
 
+   /**
+* Kafka 0.11 specific test, ensuring Kafka Headers are properly 
written to and read from Kafka.
+*/
+   @Test(timeout = 6)
+   public void testHeaders() throws Exception {
+   final String topic = "headers-topic";
+   final long testSequenceLength = 127L;
+   createTestTopic(topic, 3, 1);
+   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+   env.setParallelism(1);
+   
env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+   env.getConfig().disableSysoutLogging();
+   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+   DataStream testSequence = env.addSource(new 
SourceFunction() {
+   private static final long serialVersionUID = 1L;
+   boolean running = true;
+
+   @Override
+   public void run(SourceContext ctx) throws 
Exception {
+   long i = 0;
+   while (running) {
+   ctx.collectWithTimestamp(i, i * 2);
+   if (i++ == testSequenceLength) {
+   running = false;
+   }
+   }
+   }
+
+   @Override
+   public void cancel() {
+   running = false;
+   }
+   });
+
+   FlinkKafkaProducer011 producer = new 
FlinkKafkaProducer011<>(topic,
+   new TestHeadersKeyedSerializationSchema(topic), 
standardProps, Optional.empty());
+   testSequence.addSink(producer).setParallelism(3);
+   env.execute("Produce some data");
+
+   // Now let's consume data and check that headers deserialized 
correctly
+   env = StreamExecutionEnvironment.getExecutionEnvironment();
+   env.setParallelism(1);
+   
env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+   env.getConfig().disableSysoutLogging();
+   
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+   FlinkKafkaConsumer011 kafkaSource = new 
FlinkKafkaConsumer011<>(topic, new 
TestHeadersKeyedDeserializationSchema(testSequenceLength), standardProps);
+
+   env.addSource(kafkaSource).addSink(new 
TestHeadersElementValid());
+   env.execute("Consume again");
+
+   deleteTestTopic(topic);
+   }
+
+   /**
+* Element consisting of key, value and headers represented as list of 
tuples: key, list of Bytes.
+*/
+   public static class TestHeadersElement extends Tuple3>>> {
+
+   }
+
+   /**
+* Generate "headers" for given element.
+* @param element - sequence element
+* @return headers
+*/
+   private static Iterable> headersFor(Long 
element) {
+   final long x = element;
+   return Arrays.asList(
+   new AbstractMap.SimpleImmutableEntry<>("low", new 
byte[]{
+   (byte) ((x >>> 8) & 0xFF),
+   (byte) ((x) & 0xFF)
+   }),
+   new AbstractMap.SimpleImmutableEntry<>("low", new 
byte[]{
+   (byte) ((x >>> 24) & 0xFF),
+   (byte) ((x >>> 16) & 0xFF)
+   }),
+   new AbstractMap.SimpleImmutableEntry<>("high", new 
byte[]{
+   (byte) ((x >>> 40) & 0xFF),
+   (byte) ((x >>> 32) & 0xFF)
+   }),
+   new AbstractMap.SimpleImmutableEntry<>("high", new 
byte[]{
+   (byte) ((x >>> 56) & 0xFF),
+   (byte) ((x >>> 48) & 0xFF)
+   })
+   );
+   }
+
+   /**
+* Convert headers into list of tuples representation. List of tuples 
is more 

[jira] [Commented] (FLINK-8354) Flink Kafka connector ignores Kafka message headers

2018-08-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16594037#comment-16594037
 ] 

ASF GitHub Bot commented on FLINK-8354:
---

alexeyt820 commented on a change in pull request #6615: [FLINK-8354] 
[flink-connectors] Add ability to access and provider Kafka headers
URL: https://github.com/apache/flink/pull/6615#discussion_r213060361
 
 

 ##
 File path: 
flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ITCase.java
 ##
 @@ -350,4 +363,201 @@ public boolean isEndOfStream(Long nextElement) {
}
}
 
+   /**
+* Kafka 0.11 specific test, ensuring Kafka Headers are properly 
written to and read from Kafka.
+*/
+   @Test(timeout = 6)
+   public void testHeaders() throws Exception {
+   final String topic = "headers-topic";
+   final long testSequenceLength = 127L;
+   createTestTopic(topic, 3, 1);
+   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+   env.setParallelism(1);
+   
env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+   env.getConfig().disableSysoutLogging();
+   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+   DataStream testSequence = env.addSource(new 
SourceFunction() {
+   private static final long serialVersionUID = 1L;
+   boolean running = true;
+
+   @Override
+   public void run(SourceContext ctx) throws 
Exception {
+   long i = 0;
+   while (running) {
+   ctx.collectWithTimestamp(i, i * 2);
+   if (i++ == testSequenceLength) {
+   running = false;
+   }
+   }
+   }
+
+   @Override
+   public void cancel() {
+   running = false;
+   }
+   });
+
+   FlinkKafkaProducer011 producer = new 
FlinkKafkaProducer011<>(topic,
+   new TestHeadersKeyedSerializationSchema(topic), 
standardProps, Optional.empty());
+   testSequence.addSink(producer).setParallelism(3);
+   env.execute("Produce some data");
+
+   // Now let's consume data and check that headers deserialized 
correctly
+   env = StreamExecutionEnvironment.getExecutionEnvironment();
+   env.setParallelism(1);
+   
env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+   env.getConfig().disableSysoutLogging();
+   
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+   FlinkKafkaConsumer011 kafkaSource = new 
FlinkKafkaConsumer011<>(topic, new 
TestHeadersKeyedDeserializationSchema(testSequenceLength), standardProps);
+
+   env.addSource(kafkaSource).addSink(new 
TestHeadersElementValid());
+   env.execute("Consume again");
+
+   deleteTestTopic(topic);
+   }
+
+   /**
+* Element consisting of key, value and headers represented as list of 
tuples: key, list of Bytes.
+*/
+   public static class TestHeadersElement extends Tuple3>>> {
+
+   }
+
+   /**
+* Generate "headers" for given element.
+* @param element - sequence element
+* @return headers
+*/
+   private static Iterable> headersFor(Long 
element) {
+   final long x = element;
+   return Arrays.asList(
+   new AbstractMap.SimpleImmutableEntry<>("low", new 
byte[]{
+   (byte) ((x >>> 8) & 0xFF),
+   (byte) ((x) & 0xFF)
+   }),
+   new AbstractMap.SimpleImmutableEntry<>("low", new 
byte[]{
+   (byte) ((x >>> 24) & 0xFF),
+   (byte) ((x >>> 16) & 0xFF)
+   }),
+   new AbstractMap.SimpleImmutableEntry<>("high", new 
byte[]{
+   (byte) ((x >>> 40) & 0xFF),
+   (byte) ((x >>> 32) & 0xFF)
+   }),
+   new AbstractMap.SimpleImmutableEntry<>("high", new 
byte[]{
+   (byte) ((x >>> 56) & 0xFF),
+   (byte) ((x >>> 48) & 0xFF)
+   })
+   );
+   }
+
+   /**
+* Convert headers into list of tuples representation. List of tuples 
is mo

[jira] [Commented] (FLINK-8354) Flink Kafka connector ignores Kafka message headers

2018-08-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16596396#comment-16596396
 ] 

ASF GitHub Bot commented on FLINK-8354:
---

aljoscha commented on issue #6615: [FLINK-8354] [flink-connectors] Add ability 
to access and provider Kafka headers
URL: https://github.com/apache/flink/pull/6615#issuecomment-416972143
 
 
   Hi, we have to carefully coordinate this with #6105 and and #6577. See 
especially my comments on #6577.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Flink Kafka connector ignores Kafka message  headers 
> -
>
> Key: FLINK-8354
> URL: https://issues.apache.org/jira/browse/FLINK-8354
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
> Environment: Kafka 0.11.0.0
> Flink 1.4.0
> flink-connector-kafka-0.11_2.11 
>Reporter: Mohammad Abareghi
>Assignee: Aegeaner
>Priority: Major
>  Labels: pull-request-available
>
> Kafka has introduced notion of Header for messages in version 0.11.0.0  
> https://issues.apache.org/jira/browse/KAFKA-4208.
> But flink-connector-kafka-0.11_2.11 which supports kafka 0.11.0.0 ignores 
> headers when consuming kafka messages. 
> It would be useful in some scenarios, such as distributed log tracing, to 
> support message headers to FlinkKafkaConsumer011 and FlinkKafkaProducer011. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8354) Flink Kafka connector ignores Kafka message headers

2018-09-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16612800#comment-16612800
 ] 

ASF GitHub Bot commented on FLINK-8354:
---

alexeyt820 commented on issue #6615: [FLINK-8354] [flink-connectors] Add 
ability to access and provider Kafka headers
URL: https://github.com/apache/flink/pull/6615#issuecomment-420812548
 
 
   @yanghua, @aljoscha any other suggestions?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Flink Kafka connector ignores Kafka message  headers 
> -
>
> Key: FLINK-8354
> URL: https://issues.apache.org/jira/browse/FLINK-8354
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
> Environment: Kafka 0.11.0.0
> Flink 1.4.0
> flink-connector-kafka-0.11_2.11 
>Reporter: Mohammad Abareghi
>Assignee: Aegeaner
>Priority: Major
>  Labels: pull-request-available
>
> Kafka has introduced notion of Header for messages in version 0.11.0.0  
> https://issues.apache.org/jira/browse/KAFKA-4208.
> But flink-connector-kafka-0.11_2.11 which supports kafka 0.11.0.0 ignores 
> headers when consuming kafka messages. 
> It would be useful in some scenarios, such as distributed log tracing, to 
> support message headers to FlinkKafkaConsumer011 and FlinkKafkaProducer011. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8354) Flink Kafka connector ignores Kafka message headers

2018-09-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16613052#comment-16613052
 ] 

ASF GitHub Bot commented on FLINK-8354:
---

tzulitai commented on issue #6615: [FLINK-8354] [flink-connectors] Add ability 
to access and provider Kafka headers
URL: https://github.com/apache/flink/pull/6615#issuecomment-420892843
 
 
   @alexeyt820 I think it is not easily possible to resolve the conflicts 
between this PR and #6105, which also touches the `KeyedDeserializationSchema` 
and follows a somewhat different approach.
   
   I'm also not found of wrapping the Kafka record bytes into `Record`; that 
basically always ties deserialization of the bytes with access to other meta 
information, and makes it hard to reuse some already existing deserialization 
formats such as `AvroDeserializationSchema`.
   
   Is it ok if I open a new PR that is based on this one and #6105, and see 
what I come up with? I might not be able to do that this week, but I can try 
next week.
   
   cc @FredTing would that also be ok for you?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Flink Kafka connector ignores Kafka message  headers 
> -
>
> Key: FLINK-8354
> URL: https://issues.apache.org/jira/browse/FLINK-8354
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
> Environment: Kafka 0.11.0.0
> Flink 1.4.0
> flink-connector-kafka-0.11_2.11 
>Reporter: Mohammad Abareghi
>Assignee: Aegeaner
>Priority: Major
>  Labels: pull-request-available
>
> Kafka has introduced notion of Header for messages in version 0.11.0.0  
> https://issues.apache.org/jira/browse/KAFKA-4208.
> But flink-connector-kafka-0.11_2.11 which supports kafka 0.11.0.0 ignores 
> headers when consuming kafka messages. 
> It would be useful in some scenarios, such as distributed log tracing, to 
> support message headers to FlinkKafkaConsumer011 and FlinkKafkaProducer011. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8354) Flink Kafka connector ignores Kafka message headers

2018-09-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16613054#comment-16613054
 ] 

ASF GitHub Bot commented on FLINK-8354:
---

tzulitai edited a comment on issue #6615: [FLINK-8354] [flink-connectors] Add 
ability to access and provider Kafka headers
URL: https://github.com/apache/flink/pull/6615#issuecomment-420892843
 
 
   @alexeyt820 I think it is not easily possible to resolve the conflicts 
between this PR and #6105, which also touches the `KeyedDeserializationSchema` 
and follows a somewhat different approach.
   
   I'm also not found of wrapping the Kafka record bytes into `Record`; that 
basically always ties deserialization of the bytes with access to other meta 
information, and makes it hard to reuse some already existing deserialization 
formats such as `AvroDeserializationSchema`.
   
   Is it ok if I open a new PR that is based on this one and #6105, and see 
what I come up with? I might not be able to do that this week, but I can do 
that as soon as I finish with what I'm currently busy with.
   
   cc @FredTing would that also be ok for you?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Flink Kafka connector ignores Kafka message  headers 
> -
>
> Key: FLINK-8354
> URL: https://issues.apache.org/jira/browse/FLINK-8354
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
> Environment: Kafka 0.11.0.0
> Flink 1.4.0
> flink-connector-kafka-0.11_2.11 
>Reporter: Mohammad Abareghi
>Assignee: Aegeaner
>Priority: Major
>  Labels: pull-request-available
>
> Kafka has introduced notion of Header for messages in version 0.11.0.0  
> https://issues.apache.org/jira/browse/KAFKA-4208.
> But flink-connector-kafka-0.11_2.11 which supports kafka 0.11.0.0 ignores 
> headers when consuming kafka messages. 
> It would be useful in some scenarios, such as distributed log tracing, to 
> support message headers to FlinkKafkaConsumer011 and FlinkKafkaProducer011. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8354) Flink Kafka connector ignores Kafka message headers

2018-09-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16613166#comment-16613166
 ] 

ASF GitHub Bot commented on FLINK-8354:
---

FredTing commented on issue #6615: [FLINK-8354] [flink-connectors] Add ability 
to access and provider Kafka headers
URL: https://github.com/apache/flink/pull/6615#issuecomment-420918918
 
 
   @tzulitai That's fine with me. Just let me know whether I should close PR 
#6105 now or wait until you submitted the new PR.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Flink Kafka connector ignores Kafka message  headers 
> -
>
> Key: FLINK-8354
> URL: https://issues.apache.org/jira/browse/FLINK-8354
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
> Environment: Kafka 0.11.0.0
> Flink 1.4.0
> flink-connector-kafka-0.11_2.11 
>Reporter: Mohammad Abareghi
>Assignee: Aegeaner
>Priority: Major
>  Labels: pull-request-available
>
> Kafka has introduced notion of Header for messages in version 0.11.0.0  
> https://issues.apache.org/jira/browse/KAFKA-4208.
> But flink-connector-kafka-0.11_2.11 which supports kafka 0.11.0.0 ignores 
> headers when consuming kafka messages. 
> It would be useful in some scenarios, such as distributed log tracing, to 
> support message headers to FlinkKafkaConsumer011 and FlinkKafkaProducer011. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8354) Flink Kafka connector ignores Kafka message headers

2018-09-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16613528#comment-16613528
 ] 

ASF GitHub Bot commented on FLINK-8354:
---

alexeyt820 commented on issue #6615: [FLINK-8354] [flink-connectors] Add 
ability to access and provider Kafka headers
URL: https://github.com/apache/flink/pull/6615#issuecomment-421014612
 
 
   @tzulitai, using ```Record``` wrapping Kafka ```ConsumerRecord``` allows to 
add for example timestamp from PR #6105 w/o need to change client code, so it 
looks like more extensible approach. Not sure how it makes hard to reuse some 
already existing deserialization formats such as 
```AvroDeserializationSchema```, at least not harder then now - 
```AvroDeserializationSchema``` will be wrapped via 
```KeyedDeserializationSchemaWrapper``` in exactly same way as know. Also 
```KeyedDeserializationSchemaWrapper``` calls only ```Record.value()```, so it 
doesn't ties deserialization of byte with access to other metadata, not in 
terms of execution path (in logical terms it is always tied because underlying 
level - Kafka ```ConsumerRecord``` contains key, value and metadata)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Flink Kafka connector ignores Kafka message  headers 
> -
>
> Key: FLINK-8354
> URL: https://issues.apache.org/jira/browse/FLINK-8354
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
> Environment: Kafka 0.11.0.0
> Flink 1.4.0
> flink-connector-kafka-0.11_2.11 
>Reporter: Mohammad Abareghi
>Assignee: Aegeaner
>Priority: Major
>  Labels: pull-request-available
>
> Kafka has introduced notion of Header for messages in version 0.11.0.0  
> https://issues.apache.org/jira/browse/KAFKA-4208.
> But flink-connector-kafka-0.11_2.11 which supports kafka 0.11.0.0 ignores 
> headers when consuming kafka messages. 
> It would be useful in some scenarios, such as distributed log tracing, to 
> support message headers to FlinkKafkaConsumer011 and FlinkKafkaProducer011. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8354) Flink Kafka connector ignores Kafka message headers

2018-09-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16613606#comment-16613606
 ] 

ASF GitHub Bot commented on FLINK-8354:
---

aljoscha commented on issue #6615: [FLINK-8354] [flink-connectors] Add ability 
to access and provider Kafka headers
URL: https://github.com/apache/flink/pull/6615#issuecomment-421038704
 
 
   For this, I would await the outcome of #6577. If we end up only having one 
"modern" Kafka connector we might just pass through the `ConsumerRecord`. 
Otherwise we might have to do some wrapping.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Flink Kafka connector ignores Kafka message  headers 
> -
>
> Key: FLINK-8354
> URL: https://issues.apache.org/jira/browse/FLINK-8354
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
> Environment: Kafka 0.11.0.0
> Flink 1.4.0
> flink-connector-kafka-0.11_2.11 
>Reporter: Mohammad Abareghi
>Assignee: Aegeaner
>Priority: Major
>  Labels: pull-request-available
>
> Kafka has introduced notion of Header for messages in version 0.11.0.0  
> https://issues.apache.org/jira/browse/KAFKA-4208.
> But flink-connector-kafka-0.11_2.11 which supports kafka 0.11.0.0 ignores 
> headers when consuming kafka messages. 
> It would be useful in some scenarios, such as distributed log tracing, to 
> support message headers to FlinkKafkaConsumer011 and FlinkKafkaProducer011. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8354) Flink Kafka connector ignores Kafka message headers

2018-07-26 Thread Alexey Trenikhin (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16559157#comment-16559157
 ] 

Alexey Trenikhin commented on FLINK-8354:
-

In our case we are using Headers to propagate avro schema fingerprints, so to 
deserialize we need access to headers

> Flink Kafka connector ignores Kafka message  headers 
> -
>
> Key: FLINK-8354
> URL: https://issues.apache.org/jira/browse/FLINK-8354
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
> Environment: Kafka 0.11.0.0
> Flink 1.4.0
> flink-connector-kafka-0.11_2.11 
>Reporter: Mohammad Abareghi
>Assignee: Aegeaner
>Priority: Major
>
> Kafka has introduced notion of Header for messages in version 0.11.0.0  
> https://issues.apache.org/jira/browse/KAFKA-4208.
> But flink-connector-kafka-0.11_2.11 which supports kafka 0.11.0.0 ignores 
> headers when consuming kafka messages. 
> It would be useful in some scenarios, such as distributed log tracing, to 
> support message headers to FlinkKafkaConsumer011 and FlinkKafkaProducer011. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)