[ https://issues.apache.org/jira/browse/FLINK-4035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15437170#comment-15437170 ]
ASF GitHub Bot commented on FLINK-4035:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2369#discussion_r76277062

    --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java ---
    @@ -135,45 +135,45 @@ public void testPeriodicWatermarks() throws Exception {
         // elements generate a watermark if the timestamp is a multiple of three

         // elements for partition 1
    -    fetcher.emitRecord(1L, part1, 1L, new ConsumerRecord<byte[], byte[]>(testTopic, 7, new byte[]{0}, 1L));
    -    fetcher.emitRecord(2L, part1, 2L, new ConsumerRecord<byte[], byte[]>(testTopic, 7, new byte[]{0}, 2L));
    -    fetcher.emitRecord(3L, part1, 3L, new ConsumerRecord<byte[], byte[]>(testTopic, 7, new byte[]{0}, 3L));
    +    fetcher.emitRecord(1L, part1, 1L, Long.MAX_VALUE);
    +    fetcher.emitRecord(2L, part1, 2L, Long.MAX_VALUE);
    +    fetcher.emitRecord(3L, part1, 3L, Long.MAX_VALUE);
         assertEquals(3L, sourceContext.getLatestElement().getValue().longValue());
         assertEquals(3L, sourceContext.getLatestElement().getTimestamp());

         // elements for partition 2
    -    fetcher.emitRecord(12L, part2, 1L, new ConsumerRecord<byte[], byte[]>(testTopic, 13, new byte[]{0}, 1L));
    +    fetcher.emitRecord(12L, part2, 1L, Long.MAX_VALUE);
         assertEquals(12L, sourceContext.getLatestElement().getValue().longValue());
         assertEquals(12L, sourceContext.getLatestElement().getTimestamp());

         // elements for partition 3
    -    fetcher.emitRecord(101L, part3, 1L, new ConsumerRecord<byte[], byte[]>(testTopic, 21, new byte[]{0}, 1L));
    -    fetcher.emitRecord(102L, part3, 2L, new ConsumerRecord<byte[], byte[]>(testTopic, 21, new byte[]{0}, 2L));
    +    fetcher.emitRecord(101L, part3, 1L, Long.MAX_VALUE);
    +    fetcher.emitRecord(102L, part3, 2L, Long.MAX_VALUE);
         assertEquals(102L, sourceContext.getLatestElement().getValue().longValue());
         assertEquals(102L, sourceContext.getLatestElement().getTimestamp());

         // now, we should have a watermark (this blocks until the periodic thread emitted the watermark)
         assertEquals(3L, sourceContext.getLatestWatermark().getTimestamp());

         // advance partition 3
    -    fetcher.emitRecord(1003L, part3, 3L, new ConsumerRecord<byte[], byte[]>(testTopic, 21, new byte[]{0}, 3L));
    -    fetcher.emitRecord(1004L, part3, 4L, new ConsumerRecord<byte[], byte[]>(testTopic, 21, new byte[]{0}, 4L));
    -    fetcher.emitRecord(1005L, part3, 5L, new ConsumerRecord<byte[], byte[]>(testTopic, 21, new byte[]{0}, 5L));
    +    fetcher.emitRecord(1003L, part3, 3L, Long.MAX_VALUE);
    +    fetcher.emitRecord(1004L, part3, 4L, Long.MAX_VALUE);
    +    fetcher.emitRecord(1005L, part3, 5L, Long.MAX_VALUE);
         assertEquals(1005L, sourceContext.getLatestElement().getValue().longValue());
         assertEquals(1005L, sourceContext.getLatestElement().getTimestamp());

         // advance partition 1 beyond partition 2 - this bumps the watermark
    -    fetcher.emitRecord(30L, part1, 4L, new ConsumerRecord<byte[], byte[]>(testTopic, 7, new byte[]{0}, 4L));
    +    fetcher.emitRecord(30L, part1, 4L, Long.MAX_VALUE);
         assertEquals(30L, sourceContext.getLatestElement().getValue().longValue());
         assertEquals(30L, sourceContext.getLatestElement().getTimestamp());

         // this blocks until the periodic thread emitted the watermark
         assertEquals(12L, sourceContext.getLatestWatermark().getTimestamp());

         // advance partition 2 again - this bumps the watermark
    -    fetcher.emitRecord(13L, part2, 2L, new ConsumerRecord<byte[], byte[]>(testTopic, 13, new byte[]{0}, 2L));
    -    fetcher.emitRecord(14L, part2, 3L, new ConsumerRecord<byte[], byte[]>(testTopic, 13, new byte[]{0}, 3L));
    -    fetcher.emitRecord(15L, part2, 3L, new ConsumerRecord<byte[], byte[]>(testTopic, 13, new byte[]{0}, 3L));
    +    fetcher.emitRecord(13L, part2, 2L, Long.MAX_VALUE);
    +    fetcher.emitRecord(14L, part2, 3L, Long.MAX_VALUE);
    +    fetcher.emitRecord(15L, part2, 3L, Long.MAX_VALUE);
    --- End diff --

    Same here: I think these are supposed to give a `Long.MIN_VALUE` instead of MAX?
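For context on the review point: the fourth argument in the new `emitRecord` signature is the Kafka record timestamp, and the question is which sentinel should mean "this record carries no timestamp". Below is a minimal, hypothetical sketch (the class and names are illustrative, not Flink code) of why `Long.MIN_VALUE` is the inert choice for a max-based watermark tracker, while `Long.MAX_VALUE` would corrupt it:

```java
// Minimal, hypothetical sketch -- illustrative names, not the Flink implementation.
public class TimestampSentinelSketch {

    // Assumed sentinel meaning "this record carries no timestamp".
    static final long NO_TIMESTAMP = Long.MIN_VALUE;

    // Highest timestamp seen so far; a periodic thread would turn this into a watermark.
    private long maxSeenTimestamp = NO_TIMESTAMP;

    void onRecord(long recordTimestamp) {
        // Long.MIN_VALUE is inert under Math.max, so records without a
        // timestamp never advance the watermark. A Long.MAX_VALUE sentinel
        // would instantly pin this field to the largest possible value,
        // making every later watermark comparison wrong.
        maxSeenTimestamp = Math.max(maxSeenTimestamp, recordTimestamp);
    }

    public static void main(String[] args) {
        TimestampSentinelSketch tracker = new TimestampSentinelSketch();
        tracker.onRecord(NO_TIMESTAMP); // e.g. a pre-0.10 record with no timestamp
        tracker.onRecord(42L);
        System.out.println(tracker.maxSeenTimestamp); // prints 42
    }
}
```

In the tests above the sentinel value is never asserted on directly, which is presumably why `Long.MAX_VALUE` slipped through; the review suggests aligning it with the "no timestamp" convention anyway.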
> Bump Kafka producer in Kafka sink to Kafka 0.10.0.0
> ---------------------------------------------------
>
>                 Key: FLINK-4035
>                 URL: https://issues.apache.org/jira/browse/FLINK-4035
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.0.3
>            Reporter: Elias Levy
>            Assignee: Robert Metzger
>            Priority: Minor
>
> Kafka 0.10.0.0 introduced protocol changes related to the producer. Published
> messages now include timestamps, and compressed messages now include relative
> offsets. As it stands, brokers must decompress publisher-compressed messages,
> assign offsets to them, and recompress them, which is wasteful and makes it
> less likely that compression will be used at all.
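As background to the producer-side timestamp change described in the issue above, here is a minimal sketch of attaching an explicit timestamp with the Kafka 0.10 producer API. The broker address, topic name, and payload are placeholders:

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TimestampedProduceSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");

        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
            // Since 0.10.0.0, a ProducerRecord can carry an explicit timestamp:
            // (topic, partition, timestamp, key, value). Passing null for the
            // timestamp lets the producer stamp the current time instead.
            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(
                    "test-topic", 0, System.currentTimeMillis(),
                    new byte[]{0}, new byte[]{1});
            producer.send(record);
        }
    }
}
```

Because the timestamp travels with the message, a 0.10 broker no longer needs to decompress and recompress producer-compressed batches just to rewrite offsets, which is the inefficiency the issue describes.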