[ 
https://issues.apache.org/jira/browse/FLINK-4035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15437170#comment-15437170
 ] 

ASF GitHub Bot commented on FLINK-4035:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2369#discussion_r76277062
  
    --- Diff: 
flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
 ---
    @@ -135,45 +135,45 @@ public void testPeriodicWatermarks() throws Exception 
{
                // elements generate a watermark if the timestamp is a multiple 
of three
     
                // elements for partition 1
    -           fetcher.emitRecord(1L, part1, 1L, new ConsumerRecord<byte[], 
byte[]>(testTopic, 7, new byte[]{0}, 1L));
    -           fetcher.emitRecord(2L, part1, 2L, new ConsumerRecord<byte[], 
byte[]>(testTopic, 7, new byte[]{0}, 2L));
    -           fetcher.emitRecord(3L, part1, 3L, new ConsumerRecord<byte[], 
byte[]>(testTopic, 7, new byte[]{0}, 3L));
    +           fetcher.emitRecord(1L, part1, 1L, Long.MAX_VALUE);
    +           fetcher.emitRecord(2L, part1, 2L, Long.MAX_VALUE);
    +           fetcher.emitRecord(3L, part1, 3L, Long.MAX_VALUE);
                assertEquals(3L, 
sourceContext.getLatestElement().getValue().longValue());
                assertEquals(3L, 
sourceContext.getLatestElement().getTimestamp());
     
                // elements for partition 2
    -           fetcher.emitRecord(12L, part2, 1L, new ConsumerRecord<byte[], 
byte[]>(testTopic, 13, new byte[]{0}, 1L));
    +           fetcher.emitRecord(12L, part2, 1L, Long.MAX_VALUE);
                assertEquals(12L, 
sourceContext.getLatestElement().getValue().longValue());
                assertEquals(12L, 
sourceContext.getLatestElement().getTimestamp());
     
                // elements for partition 3
    -           fetcher.emitRecord(101L, part3, 1L, new ConsumerRecord<byte[], 
byte[]>(testTopic, 21, new byte[]{0}, 1L));
    -           fetcher.emitRecord(102L, part3, 2L, new ConsumerRecord<byte[], 
byte[]>(testTopic, 21, new byte[]{0}, 2L));
    +           fetcher.emitRecord(101L, part3, 1L, Long.MAX_VALUE);
    +           fetcher.emitRecord(102L, part3, 2L, Long.MAX_VALUE);
                assertEquals(102L, 
sourceContext.getLatestElement().getValue().longValue());
                assertEquals(102L, 
sourceContext.getLatestElement().getTimestamp());
     
                // now, we should have a watermark (this blocks until the 
periodic thread emitted the watermark)
                assertEquals(3L, 
sourceContext.getLatestWatermark().getTimestamp());
     
                // advance partition 3
    -           fetcher.emitRecord(1003L, part3, 3L, new ConsumerRecord<byte[], 
byte[]>(testTopic, 21, new byte[]{0}, 3L));
    -           fetcher.emitRecord(1004L, part3, 4L, new ConsumerRecord<byte[], 
byte[]>(testTopic, 21, new byte[]{0}, 4L));
    -           fetcher.emitRecord(1005L, part3, 5L, new ConsumerRecord<byte[], 
byte[]>(testTopic, 21, new byte[]{0}, 5L));
    +           fetcher.emitRecord(1003L, part3, 3L, Long.MAX_VALUE);
    +           fetcher.emitRecord(1004L, part3, 4L, Long.MAX_VALUE);
    +           fetcher.emitRecord(1005L, part3, 5L, Long.MAX_VALUE);
                assertEquals(1005L, 
sourceContext.getLatestElement().getValue().longValue());
                assertEquals(1005L, 
sourceContext.getLatestElement().getTimestamp());
     
                // advance partition 1 beyond partition 2 - this bumps the 
watermark
    -           fetcher.emitRecord(30L, part1, 4L, new ConsumerRecord<byte[], 
byte[]>(testTopic, 7, new byte[]{0}, 4L));
    +           fetcher.emitRecord(30L, part1, 4L, Long.MAX_VALUE);
                assertEquals(30L, 
sourceContext.getLatestElement().getValue().longValue());
                assertEquals(30L, 
sourceContext.getLatestElement().getTimestamp());
                
                // this blocks until the periodic thread emitted the watermark
                assertEquals(12L, 
sourceContext.getLatestWatermark().getTimestamp());
     
                // advance partition 2 again - this bumps the watermark
    -           fetcher.emitRecord(13L, part2, 2L, new ConsumerRecord<byte[], 
byte[]>(testTopic, 13, new byte[]{0}, 2L));
    -           fetcher.emitRecord(14L, part2, 3L, new ConsumerRecord<byte[], 
byte[]>(testTopic, 13, new byte[]{0}, 3L));
    -           fetcher.emitRecord(15L, part2, 3L, new ConsumerRecord<byte[], 
byte[]>(testTopic, 13, new byte[]{0}, 3L));
    +           fetcher.emitRecord(13L, part2, 2L, Long.MAX_VALUE);
    +           fetcher.emitRecord(14L, part2, 3L, Long.MAX_VALUE);
    +           fetcher.emitRecord(15L, part2, 3L, Long.MAX_VALUE);
    --- End diff --
    
    Same here: I think these calls are supposed to pass `Long.MIN_VALUE` instead of 
`Long.MAX_VALUE`?


> Bump Kafka producer in Kafka sink to Kafka 0.10.0.0
> ---------------------------------------------------
>
>                 Key: FLINK-4035
>                 URL: https://issues.apache.org/jira/browse/FLINK-4035
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.0.3
>            Reporter: Elias Levy
>            Assignee: Robert Metzger
>            Priority: Minor
>
> Kafka 0.10.0.0 introduced protocol changes related to the producer.  
> Published messages now include timestamps, and compressed messages now include 
> relative offsets.  As it is now, brokers must decompress publisher-compressed 
> messages, assign offsets to them, and recompress them, which is wasteful and 
> makes it less likely that compression will be used at all.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to