[
https://issues.apache.org/jira/browse/FLINK-4391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15635019#comment-15635019
]
ASF GitHub Bot commented on FLINK-4391:
---------------------------------------
Github user bjlovegithub commented on the issue:
https://github.com/apache/flink/pull/2629
Hi @tillrohrmann , Thanks for your review ;D I will check through each of
your comments and update the PR later.
On the first part of the review: the first comment is about combining
`UNORDERED` mode with `Watermark`. This combination is meaningless, of course.
Maybe the graph generator could print an error and stop compiling the graph if
`UNORDERED` mode and `Watermark` are enabled at the same time?
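To make the proposal concrete, here is a minimal sketch of such a fail-fast check. The class `AsyncModeValidator`, the nested `OutputMode` enum, and the `watermarksEnabled` parameter are hypothetical names used only for illustration; they are not taken from the PR or from Flink's graph generator.

```java
import java.util.Objects;

/** Hypothetical helper; a graph generator could call it before compiling the graph. */
final class AsyncModeValidator {

    /** Hypothetical output modes, named after the modes discussed in this comment. */
    enum OutputMode { ORDERED, UNORDERED }

    private AsyncModeValidator() {}

    /**
     * Rejects the combination of UNORDERED mode and watermarks.
     * Throwing here stands in for "print an error and stop compiling the graph".
     */
    static void validate(OutputMode mode, boolean watermarksEnabled) {
        Objects.requireNonNull(mode, "mode");
        if (mode == OutputMode.UNORDERED && watermarksEnabled) {
            throw new IllegalStateException(
                "UNORDERED output mode cannot be combined with watermarks");
        }
    }
}
```

Failing at graph-generation time, rather than at runtime, would surface the misconfiguration before the job is submitted.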
Both modes are guaranteed by `AsyncWaitOperator`. When a checkpoint is taken
for the chained operators and the snapshot for the `AsyncWaitOperator` is made,
it first fetches all elements in the `AsyncCollectorBuffer` by calling
`getStreamElementsInBuffer()`. That method acquires the lock to block the
`Emitter` thread and sets a flag named `isCheckpointing` to keep the `Emitter`
thread idle, so no finished `AsyncCollector` is passed on to the next operator.
`snapshotState()` is called from the head operator to the tail operator, which
ensures that all states are captured correctly, because the `Emitter` threads
in parent operators have already stopped working.
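The lock-plus-flag handshake described above can be sketched roughly as follows. This is a simplified illustration, not the PR's code: the class `BufferSketch` and the methods `add`, `pollForEmit`, and `checkpointFinished` are hypothetical, and how the real operator clears `isCheckpointing` is not stated in this comment. Only the names `getStreamElementsInBuffer()` and `isCheckpointing` come from the discussion.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified stand-in for the buffer shared by the task thread and the Emitter thread. */
final class BufferSketch<T> {
    private final Object lock = new Object();
    private final List<T> buffer = new ArrayList<>();
    private boolean isCheckpointing = false;

    /** Adds an element to the buffer (hypothetical helper). */
    void add(T element) {
        synchronized (lock) {
            buffer.add(element);
        }
    }

    /** Checkpoint side: takes the lock, pauses emission, and returns a copy of the buffer. */
    List<T> getStreamElementsInBuffer() {
        synchronized (lock) {
            isCheckpointing = true;
            return new ArrayList<>(buffer);
        }
    }

    /** Hypothetical hook that lets emission resume once the snapshot is done. */
    void checkpointFinished() {
        synchronized (lock) {
            isCheckpointing = false;
        }
    }

    /** Emitter side: returns null while a checkpoint is in progress or the buffer is empty. */
    T pollForEmit() {
        synchronized (lock) {
            if (isCheckpointing || buffer.isEmpty()) {
                return null;
            }
            return buffer.remove(0);
        }
    }
}
```

Because the emitter sees the flag under the same lock that the snapshot used, nothing leaves the buffer between taking the snapshot and resuming emission.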
I previously considered using the checkpoint lock in the `Emitter` thread, but
after testing a case that chains multiple `AsyncWaitOperator`s together, the
`Emitter` threads could not fully utilize the parallelism, since they all have
to acquire the same lock while collecting outputs. One way to optimize this is
to add a conditional statement in `performCheckpoint()`: if there is an
`AsyncWaitOperator` in the operator chain, it broadcasts barriers only after
`checkpointState()`; otherwise, the original design is used.
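As a rough sketch of that conditional, the ordering could look like the code below. It only records the order of the two steps as strings instead of doing real work. The class `CheckpointOrderSketch` and the flag `chainHasAsyncWaitOperator` are hypothetical, and the sketch assumes that the original design broadcasts the barrier before calling `checkpointState()`.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrates only the ordering of the two steps; no real checkpointing happens here. */
final class CheckpointOrderSketch {
    private CheckpointOrderSketch() {}

    /** Returns the steps in the order they would run for the given operator chain. */
    static List<String> performCheckpoint(boolean chainHasAsyncWaitOperator) {
        List<String> steps = new ArrayList<>();
        if (chainHasAsyncWaitOperator) {
            // Proposed path: snapshot first, so no buffered result is emitted behind the barrier.
            steps.add("checkpointState");
            steps.add("broadcastBarrier");
        } else {
            // Assumed original design: forward the barrier first, then snapshot.
            steps.add("broadcastBarrier");
            steps.add("checkpointState");
        }
        return steps;
    }
}
```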
Finally, I will add more test cases based on the
`OneInputStreamTaskTestHarness`.
> Provide support for asynchronous operations over streams
> --------------------------------------------------------
>
> Key: FLINK-4391
> URL: https://issues.apache.org/jira/browse/FLINK-4391
> Project: Flink
> Issue Type: New Feature
> Components: DataStream API
> Reporter: Jamie Grier
> Assignee: david.wang
>
> Many Flink users need to do asynchronous processing driven by data from a
> DataStream. The classic example would be joining against an external
> database in order to enrich a stream with extra information.
> It would be nice to add general support for this type of operation in the
> Flink API. Ideally this could simply take the form of a new operator that
> manages async operations, keeps so many of them in flight, and then emits
> results to downstream operators as the async operations complete.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)