[ 
https://issues.apache.org/jira/browse/FLINK-13798?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhijiang updated FLINK-13798:
-----------------------------
    Description: 
As we know, the watermark could be emitted to downstream only when the stream 
status is active. For the downstream task we already have the component of 
StatusWatermarkValve in StreamInputProcessor to handle this logic. But for the 
source task the current implementation of this logic seems a bit tricky. There 
are three scenarios for the source case:
 * In the AutomaticWatermarkContext, it would toggle the status as active while 
collecting record and the status is checked in the RecordWriterOutput. If the 
watermark is triggered by timer, the timer task would check the status before 
calling emit watermark. 
 * In the ManualWatermarkContext, the status is also checked in 
RecordWriterOutput before emitting watermark.
 * TimestampsAndPeriodicWatermarksOperator: The watermark is scheduled by timer 
in interval time. When it happens, it would call emitting watermark via output. 
Then the RecordWriterOutput would check the status before emitting.

So we can see that the checking logic in RecordWriterOutput only makes sense 
for the last two scenarios, and seems redundant for the first scenario.

Even worse, the logic is RecordWriterOutput would bring cycle dependency with 
StreamStatusMaintainer, which is a blocker for the following work of 
integrating source processing on runtime side.

To solve above issues, the basic idea is to make this check logic in upper 
layer instead of current low level RecordWriterOutput. The solution is that we 
could migrate the checking logic from RecordWriterOutput to 
TimestampsAndPeriodicWatermarksOperator. 

  was:
As we know, the watermark could be emitted to downstream only when the stream 
status is active. For the downstream task we already have the component of 
StatusWatermarkValve in StreamInputProcessor to handle this logic. But for the 
source task the current implementation of this logic seems a bit tricky. There 
are two scenarios for the source case:
 * Emit watermark via source context: In the specific WatermarkContext, it 
would toggle the  stream status as active before collecting/emitting 
records/watermarks. Then in the implementation of RecordWriterOutput, it would 
check the status always active before really emitting watermark.
 * TimestampsAndPeriodicWatermarksOperator: The watermark is triggered by timer 
in interval time. When it happens, it would call output stack to emit 
watermark. Then the RecordWriterOutput could take the role of checking status 
before really emitting watermark.

So we can see that the checking status logic in RecordWriterOutput only works 
for above second scenario, and this logic seems redundant for the first 
scenario because WatermarkContext always toggle active status before emitting. 
Even worse, the logic is RecordWriterOutput would bring cycle dependency with 
StreamStatusMaintainer, which is a blocker for the following work of 
integrating source processing on runtime side.

The solution is that we could migrate the checking logic from 
RecordWriterOutput to TimestampsAndPeriodicWatermarksOperator. 


> Refactor the process of checking stream status while emitting watermark in 
> source
> ---------------------------------------------------------------------------------
>
>                 Key: FLINK-13798
>                 URL: https://issues.apache.org/jira/browse/FLINK-13798
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Runtime / Task
>            Reporter: zhijiang
>            Assignee: zhijiang
>            Priority: Minor
>
> As we know, the watermark could be emitted to downstream only when the stream 
> status is active. For the downstream task we already have the component of 
> StatusWatermarkValve in StreamInputProcessor to handle this logic. But for 
> the source task the current implementation of this logic seems a bit tricky. 
> There are three scenarios for the source case:
>  * In the AutomaticWatermarkContext, it would toggle the status as active 
> while collecting record and the status is checked in the RecordWriterOutput. 
> If the watermark is triggered by timer, the timer task would check the status 
> before calling emit watermark. 
>  * In the ManualWatermarkContext, the status is also checked in 
> RecordWriterOutput before emitting watermark.
>  * TimestampsAndPeriodicWatermarksOperator: The watermark is scheduled by 
> timer in interval time. When it happens, it would call emitting watermark via 
> output. Then the RecordWriterOutput would check the status before emitting.
> So we can see that the checking logic in RecordWriterOutput only makes sense 
> for the last two scenarios, and seems redundant for the first scenario.
> Even worse, the logic is RecordWriterOutput would bring cycle dependency with 
> StreamStatusMaintainer, which is a blocker for the following work of 
> integrating source processing on runtime side.
> To solve above issues, the basic idea is to make this check logic in upper 
> layer instead of current low level RecordWriterOutput. The solution is that 
> we could migrate the checking logic from RecordWriterOutput to 
> TimestampsAndPeriodicWatermarksOperator. 



--
This message was sent by Atlassian Jira
(v8.3.2#803003)

Reply via email to