[ https://issues.apache.org/jira/browse/FLINK-4391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15636088#comment-15636088 ]
ASF GitHub Bot commented on FLINK-4391:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2629#discussion_r86527698

    --- Diff: flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java ---
    @@ -195,6 +202,70 @@ public Integer map(NonSerializable value) throws Exception {
             env.execute();
         }
    +    @Test
    +    public void testAsyncWaitOperator() throws Exception {
    +        final int numElements = 10;
    +
    +        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +
    +        DataStream<Tuple2<Integer, NonSerializable>> input = env.addSource(new NonSerializableTupleSource(numElements)).setParallelism(1);
    +
    +        AsyncFunction<Tuple2<Integer, NonSerializable>, Integer> function = new RichAsyncFunction<Tuple2<Integer, NonSerializable>, Integer>() {
    +            transient ExecutorService executorService;
    +
    +            @Override
    +            public void open(Configuration parameters) throws Exception {
    +                super.open(parameters);
    +                executorService = Executors.newFixedThreadPool(numElements);
    +            }
    +
    +            @Override
    +            public void close() throws Exception {
    +                super.close();
    +                executorService.shutdown();
    +            }
    +
    +            @Override
    +            public void asyncInvoke(final Tuple2<Integer, NonSerializable> input,
    +                                    final AsyncCollector<Tuple2<Integer, NonSerializable>, Integer> collector) throws Exception {
    +                this.executorService.submit(new Runnable() {
    +                    @Override
    +                    public void run() {
    +                        // wait for while to simulate async operation here
    +                        int sleep = (int) (new Random().nextFloat() * 1000);
    +                        try {
    +                            Thread.sleep(sleep);
    +                            List<Integer> ret = new ArrayList<>();
    +                            ret.add(input.f0+input.f0);
    +                            collector.collect(ret);
    +                        }
    +                        catch (InterruptedException e) {
    +                            collector.collect(new ArrayList<Integer>(0));
    +                        }
    +                    }
    +                });
    +            }
    +        };
    +
    +        DataStream<Integer> orderedResult = AsyncDataStream.orderedWait(input, function, 2).setParallelism(1);
    +        orderedResult.writeAsText(resultPath1, FileSystem.WriteMode.OVERWRITE).setParallelism(1);
    +
    +        DataStream<Integer> unorderedResult = AsyncDataStream.unorderedWait(input, function, 2).setParallelism(1);
    +        unorderedResult.writeAsText(resultPath2, FileSystem.WriteMode.OVERWRITE);
    --- End diff --

    You don't have to use the `compareResultsByLinesInMemory` to check the results. Then you don't have to write it on disk.

> Provide support for asynchronous operations over streams
> --------------------------------------------------------
>
>                 Key: FLINK-4391
>                 URL: https://issues.apache.org/jira/browse/FLINK-4391
>             Project: Flink
>          Issue Type: New Feature
>          Components: DataStream API
>            Reporter: Jamie Grier
>            Assignee: david.wang
>
> Many Flink users need to do asynchronous processing driven by data from a
> DataStream. The classic example would be joining against an external
> database in order to enrich a stream with extra information.
> It would be nice to add general support for this type of operation in the
> Flink API. Ideally this could simply take the form of a new operator that
> manages async operations, keeps so many of them in flight, and then emits
> results to downstream operators as the async operations complete.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
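
For illustration only: the operator described in the issue above (keep a bounded number of async operations in flight, then emit results) can be sketched with plain java.util.concurrent, without any Flink classes. This is not the code from the pull request and not Flink's API; the class name BoundedAsyncSketch and the method orderedWait are hypothetical names chosen for this sketch, and it handles only the ordered case over a finite list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.function.Function;

// Hypothetical sketch, not Flink code: bounded in-flight async calls with ordered emission.
public class BoundedAsyncSketch {

    /**
     * Applies asyncFn to every input while keeping at most maxInFlight
     * operations pending, and returns the results in input order.
     */
    public static <I, O> List<O> orderedWait(
            List<I> inputs,
            Function<I, CompletableFuture<O>> asyncFn,
            int maxInFlight) throws InterruptedException {
        Semaphore permits = new Semaphore(maxInFlight);
        List<CompletableFuture<O>> pending = new ArrayList<>();
        for (I input : inputs) {
            // Blocks the caller while maxInFlight operations are still pending.
            permits.acquire();
            CompletableFuture<O> future = asyncFn.apply(input);
            // Free the slot on success or failure.
            future.whenComplete((result, error) -> permits.release());
            pending.add(future);
        }
        List<O> results = new ArrayList<>();
        for (CompletableFuture<O> future : pending) {
            // Waiting in submission order yields results in input order.
            results.add(future.join());
        }
        return results;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            // Mirrors the test's input.f0 + input.f0 with a capacity of 2.
            List<Integer> doubled = orderedWait(
                    Arrays.asList(1, 2, 3, 4, 5),
                    x -> CompletableFuture.supplyAsync(() -> x + x, pool),
                    2);
            System.out.println(doubled); // prints [2, 4, 6, 8, 10]
        } finally {
            pool.shutdown();
        }
    }
}
```

Because the results come back as an in-memory list, a check like the one the review comment asks about could compare them directly instead of writing to disk first. An unordered variant would emit each result as soon as its future completes rather than joining in submission order.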