[ 
https://issues.apache.org/jira/browse/FLINK-13798?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhijiang updated FLINK-13798:
-----------------------------
    Comment: was deleted

(was: Thanks for the replies. [~StephanEwen]

I wish the logic were as easy to understand as what you described for the new 
source operator. But for now we have to solve this issue during the refactoring. :(

I just found another issue that was not mentioned in our previous discussion. 
If we remove this check from RecordWriterOutput and add the logic to 
TimestampsAndPeriodicWatermarksOperator, the AutomaticWatermarkContext case 
seems to have no problem. But for the second case above, the 
ManualWatermarkContext, I am not sure whether it still needs the check logic as 
it does now. Could you help double-check it? [~tzulitai]

 )

> Refactor the process of checking stream status while emitting watermark in 
> source
> ---------------------------------------------------------------------------------
>
>                 Key: FLINK-13798
>                 URL: https://issues.apache.org/jira/browse/FLINK-13798
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Runtime / Task
>            Reporter: zhijiang
>            Assignee: zhijiang
>            Priority: Minor
>
> As we know, a watermark can be emitted downstream only when the stream 
> status is active. For downstream tasks, the StatusWatermarkValve component 
> in StreamInputProcessor already handles this logic. For the source task, 
> however, the current implementation of this logic is somewhat tricky. 
> There are three scenarios for the source case:
>  * In the AutomaticWatermarkContext, the status is toggled to active 
> while collecting a record, and the status is checked in the RecordWriterOutput. 
> If the watermark is triggered by a timer, the timer task checks the status 
> before emitting the watermark. 
>  * In the ManualWatermarkContext, the status is also checked in 
> RecordWriterOutput before emitting the watermark.
>  * TimestampsAndPeriodicWatermarksOperator: the watermark is scheduled by 
> a timer at a fixed interval. When the timer fires, the operator emits the 
> watermark via the output, and the RecordWriterOutput checks the status 
> before emitting.
> So the checking logic in RecordWriterOutput only makes sense 
> for the last two scenarios and seems redundant for the first.
> Even worse, the logic in RecordWriterOutput introduces a cyclic dependency on 
> StreamStatusMaintainer, which blocks the follow-up work of 
> integrating source processing on the runtime side.
> To solve the above issues, the basic idea is to perform this check in an upper 
> layer instead of in the low-level RecordWriterOutput. The proposed solution is 
> to migrate the checking logic from RecordWriterOutput to 
> TimestampsAndPeriodicWatermarksOperator. The action of toggling the status to 
> active while emitting records could then be removed from AutomaticWatermarkContext.
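
The proposed direction can be illustrated with a minimal sketch. This is not Flink code: Status, StatusProvider, SimpleOutput and PeriodicWatermarkOperator are hypothetical stand-ins, invented here only to show the shape of the idea. The low-level output forwards watermarks without looking at the status, and the upper-layer operator checks the status before emitting.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the stream status; not Flink's actual type. */
enum Status { ACTIVE, IDLE }

/** Hypothetical read-only view of the status, so the output needs no maintainer reference. */
interface StatusProvider {
    Status getStatus();
}

/** Hypothetical low-level output after the refactoring: it forwards and does not check. */
class SimpleOutput {
    final List<Long> emittedWatermarks = new ArrayList<>();

    void emitWatermark(long timestamp) {
        emittedWatermarks.add(timestamp);
    }
}

/** Hypothetical upper-layer operator that owns the status check. */
class PeriodicWatermarkOperator {
    private final SimpleOutput output;
    private final StatusProvider statusProvider;
    private long currentWatermark = Long.MIN_VALUE;

    PeriodicWatermarkOperator(SimpleOutput output, StatusProvider statusProvider) {
        this.output = output;
        this.statusProvider = statusProvider;
    }

    /** Called by the timer at each interval with the candidate watermark. */
    void onProcessingTime(long candidateWatermark) {
        // The check lives here, in the upper layer, not in the output.
        if (statusProvider.getStatus() != Status.ACTIVE) {
            return;
        }
        // Only emit watermarks that advance event time.
        if (candidateWatermark > currentWatermark) {
            currentWatermark = candidateWatermark;
            output.emitWatermark(candidateWatermark);
        }
    }
}

class WatermarkCheckDemo {
    public static void main(String[] args) {
        Status[] status = { Status.IDLE };
        SimpleOutput output = new SimpleOutput();
        PeriodicWatermarkOperator operator =
                new PeriodicWatermarkOperator(output, () -> status[0]);

        operator.onProcessingTime(10L);   // dropped: the status is idle
        status[0] = Status.ACTIVE;
        operator.onProcessingTime(20L);   // emitted
        System.out.println(output.emittedWatermarks);
    }
}
```

In this sketch the output has no reference to whatever maintains the status, which is the dependency the description wants to break. Whether the ManualWatermarkContext case can rely on the same arrangement is not settled by the sketch.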



--
This message was sent by Atlassian Jira
(v8.3.2#803003)
