[ https://issues.apache.org/jira/browse/FLINK-4035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15437224#comment-15437224 ]
ASF GitHub Bot commented on FLINK-4035:
---------------------------------------

Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/2369

Thanks Robert for addressing my comments :)

Overall, I like the new hybrid producer approach. However, I'm still curious whether it is possible / reasonable to drop the `FlinkKafkaProducer010Configuration` return type of invocation (b), and let both invocation methods return `FlinkKafkaProducer010` instead. So,

```
FlinkKafkaProducer010 kafka = new FlinkKafkaProducer010(...)
// or FlinkKafkaProducer010 kafka = FlinkKafkaProducer010.writeToKafkaWithTimestamps(...) for timestamp support

// setter config methods directly done on the FlinkKafkaProducer010 instance regardless of (a) or (b)
kafka.setLogFailuresOnly(true)
kafka.setFlushOnCheckpoint(true)
kafka.setWriteTimestampToKafka(true) // would not have effect if original invocation method (a) was used
```

But we'll need to be a bit hacky in `invokeInternal(element, elementTimestamp)`, something like only letting the `timestamp` passed to `ProducerRecord` be non-null if `writeTimestampToKafka && elementTimestamp != Long.MIN_VALUE`.

What do you think?

> Bump Kafka producer in Kafka sink to Kafka 0.10.0.0
> ---------------------------------------------------
>
>                 Key: FLINK-4035
>                 URL: https://issues.apache.org/jira/browse/FLINK-4035
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.0.3
>            Reporter: Elias Levy
>            Assignee: Robert Metzger
>            Priority: Minor
>
> Kafka 0.10.0.0 introduced protocol changes related to the producer.
> Published messages now include timestamps and compressed messages now include
> relative offsets. As it is now, brokers must decompress publisher compressed
> messages, assign offset to them, and recompress them, which is wasteful and
> makes it less likely that compression will be used at all.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
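
For illustration, the timestamp guard suggested in the comment above might look roughly like the following. This is only a sketch under stated assumptions: the class `TimestampGuard` and the method `resolveTimestamp` are hypothetical names and not part of Flink. Only the condition `writeTimestampToKafka && elementTimestamp != Long.MIN_VALUE` is taken from the comment. The sketch also assumes that `Long.MIN_VALUE` marks an element without a timestamp.

```java
// Hypothetical helper, not part of Flink: isolates the condition proposed
// for invokeInternal(element, elementTimestamp).
public class TimestampGuard {

    /**
     * Returns the timestamp to pass on to the Kafka record, or null when
     * no timestamp should be written.
     *
     * Assumption: Long.MIN_VALUE marks an element that carries no timestamp.
     */
    static Long resolveTimestamp(boolean writeTimestampToKafka, long elementTimestamp) {
        if (writeTimestampToKafka && elementTimestamp != Long.MIN_VALUE) {
            return elementTimestamp;
        }
        // Either timestamp writing is disabled (e.g. invocation method (a)),
        // or the element has no timestamp attached.
        return null;
    }

    public static void main(String[] args) {
        // Timestamp writing enabled and a timestamp is present.
        System.out.println(resolveTimestamp(true, 1472000000000L));
        // Timestamp writing disabled: the timestamp is dropped.
        System.out.println(resolveTimestamp(false, 1472000000000L));
        // Enabled, but the element carries no timestamp.
        System.out.println(resolveTimestamp(true, Long.MIN_VALUE));
    }
}
```

With a guard like this, calling `setWriteTimestampToKafka(true)` on a producer whose elements carry no timestamp would still yield a null timestamp, which is the "no effect" behaviour described for invocation method (a).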