[ 
https://issues.apache.org/jira/browse/FLINK-13798?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16911537#comment-16911537
 ] 

zhijiang commented on FLINK-13798:
----------------------------------

Thanks for this notification [~StephanEwen]. Wish it would make the watermark 
logic easy to go in the new source operator. But now we have to solve this 
issue by refactoring TimestampsAndPeriodicWatermarksOperator. :(

> Refactor the process of checking stream status while emitting watermark in 
> source
> ---------------------------------------------------------------------------------
>
>                 Key: FLINK-13798
>                 URL: https://issues.apache.org/jira/browse/FLINK-13798
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Runtime / Task
>            Reporter: zhijiang
>            Assignee: zhijiang
>            Priority: Minor
>              Labels: pull-request-available
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> As we know, the watermark could be emitted to downstream only when the stream 
> status is active. For the downstream task we already have the component of 
> StatusWatermarkValve in StreamInputProcessor to handle this logic. But for 
> the source task the current implementation of this logic seems a bit tricky. 
> There are two scenarios for the source case:
>  * In the source WatermarkContext, it would toggle the status as active while 
> collecting/emitting and the status is checked in RecordWriterOutput. If the 
> watermark is triggered by timer for AutomaticWatermarkContext, the timer task 
> would check the status before emitting watermark. 
>  * TimestampsAndPeriodicWatermarksOperator: The watermark is triggered by 
> timer, but it still relies on RecordWriterOutput to check the status before 
> emitting.
> So the check logic in RecordWriterOutput only makes sense for the last 
> scenario, and seems redundant for the first scenario.
> Even worse, this logic in RecordWriterOutput would bring cycle dependency 
> with StreamStatusMaintainer, which is a blocker for the following work of 
> integrating source processing on runtime side.
> To solve above issues, the basic idea is to refactor this check logic in 
> upper layer instead of current low level RecordWriterOutput. The solution is 
> migrating the check logic from RecordWriterOutput to 
> TimestampsAndPeriodicWatermarksOperator. And we could further remove the 
> logic of toggling active in WatermarkContext



--
This message was sent by Atlassian Jira
(v8.3.2#803003)

Reply via email to