[ 
https://issues.apache.org/jira/browse/FLINK-4391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683329#comment-15683329
 ] 

ASF GitHub Bot commented on FLINK-4391:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2629#discussion_r88867342
  
    --- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
 ---
    @@ -0,0 +1,359 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.operators.async;
    +
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    +import org.apache.flink.api.common.typeutils.base.IntSerializer;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.core.testutils.OneShotLatch;
    +import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
    +import org.apache.flink.runtime.checkpoint.SubtaskState;
    +import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
    +import org.apache.flink.runtime.state.TaskStateHandles;
    +import org.apache.flink.streaming.api.datastream.AsyncDataStream;
    +import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
    +import org.apache.flink.streaming.api.graph.StreamConfig;
    +import org.apache.flink.streaming.api.watermark.Watermark;
    +import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
    +import 
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
    +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
    +import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
    +import 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
    +import org.apache.flink.streaming.runtime.tasks.StreamMockEnvironment;
    +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
    +import org.apache.flink.streaming.util.TestHarnessUtil;
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +import java.util.ArrayList;
    +import java.util.Comparator;
    +import java.util.List;
    +import java.util.Random;
    +import java.util.concurrent.ConcurrentLinkedQueue;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.TimeUnit;
    +
    +import static org.junit.Assert.assertEquals;
    +
    +/**
    + * Tests for {@link AsyncWaitOperator}. These test that:
    + *
    + * <ul>
    + *     <li>Process StreamRecords and Watermarks in ORDERED mode</li>
    + *     <li>Process StreamRecords and Watermarks in UNORDERED mode</li>
    + *     <li>Snapshot state and restore state</li>
    + * </ul>
    + */
    +public class AsyncWaitOperatorTest {
    +
    +   public static class MyAsyncFunction extends RichAsyncFunction<Integer, 
Integer> {
    +           transient final int SLEEP_FACTOR = 100;
    +           transient final int THREAD_POOL_SIZE = 10;
    +           transient ExecutorService executorService;
    +           transient Random random;
    +
    +           @Override
    +           public void open(Configuration parameters) throws Exception {
    +                   super.open(parameters);
    +                   executorService = 
Executors.newFixedThreadPool(THREAD_POOL_SIZE);
    +                   random = new Random();
    +           }
    +
    +           @Override
    +           public void close() throws Exception {
    +                   super.close();
    +                   executorService.shutdown();
    +                   
executorService.awaitTermination(SLEEP_FACTOR*THREAD_POOL_SIZE, 
TimeUnit.MILLISECONDS);
    +           }
    +
    +           @Override
    +           public void asyncInvoke(final Integer input, final 
AsyncCollector<Integer, Integer> collector) throws Exception {
    +                   this.executorService.submit(new Runnable() {
    +                           @Override
    +                           public void run() {
    +                                   // wait for while to simulate async 
operation here
    +                                   int sleep = (int) (random.nextFloat() * 
SLEEP_FACTOR);
    +                                   try {
    +                                           Thread.sleep(sleep);
    +                                           List<Integer> ret = new 
ArrayList<>();
    +                                           ret.add(input*2);
    +                                           collector.collect(ret);
    +                                   }
    +                                   catch (InterruptedException e) {
    +                                   }
    +                           }
    +                   });
    +           }
    +   }
    +
    +   @Test
    +   public void testWaterMarkOrdered() throws Exception {
    +           testWithWatermark(AsyncDataStream.OutputMode.ORDERED);
    +   }
    +
    +   @Test
    +   public void testWaterMarkUnordered() throws Exception {
    +           testWithWatermark(AsyncDataStream.OutputMode.UNORDERED);
    +   }
    +
    +   private void testWithWatermark(AsyncDataStream.OutputMode mode) throws 
Exception {
    +           final AsyncWaitOperator<Integer, Integer> operator = new 
AsyncWaitOperator<>(new MyAsyncFunction());
    +           operator.setOutputMode(mode);
    +           operator.setBufferSize(6);
    +           operator.setInStreamElementSerializerForTest(new 
StreamElementSerializer<>(IntSerializer.INSTANCE));
    +
    +           final OneInputStreamOperatorTestHarness<Integer, Integer> 
testHarness =
    +                           new 
OneInputStreamOperatorTestHarness<>(operator);
    +
    +           final long initialTime = 0L;
    +           final ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
    +
    +           testHarness.open();
    +
    +           testHarness.processElement(new StreamRecord<>(1, initialTime + 
1));
    +           testHarness.processElement(new StreamRecord<>(2, initialTime + 
2));
    +           testHarness.processWatermark(new Watermark(initialTime + 2));
    +           testHarness.processElement(new StreamRecord<>(3, initialTime + 
3));
    +
    +           // wait until all async collectors in the buffer have been 
emitted out.
    +           synchronized (testHarness.getCheckpointLock()) {
    +                   testHarness.close();
    +           }
    +
    +           expectedOutput.add(new StreamRecord<>(2, initialTime + 1));
    +           expectedOutput.add(new StreamRecord<>(4, initialTime + 2));
    +           expectedOutput.add(new Watermark(initialTime + 2));
    +           expectedOutput.add(new StreamRecord<>(6, initialTime + 3));
    +
    +           if (AsyncDataStream.OutputMode.ORDERED == mode) {
    +                   TestHarnessUtil.assertOutputEquals("Output with 
watermark was not correct.", expectedOutput, testHarness.getOutput());
    +           }
    +           else {
    +                   Assert.assertTrue(expectedOutput.size() == 4);
    +                   Assert.assertTrue(expectedOutput.toArray(new 
StreamElement[4])[2].asWatermark().getTimestamp() == (initialTime+2));
    +           }
    +   }
    +
    +   @Test
    +   public void testOrdered() throws Exception {
    +           testRun(AsyncDataStream.OutputMode.ORDERED);
    +   }
    +
    +   @Test
    +   public void testUnordered() throws Exception {
    +           testRun(AsyncDataStream.OutputMode.UNORDERED);
    +   }
    +
    +   private void testRun(AsyncDataStream.OutputMode mode) throws Exception {
    +           final OneInputStreamTask<Integer, Integer> task = new 
OneInputStreamTask<>();
    +           final OneInputStreamTaskTestHarness<Integer, Integer> 
testHarness =
    +                           new OneInputStreamTaskTestHarness<>(task, 1, 1, 
BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO);
    +
    +           final AsyncWaitOperator<Integer, Integer> operator = new 
AsyncWaitOperator<>(new MyAsyncFunction());
    +           operator.setOutputMode(mode);
    +           operator.setBufferSize(6);
    +
    +           final StreamConfig streamConfig = testHarness.getStreamConfig();
    +           streamConfig.setStreamOperator(operator);
    +
    +           testHarness.invoke();
    +           testHarness.waitForTaskRunning();
    +
    +           final long initialTime = 0L;
    +           final ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<Object>();
    +
    +           testHarness.processElement(new StreamRecord<>(1, initialTime + 
1));
    +           testHarness.processElement(new StreamRecord<>(2, initialTime + 
2));
    +           testHarness.processElement(new StreamRecord<>(3, initialTime + 
3));
    +           testHarness.processElement(new StreamRecord<>(4, initialTime + 
4));
    +           testHarness.processElement(new StreamRecord<>(5, initialTime + 
5));
    +           testHarness.processElement(new StreamRecord<>(6, initialTime + 
6));
    +           testHarness.processElement(new StreamRecord<>(7, initialTime + 
7));
    +           testHarness.processElement(new StreamRecord<>(8, initialTime + 
8));
    +
    +           expectedOutput.add(new StreamRecord<>(2, initialTime + 1));
    +           expectedOutput.add(new StreamRecord<>(4, initialTime + 2));
    +           expectedOutput.add(new StreamRecord<>(6, initialTime + 3));
    +           expectedOutput.add(new StreamRecord<>(8, initialTime + 4));
    +           expectedOutput.add(new StreamRecord<>(10, initialTime + 5));
    +           expectedOutput.add(new StreamRecord<>(12, initialTime + 6));
    +           expectedOutput.add(new StreamRecord<>(14, initialTime + 7));
    +           expectedOutput.add(new StreamRecord<>(16, initialTime + 8));
    +
    +           testHarness.waitForInputProcessing();
    +
    +           testHarness.endInput();
    +
    +           testHarness.waitForTaskCompletion();
    +
    +           if (mode == AsyncDataStream.OutputMode.ORDERED) {
    +                   TestHarnessUtil.assertOutputEquals("ORDERED Output was 
not correct.", expectedOutput, testHarness.getOutput());
    +           }
    +           else {
    +                   TestHarnessUtil.assertOutputEqualsSorted(
    +                                   "UNORDERED Output was not correct.",
    +                                   expectedOutput,
    +                                   testHarness.getOutput(),
    +                                   new Comparator<Object>() {
    +                                           @Override
    +                                           public int compare(Object o1, 
Object o2) {
    +                                                   Integer val1 = 
((StreamRecord<Integer>)o1).getValue();
    +                                                   Integer val2 = 
((StreamRecord<Integer>)o2).getValue();
    +                                                   if (val1 > val2) {
    +                                                           return 1;
    +                                                   }
    +                                                   else if (val1 == val2) {
    +                                                           return 0;
    +                                                   }
    +                                                   else {
    +                                                           return -1;
    +                                                   }
    +                                           }
    +                                   });
    +           }
    +   }
    +
    +   @Test
    +   public void testStateSnapshotAndRestore() throws Exception {
    +           final OneInputStreamTask<Integer, Integer> task = new 
OneInputStreamTask<>();
    +           final OneInputStreamTaskTestHarness<Integer, Integer> 
testHarness =
    +                           new OneInputStreamTaskTestHarness<>(task, 1, 1, 
BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO);
    +
    +           final AsyncWaitOperator<Integer, Integer> operator = new 
AsyncWaitOperator<>(new MyAsyncFunction());
    +           operator.setOutputMode(AsyncDataStream.OutputMode.ORDERED);
    +           operator.setBufferSize(6);
    +           // disable starting emitter thread, so that all inputs will be 
kept in the buffer, which will be
    +           // snapshotted into operator state
    +           operator.setEmitFlag(false);
    +
    +           final StreamConfig streamConfig = testHarness.getStreamConfig();
    +           streamConfig.setStreamOperator(operator);
    +
    +           final AcknowledgeStreamMockEnvironment env = new 
AcknowledgeStreamMockEnvironment(
    +                           testHarness.jobConfig,
    +                           testHarness.taskConfig,
    +                           testHarness.getExecutionConfig(),
    +                           testHarness.memorySize,
    +                           new MockInputSplitProvider(),
    +                           testHarness.bufferSize);
    +
    +           testHarness.invoke(env);
    +           testHarness.waitForTaskRunning();
    +
    +           final long initialTime = 0L;
    +
    +           testHarness.processElement(new StreamRecord<>(1, initialTime + 
1));
    +           testHarness.processElement(new StreamRecord<>(2, initialTime + 
2));
    +           testHarness.processElement(new StreamRecord<>(3, initialTime + 
3));
    +           testHarness.processElement(new StreamRecord<>(4, initialTime + 
4));
    +
    +           testHarness.waitForInputProcessing();
    +
    +           final long checkpointId = 1L;
    +           final long checkpointTimestamp = 1L;
    +
    +           final CheckpointMetaData checkpointMetaData = new 
CheckpointMetaData(checkpointId, checkpointTimestamp);
    +
    +           while(!task.triggerCheckpoint(checkpointMetaData));
    +
    +           env.getCheckpointLatch().await();
    +
    +           assertEquals(checkpointId, env.getCheckpointId());
    +
    +           testHarness.endInput();
    +           testHarness.waitForTaskCompletion();
    +
    +           env.getCheckpointStateHandles();
    +
    +           // set the operator state from previous attempt into the 
restored one
    +           final OneInputStreamTask<Integer, Integer> restoredTask = new 
OneInputStreamTask<>();
    +           restoredTask.setInitialState(new 
TaskStateHandles(env.getCheckpointStateHandles()));
    +
    +           final OneInputStreamTaskTestHarness<Integer, Integer> 
restoredTaskHarness =
    +                           new 
OneInputStreamTaskTestHarness<>(restoredTask, BasicTypeInfo.INT_TYPE_INFO, 
BasicTypeInfo.INT_TYPE_INFO);
    +
    +           final AsyncWaitOperator<Integer, Integer> restoredOperator = 
new AsyncWaitOperator<>(new MyAsyncFunction());
    +           
restoredOperator.setOutputMode(AsyncDataStream.OutputMode.ORDERED);
    +           restoredOperator.setBufferSize(6);
    +
    +           
restoredTaskHarness.getStreamConfig().setStreamOperator(restoredOperator);
    +
    +           restoredTaskHarness.invoke();
    +           restoredTaskHarness.waitForTaskRunning();
    +
    +           restoredTaskHarness.processElement(new StreamRecord<>(5, 
initialTime + 5));
    +           restoredTaskHarness.processElement(new StreamRecord<>(6, 
initialTime + 6));
    +           restoredTaskHarness.processElement(new StreamRecord<>(7, 
initialTime + 7));
    +           restoredTaskHarness.processElement(new StreamRecord<>(8, 
initialTime + 8));
    +
    +           restoredTaskHarness.endInput();
    +           restoredTaskHarness.waitForTaskCompletion();
    +
    +           ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
    +           expectedOutput.add(new StreamRecord<>(2, initialTime + 1));
    +           expectedOutput.add(new StreamRecord<>(4, initialTime + 2));
    +           expectedOutput.add(new StreamRecord<>(6, initialTime + 3));
    +           expectedOutput.add(new StreamRecord<>(8, initialTime + 4));
    +           expectedOutput.add(new StreamRecord<>(10, initialTime + 5));
    +           expectedOutput.add(new StreamRecord<>(12, initialTime + 6));
    +           expectedOutput.add(new StreamRecord<>(14, initialTime + 7));
    +           expectedOutput.add(new StreamRecord<>(16, initialTime + 8));
    +
    +           TestHarnessUtil.assertOutputEquals("StateAndRestored Test 
Output was not correct.", expectedOutput, restoredTaskHarness.getOutput());
    +   }
    --- End diff --
    
    What about checkpointing tests where some elements have been processed and 
others have not? That way we can verify that the processed elements are properly 
removed from the internal state.


> Provide support for asynchronous operations over streams
> --------------------------------------------------------
>
>                 Key: FLINK-4391
>                 URL: https://issues.apache.org/jira/browse/FLINK-4391
>             Project: Flink
>          Issue Type: New Feature
>          Components: DataStream API
>            Reporter: Jamie Grier
>            Assignee: david.wang
>
> Many Flink users need to do asynchronous processing driven by data from a 
> DataStream.  The classic example would be joining against an external 
> database in order to enrich a stream with extra information.
> It would be nice to add general support for this type of operation in the 
> Flink API.  Ideally this could simply take the form of a new operator that 
> manages async operations, keeps a certain number of them in flight, and then 
> emits results to downstream operators as the async operations complete.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to