[
https://issues.apache.org/jira/browse/FLINK-4391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683300#comment-15683300
]
ASF GitHub Bot commented on FLINK-4391:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/2629#discussion_r88720498
--- Diff:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
---
@@ -116,7 +116,12 @@ public OperatorChain(StreamTask<OUT, OP>
containingTask) {
// add head operator to end of chain
allOps.add(headOperator);
-
+
+ // reverse the order of all operators so that head
operator is at the first place.
+ // for chained operator with async wait operator,
operators after wait operator have to
+ // wait for while until all data in the buffer in wait
operator has done snapshot.
+ Collections.reverse(allOps);
--- End diff --
This won't work with emitting elements in the open method of
`AsyncWaitOperator`, because then the downstream operators are potentially not
yet opened when the first stream element arrives there.
> Provide support for asynchronous operations over streams
> --------------------------------------------------------
>
> Key: FLINK-4391
> URL: https://issues.apache.org/jira/browse/FLINK-4391
> Project: Flink
> Issue Type: New Feature
> Components: DataStream API
> Reporter: Jamie Grier
> Assignee: david.wang
>
> Many Flink users need to do asynchronous processing driven by data from a
> DataStream. The classic example would be joining against an external
> database in order to enrich a stream with extra information.
> It would be nice to add general support for this type of operation in the
> Flink API. Ideally this could simply take the form of a new operator that
> manages async operations, keeps so many of them in flight, and then emits
> results to downstream operators as the async operations complete.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)