[
https://issues.apache.org/jira/browse/FLINK-4391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683316#comment-15683316
]
ASF GitHub Bot commented on FLINK-4391:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/2629#discussion_r88865801
--- Diff:
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
---
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.async;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.SubtaskState;
+import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
+import org.apache.flink.runtime.state.TaskStateHandles;
+import org.apache.flink.streaming.api.datastream.AsyncDataStream;
+import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
+import
org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
+import org.apache.flink.streaming.runtime.tasks.StreamMockEnvironment;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.TestHarnessUtil;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link AsyncWaitOperator}. These test that:
+ *
+ * <ul>
+ * <li>Process StreamRecords and Watermarks in ORDERED mode</li>
+ * <li>Process StreamRecords and Watermarks in UNORDERED mode</li>
+ * <li>Snapshot state and restore state</li>
+ * </ul>
+ */
+public class AsyncWaitOperatorTest {
+
+ public static class MyAsyncFunction extends RichAsyncFunction<Integer,
Integer> {
+ transient final int SLEEP_FACTOR = 100;
+ transient final int THREAD_POOL_SIZE = 10;
+ transient ExecutorService executorService;
+ transient Random random;
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ executorService =
Executors.newFixedThreadPool(THREAD_POOL_SIZE);
+ random = new Random();
+ }
+
+ @Override
+ public void close() throws Exception {
+ super.close();
+ executorService.shutdown();
+
executorService.awaitTermination(SLEEP_FACTOR*THREAD_POOL_SIZE,
TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public void asyncInvoke(final Integer input, final
AsyncCollector<Integer, Integer> collector) throws Exception {
+ this.executorService.submit(new Runnable() {
+ @Override
+ public void run() {
+ // wait for while to simulate async
operation here
+ int sleep = (int) (random.nextFloat() *
SLEEP_FACTOR);
+ try {
+ Thread.sleep(sleep);
+ List<Integer> ret = new
ArrayList<>();
+ ret.add(input*2);
+ collector.collect(ret);
+ }
+ catch (InterruptedException e) {
+ }
+ }
+ });
+ }
+ }
+
+ @Test
+ public void testWaterMarkOrdered() throws Exception {
+ testWithWatermark(AsyncDataStream.OutputMode.ORDERED);
+ }
+
+ @Test
+ public void testWaterMarkUnordered() throws Exception {
+ testWithWatermark(AsyncDataStream.OutputMode.UNORDERED);
+ }
+
+ private void testWithWatermark(AsyncDataStream.OutputMode mode) throws
Exception {
+ final AsyncWaitOperator<Integer, Integer> operator = new
AsyncWaitOperator<>(new MyAsyncFunction());
+ operator.setOutputMode(mode);
+ operator.setBufferSize(6);
+ operator.setInStreamElementSerializerForTest(new
StreamElementSerializer<>(IntSerializer.INSTANCE));
+
+ final OneInputStreamOperatorTestHarness<Integer, Integer>
testHarness =
+ new
OneInputStreamOperatorTestHarness<>(operator);
+
+ final long initialTime = 0L;
+ final ConcurrentLinkedQueue<Object> expectedOutput = new
ConcurrentLinkedQueue<>();
+
+ testHarness.open();
+
+ testHarness.processElement(new StreamRecord<>(1, initialTime +
1));
+ testHarness.processElement(new StreamRecord<>(2, initialTime +
2));
+ testHarness.processWatermark(new Watermark(initialTime + 2));
+ testHarness.processElement(new StreamRecord<>(3, initialTime +
3));
+
+ // wait until all async collectors in the buffer have been
emitted out.
+ synchronized (testHarness.getCheckpointLock()) {
+ testHarness.close();
+ }
+
+ expectedOutput.add(new StreamRecord<>(2, initialTime + 1));
+ expectedOutput.add(new StreamRecord<>(4, initialTime + 2));
+ expectedOutput.add(new Watermark(initialTime + 2));
+ expectedOutput.add(new StreamRecord<>(6, initialTime + 3));
+
+ if (AsyncDataStream.OutputMode.ORDERED == mode) {
+ TestHarnessUtil.assertOutputEquals("Output with
watermark was not correct.", expectedOutput, testHarness.getOutput());
+ }
+ else {
+ Assert.assertTrue(expectedOutput.size() == 4);
+ Assert.assertTrue(expectedOutput.toArray(new
StreamElement[4])[2].asWatermark().getTimestamp() == (initialTime+2));
--- End diff --
Can't we make this check a little bit more exhaustive?
> Provide support for asynchronous operations over streams
> --------------------------------------------------------
>
> Key: FLINK-4391
> URL: https://issues.apache.org/jira/browse/FLINK-4391
> Project: Flink
> Issue Type: New Feature
> Components: DataStream API
> Reporter: Jamie Grier
> Assignee: david.wang
>
> Many Flink users need to do asynchronous processing driven by data from a
> DataStream. The classic example would be joining against an external
> database in order to enrich a stream with extra information.
> It would be nice to add general support for this type of operation in the
> Flink API. Ideally this could simply take the form of a new operator that
> manages async operations, keeps so many of them in flight, and then emits
> results to downstream operators as the async operations complete.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)