[
https://issues.apache.org/jira/browse/FLINK-7245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16114167#comment-16114167
]
Xingcan Cui commented on FLINK-7245:
------------------------------------
Hi all, I'd like to throw out some basic ideas about the design.
# To support holding back watermarks, I plan to cache all received
watermarks in a priority queue inside the {{InternalTimeServiceManager}} and
expose the methods needed to access it (e.g., {{peek()}} and {{poll()}}).
# For the {{advanceWatermark()}} method in {{InternalTimeServiceManager}}, I
think we can add a boolean parameter to indicate whether the watermark should
be cached.
# A {{triggerWatermark()}} method should be added to a new
{{WatermarkPostponableOperator}}. It can contain a default emission
mechanism (i.e., remove some watermarks from the cache and emit them) or be
(partially) user-defined in the future.
# The {{processWatermark()}} method in {{AbstractStreamOperator}} can then be
overridden in the {{WatermarkPostponableOperator}}.
# The watermarks can be snapshotted and restored with the
{{snapshotStateForKeyGroup()}} and {{restoreStateForKeyGroup()}} methods in
{{InternalTimeServiceManager}}.
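To make the caching idea above more concrete, here is a minimal, self-contained sketch. It is not Flink code: the class {{WatermarkCacheSketch}} and its methods {{hold()}}, {{triggerWatermark()}}, {{snapshot()}}, and {{restore()}} are hypothetical names, watermarks are represented as plain {{long}} timestamps, and the release rule (emit everything up to a given timestamp) is only one possible default.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical helper for illustration only; not part of Flink. */
class WatermarkCacheSketch {
    // Held-back watermark timestamps, smallest first.
    private final PriorityQueue<Long> held = new PriorityQueue<>();

    /** Caches a watermark instead of emitting it immediately. */
    void hold(long timestamp) {
        held.add(timestamp);
    }

    /** Returns the smallest cached watermark, or null if the cache is empty. */
    Long peek() {
        return held.peek();
    }

    /** Removes and returns the smallest cached watermark, or null if empty. */
    Long poll() {
        return held.poll();
    }

    /**
     * Assumed default emission rule: release every cached watermark whose
     * timestamp is at most maxTimestamp, in ascending order.
     */
    List<Long> triggerWatermark(long maxTimestamp) {
        List<Long> released = new ArrayList<>();
        while (!held.isEmpty() && held.peek() <= maxTimestamp) {
            released.add(held.poll());
        }
        return released;
    }

    /** Returns a sorted copy of the cached watermarks for checkpointing. */
    List<Long> snapshot() {
        List<Long> copy = new ArrayList<>(held);
        Collections.sort(copy);
        return copy;
    }

    /** Replaces the cache content with a previously taken snapshot. */
    void restore(List<Long> snapshot) {
        held.clear();
        held.addAll(snapshot);
    }
}
```

In this sketch an operator would call {{hold()}} where it currently emits the watermark, and call {{triggerWatermark()}} once the results that depend on those watermarks have been emitted.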
I have one open question. For an operator with two inputs, the current
{{AbstractStreamOperator}} deals with their watermarks by merging them in
advance, i.e.,
{code:java}
public void processWatermark1(Watermark mark) throws Exception {
    input1Watermark = mark.getTimestamp();
    long newMin = Math.min(input1Watermark, input2Watermark);
    if (newMin > combinedWatermark) {
        combinedWatermark = newMin;
        processWatermark(new Watermark(combinedWatermark));
    }
}

public void processWatermark2(Watermark mark) throws Exception {
    input2Watermark = mark.getTimestamp();
    long newMin = Math.min(input1Watermark, input2Watermark);
    if (newMin > combinedWatermark) {
        combinedWatermark = newMin;
        processWatermark(new Watermark(combinedWatermark));
    }
}
{code}
I'm not sure if we should add two separate queues for them or just keep the
current mechanism.
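For reference, the current merging behavior can be reproduced in isolation with the sketch below. It is a standalone illustration rather than Flink code: the class {{TwoInputMergeSketch}} and its method names are hypothetical, watermarks are plain {{long}} timestamps, and I assume all three fields start at {{Long.MIN_VALUE}}. If we keep the current mechanism, a single cache would sit behind the merged output; with two separate queues, each input would be cached before this merge step.

```java
import java.util.OptionalLong;

/** Hypothetical standalone model of the two-input merge; not part of Flink. */
class TwoInputMergeSketch {
    // Assumption: every watermark starts at the smallest possible value.
    private long input1Watermark = Long.MIN_VALUE;
    private long input2Watermark = Long.MIN_VALUE;
    private long combinedWatermark = Long.MIN_VALUE;

    /** Records a watermark from input 1 and returns the new combined one, if any. */
    OptionalLong onWatermark1(long timestamp) {
        input1Watermark = timestamp;
        return advance();
    }

    /** Records a watermark from input 2 and returns the new combined one, if any. */
    OptionalLong onWatermark2(long timestamp) {
        input2Watermark = timestamp;
        return advance();
    }

    // The combined watermark is the minimum of both inputs and only moves forward.
    private OptionalLong advance() {
        long newMin = Math.min(input1Watermark, input2Watermark);
        if (newMin > combinedWatermark) {
            combinedWatermark = newMin;
            return OptionalLong.of(combinedWatermark);
        }
        return OptionalLong.empty();
    }
}
```

Note that the merged stream only advances when the slower input advances, so a single cache behind it never sees the individual per-input watermarks.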
What do you think, [~fhueske], [~aljoscha], and [~jark]?
Best, Xingcan
> Enhance the operators to support holding back watermarks
> --------------------------------------------------------
>
> Key: FLINK-7245
> URL: https://issues.apache.org/jira/browse/FLINK-7245
> Project: Flink
> Issue Type: New Feature
> Components: DataStream API
> Reporter: Xingcan Cui
> Assignee: Xingcan Cui
>
> Currently the watermarks are applied and emitted by the
> {{AbstractStreamOperator}} instantly.
> {code:java}
> public void processWatermark(Watermark mark) throws Exception {
>     if (timeServiceManager != null) {
>         timeServiceManager.advanceWatermark(mark);
>     }
>     output.emitWatermark(mark);
> }
> {code}
> Some calculation results (with timestamp fields) triggered by these
> watermarks (e.g., join or aggregate results) may be regarded as delayed by
> the downstream operators since their timestamps must be less than or equal to
> the corresponding triggers.
> This issue aims to add another "working mode", which supports holding back
> watermarks, to the current operators. These watermarks should be blocked and
> stored by the operators until all the corresponding newly generated results
> are emitted.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)