[
https://issues.apache.org/jira/browse/FLINK-4391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15633158#comment-15633158
]
ASF GitHub Bot commented on FLINK-4391:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/2629#discussion_r86349629
--- Diff:
flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java
---
@@ -195,6 +202,70 @@ public Integer map(NonSerializable value) throws
Exception {
env.execute();
}
+ @Test
+ public void testAsyncWaitOperator() throws Exception {
+ final int numElements = 10;
+
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+
+ DataStream<Tuple2<Integer, NonSerializable>> input =
env.addSource(new NonSerializableTupleSource(numElements)).setParallelism(1);
+
+ AsyncFunction<Tuple2<Integer, NonSerializable>, Integer>
function = new RichAsyncFunction<Tuple2<Integer, NonSerializable>, Integer>() {
+ transient ExecutorService executorService;
+
+ @Override
+ public void open(Configuration parameters) throws
Exception {
+ super.open(parameters);
+ executorService =
Executors.newFixedThreadPool(numElements);
+ }
+
+ @Override
+ public void close() throws Exception {
+ super.close();
+ executorService.shutdown();
+ }
+
+ @Override
+ public void asyncInvoke(final Tuple2<Integer,
NonSerializable> input,
+ final
AsyncCollector<Tuple2<Integer, NonSerializable>, Integer> collector) throws
Exception {
+ this.executorService.submit(new Runnable() {
+ @Override
+ public void run() {
+ // wait for while to simulate
async operation here
+ int sleep = (int) (new
Random().nextFloat() * 1000);
--- End diff --
Decrease the sleep timeout will help to speedup the test.
> Provide support for asynchronous operations over streams
> --------------------------------------------------------
>
> Key: FLINK-4391
> URL: https://issues.apache.org/jira/browse/FLINK-4391
> Project: Flink
> Issue Type: New Feature
> Components: DataStream API
> Reporter: Jamie Grier
> Assignee: david.wang
>
> Many Flink users need to do asynchronous processing driven by data from a
> DataStream. The classic example would be joining against an external
> database in order to enrich a stream with extra information.
> It would be nice to add general support for this type of operation in the
> Flink API. Ideally this could simply take the form of a new operator that
> manages async operations, keeps so many of them in flight, and then emits
> results to downstream operators as the async operations complete.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)