[ https://issues.apache.org/jira/browse/FLINK-4391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15636088#comment-15636088 ]

ASF GitHub Bot commented on FLINK-4391:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2629#discussion_r86527698
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java ---
    @@ -195,6 +202,70 @@ public Integer map(NonSerializable value) throws Exception {
                env.execute();
        }
     
    +   @Test
    +   public void testAsyncWaitOperator() throws Exception {
    +           final int numElements = 10;
    +
    +           StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +
    +           DataStream<Tuple2<Integer, NonSerializable>> input = env.addSource(new NonSerializableTupleSource(numElements)).setParallelism(1);
    +
    +           AsyncFunction<Tuple2<Integer, NonSerializable>, Integer> function = new RichAsyncFunction<Tuple2<Integer, NonSerializable>, Integer>() {
    +                   transient ExecutorService executorService;
    +
    +                   @Override
    +                   public void open(Configuration parameters) throws Exception {
    +                           super.open(parameters);
    +                           executorService = Executors.newFixedThreadPool(numElements);
    +                   }
    +
    +                   @Override
    +                   public void close() throws Exception {
    +                           super.close();
    +                           executorService.shutdown();
    +                   }
    +
    +                   @Override
    +                   public void asyncInvoke(final Tuple2<Integer, NonSerializable> input,
    +                                                                   final AsyncCollector<Tuple2<Integer, NonSerializable>, Integer> collector) throws Exception {
    +                           this.executorService.submit(new Runnable() {
    +                                   @Override
    +                                   public void run() {
    +                                           // wait for while to simulate async operation here
    +                                           int sleep = (int) (new Random().nextFloat() * 1000);
    +                                           try {
    +                                                   Thread.sleep(sleep);
    +                                                   List<Integer> ret = new ArrayList<>();
    +                                                   ret.add(input.f0+input.f0);
    +                                                   collector.collect(ret);
    +                                           }
    +                                           catch (InterruptedException e) {
    +                                                   collector.collect(new ArrayList<Integer>(0));
    +                                           }
    +                                   }
    +                           });
    +                   }
    +           };
    +
    +           DataStream<Integer> orderedResult = AsyncDataStream.orderedWait(input, function, 2).setParallelism(1);
    +           orderedResult.writeAsText(resultPath1, FileSystem.WriteMode.OVERWRITE).setParallelism(1);
    +
    +           DataStream<Integer> unorderedResult = AsyncDataStream.unorderedWait(input, function, 2).setParallelism(1);
    +           unorderedResult.writeAsText(resultPath2, FileSystem.WriteMode.OVERWRITE);
    --- End diff --
    
    You don't have to use `compareResultsByLinesInMemory` to check the 
results. Without it, you don't have to write the output to disk.
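
One possible reading of the suggestion above is to gather the asynchronous results in an in-memory collection and assert on that directly. The following is only a sketch of that idea in plain Java; it does not use the Flink API. The class `InMemoryResultCheck`, the method `doubleAllAsync`, the pool size, and the assumption that inputs are the integers `0..numElements-1` are all hypothetical and not taken from the pull request.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical helper, not part of Flink or of the pull request under review.
final class InMemoryResultCheck {

    // Doubles each value in 0..numElements-1 on a thread pool and gathers the
    // results in memory. The sorted copy makes the check independent of the
    // order in which the asynchronous tasks happen to complete.
    public static List<Integer> doubleAllAsync(int numElements) {
        ExecutorService pool = Executors.newFixedThreadPool(4); // pool size is an arbitrary choice
        try {
            List<Integer> results = Collections.synchronizedList(new ArrayList<>());
            List<CompletableFuture<Void>> pending = new ArrayList<>();
            for (int i = 0; i < numElements; i++) {
                final int value = i;
                pending.add(CompletableFuture
                        .supplyAsync(() -> value + value, pool)
                        .thenAccept(results::add));
            }
            // Block until every task has delivered its result.
            CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
            List<Integer> sorted = new ArrayList<>(results);
            Collections.sort(sorted);
            return sorted;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<Integer> actual = doubleAllAsync(5);
        // Compare against the expected values without touching the file system.
        if (!actual.equals(Arrays.asList(0, 2, 4, 6, 8))) {
            throw new AssertionError("unexpected results: " + actual);
        }
    }
}
```

Sorting before comparing suits the unordered case; for the ordered case a test would compare the list as emitted.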


> Provide support for asynchronous operations over streams
> --------------------------------------------------------
>
>                 Key: FLINK-4391
>                 URL: https://issues.apache.org/jira/browse/FLINK-4391
>             Project: Flink
>          Issue Type: New Feature
>          Components: DataStream API
>            Reporter: Jamie Grier
>            Assignee: david.wang
>
> Many Flink users need to do asynchronous processing driven by data from a 
> DataStream.  The classic example would be joining against an external 
> database in order to enrich a stream with extra information.
> It would be nice to add general support for this type of operation in the 
> Flink API.  Ideally this could simply take the form of a new operator that 
> manages async operations, keeps so many of them in flight, and then emits 
> results to downstream operators as the async operations complete.
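
The behaviour the issue describes (a bounded number of async operations in flight, with results emitted as they complete) can be sketched in plain Java. This is not Flink's operator and makes no claim about how the pull request implements it; `BoundedAsyncSketch`, `processUnordered`, `capacity`, and `maxInFlight` are hypothetical names chosen for illustration, and a semaphore is just one way to bound concurrency.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical illustration only; not the Flink async operator.
final class BoundedAsyncSketch {

    // Highest number of operations observed in flight during the most recent run.
    public static final AtomicInteger maxInFlight = new AtomicInteger();

    // Applies op to every input asynchronously, never running more than
    // `capacity` operations at once, and returns results in completion order.
    public static <I, O> List<O> processUnordered(List<I> inputs, Function<I, O> op, int capacity) {
        ExecutorService pool = Executors.newCachedThreadPool();
        Semaphore permits = new Semaphore(capacity);
        AtomicInteger inFlight = new AtomicInteger();
        BlockingQueue<O> emitted = new LinkedBlockingQueue<>();
        maxInFlight.set(0);
        try {
            List<CompletableFuture<Void>> pending = new ArrayList<>();
            for (I input : inputs) {
                // Back-pressure: block the producer until a slot frees up.
                permits.acquireUninterruptibly();
                maxInFlight.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
                pending.add(CompletableFuture
                        .supplyAsync(() -> op.apply(input), pool)
                        .thenAccept(emitted::add) // "emit" as soon as the operation completes
                        .whenComplete((ignored, error) -> {
                            inFlight.decrementAndGet();
                            permits.release();
                        }));
            }
            CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
            return new ArrayList<>(emitted);
        } finally {
            pool.shutdown();
        }
    }
}
```

An ordered variant would instead keep the pending futures in a queue and emit each result only once everything submitted before it has completed.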



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
