Simon Su created FLINK-13492:
--------------------------------

             Summary: BoundedOutOfOrderTimestamps cause Watermark's timestamp leak
                 Key: FLINK-13492
                 URL: https://issues.apache.org/jira/browse/FLINK-13492
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Runtime
    Affects Versions: 1.9.0
            Reporter: Simon Su
         Attachments: Watermark_timestamp_leak.diff

{code:java}
StreamExecutionEnvironment env =
    StreamExecutionEnvironment.createLocalEnvironment(1, conf);

// Use event time; the default autoWatermarkInterval is then 200 ms
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
Kafka kafka = new Kafka()
    .version("0.11")
    .topic(topic)
    .startFromLatest()
    .properties(properties);

Schema schema = new Schema();
for (int i = 0; i < names.length; i++) {
    if ("timestamp".equalsIgnoreCase(names[i])) {
        // bounded out-of-orderness watermarks with a 1000 ms delay
        schema.field("rowtime", types[i])
            .rowtime(new Rowtime()
                .timestampsFromField("timestamp")
                .watermarksPeriodicBounded(1000));
    } else {
        schema.field(names[i], types[i]);
    }
}

/** ..... */
tableEnv
    .connect(kafka)
    .withFormat(new Protobuf().protobufName("order_sink"))
    .withSchema(schema)
    .inAppendMode()
    .registerTableSource("orderStream");
{code}
We register the upstream table above and apply a 10 s tumbling window to it. When we feed in a sequence of normal data, no results are output.

We then debugged whether the watermark was being emitted normally and eventually found the issue:
 # maxTimestamp is initialized to Long.MIN_VALUE in BoundedOutOfOrderTimestamps.
 # nextTimestamp() updates maxTimestamp with the timestamp extracted from each source record.
 # getWatermark() computes the watermark timestamp from maxTimestamp and the delay, as maxTimestamp - delay.
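The three steps above can be modelled in plain Java. This is a simplified, hypothetical stand-in (the class name BoundedOutOfOrderSketch is made up for illustration), not the actual BoundedOutOfOrderTimestamps source; it only shows what getWatermark() returns if it is called before any record has been seen.

{code:java}
/**
 * Hypothetical, simplified model of a bounded out-of-orderness watermark
 * assigner following the three steps above. Not Flink's source.
 */
class BoundedOutOfOrderSketch {
    private final long delay;
    // Step 1: no record seen yet, so start from the smallest long value.
    private long maxTimestamp = Long.MIN_VALUE;

    BoundedOutOfOrderSketch(long delay) {
        this.delay = delay;
    }

    // Step 2: remember the largest timestamp extracted so far.
    void nextTimestamp(long timestamp) {
        if (timestamp > maxTimestamp) {
            maxTimestamp = timestamp;
        }
    }

    // Step 3: watermark = largest timestamp minus the allowed delay.
    long getWatermark() {
        return maxTimestamp - delay;
    }

    public static void main(String[] args) {
        BoundedOutOfOrderSketch assigner = new BoundedOutOfOrderSketch(1000L);
        // Called before any record: Long.MIN_VALUE - 1000 wraps around.
        System.out.println(assigner.getWatermark()); // prints 9223372036854774808
        assigner.nextTimestamp(5_000L);
        System.out.println(assigner.getWatermark()); // prints 4000
    }
}
{code}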



When TimestampsAndPeriodicWatermarksOperator is initialized, its open() method registers a timer with the SystemTimeService that requests a new watermark every watermarkInterval. That is the problem: this timer thread can call getCurrentWatermark, and hence the BoundedOutOfOrderTimestamps watermark computation, before any record has updated maxTimestamp. The watermark is then computed as (Long.MIN_VALUE - delay), which overflows and wraps around to a huge positive value (Long.MAX_VALUE - delay + 1). Every later watermark is smaller than this value, so all of them are dropped.
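A rough simulation of the periodic emission loop shows why nothing gets through afterwards. It assumes, as in TimestampsAndPeriodicWatermarksOperator, that a new watermark is forwarded only if it is strictly greater than the last emitted one; the class name, the grouping of records between timer calls, and the sample timestamps are all hypothetical.

{code:java}
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of periodic watermark emission; not Flink's source. */
class PeriodicEmissionSketch {
    static final long DELAY = 1000L;

    /**
     * Each entry of recordBatches holds the event timestamps that arrive
     * before the corresponding periodic timer call. Returns the watermarks
     * that would actually be emitted.
     */
    static List<Long> emittedWatermarks(long[][] recordBatches) {
        long maxTimestamp = Long.MIN_VALUE;     // no record seen yet
        long currentWatermark = Long.MIN_VALUE; // last emitted watermark
        List<Long> emitted = new ArrayList<>();
        for (long[] batch : recordBatches) {
            for (long ts : batch) {
                maxTimestamp = Math.max(maxTimestamp, ts);
            }
            // Overflows while maxTimestamp is still Long.MIN_VALUE.
            long newWatermark = maxTimestamp - DELAY;
            // Only strictly increasing watermarks are emitted.
            if (newWatermark > currentWatermark) {
                currentWatermark = newWatermark;
                emitted.add(newWatermark);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        // The first timer call happens before any record arrives.
        long[][] batches = { {}, {10_000L, 12_000L}, {25_000L} };
        System.out.println(emittedWatermarks(batches)); // prints [9223372036854774808]
    }
}
{code}

With a non-empty first batch, for example {10_000L} followed by {25_000L}, the same loop emits 9000 and then 24000, which is the expected behaviour.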

A workaround is to set a large autoWatermarkInterval so that the SystemTimeService thread makes its first call only after a long delay, by which time records have arrived and maxTimestamp has been updated.

 
{code:java}
public void onProcessingTime(long timestamp) throws Exception {
    ...
    // schedule the next periodic watermark call watermarkInterval ms from now
    getProcessingTimeService().registerTimer(now + watermarkInterval, this);
    ...
}
{code}

{code:java}
public ScheduledFuture<?> registerTimer(long timestamp, ProcessingTimeCallback target) {
    ...
    // fire (timestamp - now) + 1 ms from now, or after 1 ms if timestamp has already passed
    long delay = Math.max(timestamp - getCurrentProcessingTime(), 0) + 1;
    ...
}
{code}
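As a worked example of the registerTimer formula quoted above (timerDelay is a hypothetical helper that only copies that expression): assuming the first timer is registered the same way at now + watermarkInterval, the first getCurrentWatermark call happens about watermarkInterval + 1 ms after the operator starts. This is why a large autoWatermarkInterval works around the problem: it gives records time to arrive before the first call.

{code:java}
/** Worked example of the timer delay formula; the names are hypothetical. */
class TimerDelaySketch {
    // delay = max(timestamp - now, 0) + 1, as in the registerTimer excerpt
    static long timerDelay(long timestamp, long now) {
        return Math.max(timestamp - now, 0) + 1;
    }

    public static void main(String[] args) {
        long now = 1_000_000L;
        System.out.println(timerDelay(now + 200L, now));    // default 200 ms interval: prints 201
        System.out.println(timerDelay(now + 60_000L, now)); // large interval as a workaround: prints 60001
        System.out.println(timerDelay(now - 50L, now));     // timestamp already passed: prints 1
    }
}
{code}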
 


I think we can fix this by adding the delay to the initial value of maxTimestamp in BoundedOutOfOrderTimestamps's constructor (that is, initializing it to Long.MIN_VALUE + delay), which avoids the overflow.
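A minimal sketch of the proposed fix, assuming "add the delay in the constructor" means initializing maxTimestamp to Long.MIN_VALUE + delay there. The class name is hypothetical and this is not the attached Watermark_timestamp_leak.diff; it only shows that the first watermark becomes Long.MIN_VALUE instead of a wrapped positive value.

{code:java}
/** Hypothetical sketch of the proposed fix; not Flink's actual patch. */
class FixedBoundedOutOfOrderSketch {
    private final long delay;
    private long maxTimestamp;

    FixedBoundedOutOfOrderSketch(long delay) {
        if (delay < 0) {
            throw new IllegalArgumentException("delay must be non-negative");
        }
        this.delay = delay;
        // Offset the initial value by the delay so that maxTimestamp - delay
        // can never go below Long.MIN_VALUE.
        this.maxTimestamp = Long.MIN_VALUE + delay;
    }

    void nextTimestamp(long timestamp) {
        if (timestamp > maxTimestamp) {
            maxTimestamp = timestamp;
        }
    }

    long getWatermark() {
        return maxTimestamp - delay;
    }

    public static void main(String[] args) {
        FixedBoundedOutOfOrderSketch assigner = new FixedBoundedOutOfOrderSketch(1000L);
        System.out.println(assigner.getWatermark() == Long.MIN_VALUE); // prints true
        assigner.nextTimestamp(5_000L);
        System.out.println(assigner.getWatermark()); // prints 4000
    }
}
{code}

Rejecting a negative delay keeps Long.MIN_VALUE + delay itself from overflowing in the other direction.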



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)
