[ https://issues.apache.org/jira/browse/FLINK-4035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15523327#comment-15523327 ]
ASF GitHub Bot commented on FLINK-4035:
---------------------------------------

Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2369#discussion_r80496915

    --- Diff: flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java ---
    @@ -60,12 +60,17 @@ protected void assignPartitionsToConsumer(KafkaConsumer<byte[], byte[]> consumer
     		consumer.assign(topicPartitions);
     	}

    +	@Override
    +	protected void emitRecord(T record, KafkaTopicPartitionState<TopicPartition> partition, long offset, ConsumerRecord consumerRecord) throws Exception {
    +		// pass timestamp
    +		super.emitRecord(record, partition, offset, consumerRecord.timestamp());
    +	}
    +
     	/**
     	 * Emit record Kafka-timestamp aware.
     	 */
     	@Override
    -	protected <R> void emitRecord(T record, KafkaTopicPartitionState<TopicPartition> partitionState, long offset, R kafkaRecord) throws Exception {
    -		long timestamp = ((ConsumerRecord) kafkaRecord).timestamp();
    +	protected void emitRecord(T record, KafkaTopicPartitionState<TopicPartition> partitionState, long offset, long timestamp) throws Exception {
     		if (timestampWatermarkMode == NO_TIMESTAMPS_WATERMARKS) {
    --- End diff --

    Yes, I'll do that.


> Bump Kafka producer in Kafka sink to Kafka 0.10.0.0
> ---------------------------------------------------
>
>                 Key: FLINK-4035
>                 URL: https://issues.apache.org/jira/browse/FLINK-4035
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.0.3
>            Reporter: Elias Levy
>            Assignee: Robert Metzger
>            Priority: Minor
>
> Kafka 0.10.0.0 introduced protocol changes related to the producer.
> Published messages now include timestamps and compressed messages now include
> relative offsets. As it is now, brokers must decompress publisher compressed
> messages, assign offset to them, and recompress them, which is wasteful and
> makes it less likely that compression will be used at all.
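
For readers following the diff quoted above, the change replaces a generic
`R kafkaRecord` parameter and cast with a plain `long timestamp` overload that
the version-specific fetcher delegates to. A minimal standalone sketch of that
delegation pattern follows. All class names here (`FakeConsumerRecord`,
`SketchFetcher`, `SketchKafka010Fetcher`, `EmitRecordSketch`) and the
`NO_TIMESTAMP` sentinel are hypothetical stand-ins, not Flink or Kafka classes,
and the partition-state parameter is omitted for brevity.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Kafka's ConsumerRecord; only timestamp() is modeled.
final class FakeConsumerRecord {
    private final long timestamp;
    FakeConsumerRecord(long timestamp) { this.timestamp = timestamp; }
    long timestamp() { return timestamp; }
}

// Hypothetical simplified base fetcher (not Flink's AbstractFetcher).
class SketchFetcher<T> {
    // Assumed sentinel meaning "the broker supplied no timestamp".
    static final long NO_TIMESTAMP = Long.MIN_VALUE;

    final List<String> emitted = new ArrayList<>();

    // Hook invoked per polled record; the default ignores broker metadata.
    protected void emitRecord(T record, long offset, FakeConsumerRecord raw) {
        emitRecord(record, offset, NO_TIMESTAMP);
    }

    // Timestamp-aware emit path: takes a plain long, so no cast is needed.
    protected void emitRecord(T record, long offset, long timestamp) {
        emitted.add(record + "@" + offset + " ts=" + timestamp);
    }
}

// Version-specific fetcher: extracts the timestamp and delegates upward.
final class SketchKafka010Fetcher<T> extends SketchFetcher<T> {
    @Override
    protected void emitRecord(T record, long offset, FakeConsumerRecord raw) {
        // pass timestamp
        super.emitRecord(record, offset, raw.timestamp());
    }
}

final class EmitRecordSketch {
    // Emits one record through each fetcher and returns what was recorded.
    static List<String> run() {
        FakeConsumerRecord raw = new FakeConsumerRecord(1474900000000L);

        SketchFetcher<String> legacy = new SketchFetcher<>();
        legacy.emitRecord("a", 7L, raw);

        SketchFetcher<String> v010 = new SketchKafka010Fetcher<>();
        v010.emitRecord("a", 7L, raw);

        List<String> out = new ArrayList<>(legacy.emitted);
        out.addAll(v010.emitted);
        return out;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In this sketch only the subclass touches the record type, so the shared emit
path stays free of casts and unchecked generics.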
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)