Repository: beam
Updated Branches:
  refs/heads/gearpump-runner 99221e739 -> 559e3c341


[BEAM-1779] Port UnboundedSourceWrapperTest to use Flink operator test harness


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c1dc8f53
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c1dc8f53
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c1dc8f53

Branch: refs/heads/gearpump-runner
Commit: c1dc8f53c5438b575a7e84e9f680616ead49d61e
Parents: 62b942a
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Authored: Wed Mar 22 11:43:30 2017 +0100
Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Committed: Wed Jun 7 19:43:11 2017 +0200

----------------------------------------------------------------------
 .../flink/streaming/TestCountingSource.java     |  48 +++--
 .../streaming/UnboundedSourceWrapperTest.java   | 198 +++++++------------
 2 files changed, 110 insertions(+), 136 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/c1dc8f53/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TestCountingSource.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TestCountingSource.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TestCountingSource.java
index 3a08088..edf548a 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TestCountingSource.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TestCountingSource.java
@@ -133,18 +133,8 @@ public class TestCountingSource
   public Coder<CounterMark> getCheckpointMarkCoder() {
     return DelegateCoder.of(
         VarIntCoder.of(),
-        new DelegateCoder.CodingFunction<CounterMark, Integer>() {
-          @Override
-          public Integer apply(CounterMark input) {
-            return input.current;
-          }
-        },
-        new DelegateCoder.CodingFunction<Integer, CounterMark>() {
-          @Override
-          public CounterMark apply(Integer input) {
-            return new CounterMark(input);
-          }
-        });
+        new FromCounterMark(),
+        new ToCounterMark());
   }
 
   @Override
@@ -251,4 +241,38 @@ public class TestCountingSource
   public Coder<KV<Integer, Integer>> getDefaultOutputCoder() {
     return KvCoder.of(VarIntCoder.of(), VarIntCoder.of());
   }
+
+  private class FromCounterMark implements DelegateCoder.CodingFunction<CounterMark, Integer> {
+    @Override
+    public Integer apply(CounterMark input) {
+      return input.current;
+    }
+
+    @Override
+    public int hashCode() {
+      return FromCounterMark.class.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      return obj instanceof FromCounterMark;
+    }
+  }
+
+  private class ToCounterMark implements DelegateCoder.CodingFunction<Integer, CounterMark> {
+    @Override
+    public CounterMark apply(Integer input) {
+      return new CounterMark(input);
+    }
+
+    @Override
+    public int hashCode() {
+      return ToCounterMark.class.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      return obj instanceof ToCounterMark;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c1dc8f53/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
index e3875bc..716e71d 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
@@ -20,36 +20,20 @@ package org.apache.beam.runners.flink.streaming;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Set;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.ValueWithRecordId;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.state.OperatorStateStore;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
-import org.apache.flink.runtime.state.StateInitializationContext;
-import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
 import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.api.watermark.Watermark;
@@ -57,15 +41,14 @@ import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
 import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
-import org.apache.flink.streaming.runtime.tasks.StreamTask;
-import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
+import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.OutputTag;
 import org.junit.Test;
 import org.junit.experimental.runners.Enclosed;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
-import org.mockito.Matchers;
 
 /**
  * Tests for {@link UnboundedSourceWrapper}.
@@ -125,10 +108,18 @@ public class UnboundedSourceWrapperTest {
               KV<Integer, Integer>,
               TestCountingSource.CounterMark>> sourceOperator = new 
StreamSource<>(flinkWrapper);
 
-      setupSourceOperator(sourceOperator, numTasks);
+      AbstractStreamOperatorTestHarness<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>>
+          testHarness =
+          new AbstractStreamOperatorTestHarness<>(
+              sourceOperator,
+              numTasks /* max parallelism */,
+              numTasks /* parallelism */,
+              0 /* subtask index */);
+
+      testHarness.setTimeCharacteristic(TimeCharacteristic.EventTime);
 
       try {
-        sourceOperator.open();
+        testHarness.open();
         sourceOperator.run(checkpointLock,
             new TestStreamStatusMaintainer(),
             new 
Output<StreamRecord<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>>>() {
@@ -200,29 +191,22 @@ public class UnboundedSourceWrapperTest {
               TestCountingSource.CounterMark>> sourceOperator = new 
StreamSource<>(flinkWrapper);
 
 
-      OperatorStateStore backend = mock(OperatorStateStore.class);
-
-      TestingListState<KV<UnboundedSource, TestCountingSource.CounterMark>>
-          listState = new TestingListState<>();
-
-      when(backend.getOperatorState(Matchers.any(ListStateDescriptor.class)))
-          .thenReturn(listState);
-
-      StateInitializationContext initializationContext = 
mock(StateInitializationContext.class);
-
-      when(initializationContext.getOperatorStateStore()).thenReturn(backend);
-      when(initializationContext.isRestored()).thenReturn(false, true);
+      AbstractStreamOperatorTestHarness<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>>
+          testHarness =
+          new AbstractStreamOperatorTestHarness<>(
+              sourceOperator,
+              numTasks /* max parallelism */,
+              numTasks /* parallelism */,
+              0 /* subtask index */);
 
-      flinkWrapper.initializeState(initializationContext);
-
-      setupSourceOperator(sourceOperator, numTasks);
+      testHarness.setTimeCharacteristic(TimeCharacteristic.EventTime);
 
       final Set<KV<Integer, Integer>> emittedElements = new HashSet<>();
 
       boolean readFirstBatchOfElements = false;
 
       try {
-        sourceOperator.open();
+        testHarness.open();
         sourceOperator.run(checkpointLock,
             new TestStreamStatusMaintainer(),
             new 
Output<StreamRecord<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>>>() {
@@ -265,21 +249,12 @@ public class UnboundedSourceWrapperTest {
       assertTrue("Did not successfully read first batch of elements.", 
readFirstBatchOfElements);
 
       // draw a snapshot
-      flinkWrapper.snapshotState(new StateSnapshotContextSynchronousImpl(0, 
0));
-
-      // test snapshot offsets
-      assertEquals(flinkWrapper.getLocalSplitSources().size(),
-          listState.getList().size());
-      int totalEmit = 0;
-      for (KV<UnboundedSource, TestCountingSource.CounterMark> kv : 
listState.get()) {
-        totalEmit += kv.getValue().current + 1;
-      }
-      assertEquals(numElements / 2, totalEmit);
+      OperatorStateHandles snapshot = testHarness.snapshot(0, 0);
 
       // test that finalizeCheckpoint on CheckpointMark is called
       final ArrayList<Integer> finalizeList = new ArrayList<>();
       TestCountingSource.setFinalizeTracker(finalizeList);
-      flinkWrapper.notifyCheckpointComplete(0);
+      testHarness.notifyOfCompletedCheckpoint(0);
       assertEquals(flinkWrapper.getLocalSplitSources().size(), 
finalizeList.size());
 
       // create a completely new source but restore from the snapshot
@@ -297,16 +272,25 @@ public class UnboundedSourceWrapperTest {
               TestCountingSource.CounterMark>> restoredSourceOperator =
           new StreamSource<>(restoredFlinkWrapper);
 
-      setupSourceOperator(restoredSourceOperator, numTasks);
+      // set parallelism to 1 to ensure that our testing operator gets all checkpointed state
+      AbstractStreamOperatorTestHarness<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>>
+          restoredTestHarness =
+          new AbstractStreamOperatorTestHarness<>(
+              restoredSourceOperator,
+              numTasks /* max parallelism */,
+              1 /* parallelism */,
+              0 /* subtask index */);
+
+      restoredTestHarness.setTimeCharacteristic(TimeCharacteristic.EventTime);
 
       // restore snapshot
-      restoredFlinkWrapper.initializeState(initializationContext);
+      restoredTestHarness.initializeState(snapshot);
 
       boolean readSecondBatchOfElements = false;
 
       // run again and verify that we see the other elements
       try {
-        restoredSourceOperator.open();
+        restoredTestHarness.open();
         restoredSourceOperator.run(checkpointLock,
             new TestStreamStatusMaintainer(),
             new 
Output<StreamRecord<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>>>() {
@@ -345,7 +329,9 @@ public class UnboundedSourceWrapperTest {
         readSecondBatchOfElements = true;
       }
 
-      assertEquals(Math.max(1, numSplits / numTasks), 
flinkWrapper.getLocalSplitSources().size());
+      assertEquals(
+          Math.max(1, numSplits / numTasks),
+          restoredFlinkWrapper.getLocalSplitSources().size());
 
       assertTrue("Did not successfully read second batch of elements.", 
readSecondBatchOfElements);
 
@@ -364,68 +350,57 @@ public class UnboundedSourceWrapperTest {
           return null;
         }
       };
+
       UnboundedSourceWrapper<KV<Integer, Integer>, 
TestCountingSource.CounterMark> flinkWrapper =
           new UnboundedSourceWrapper<>("stepName", options, source, numSplits);
 
-      OperatorStateStore backend = mock(OperatorStateStore.class);
-
-      TestingListState<KV<UnboundedSource, TestCountingSource.CounterMark>>
-          listState = new TestingListState<>();
-
-      when(backend.getOperatorState(Matchers.any(ListStateDescriptor.class)))
-          .thenReturn(listState);
-
-      StateInitializationContext initializationContext = 
mock(StateInitializationContext.class);
-
-      when(initializationContext.getOperatorStateStore()).thenReturn(backend);
-      when(initializationContext.isRestored()).thenReturn(false, true);
+      StreamSource<
+          WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>,
+          UnboundedSourceWrapper<KV<Integer, Integer>, 
TestCountingSource.CounterMark>>
+          sourceOperator = new StreamSource<>(flinkWrapper);
 
-      flinkWrapper.initializeState(initializationContext);
+      AbstractStreamOperatorTestHarness<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>>
+          testHarness =
+          new AbstractStreamOperatorTestHarness<>(
+              sourceOperator,
+              numTasks /* max parallelism */,
+              numTasks /* parallelism */,
+              0 /* subtask index */);
 
-      StreamSource sourceOperator = new StreamSource<>(flinkWrapper);
-      setupSourceOperator(sourceOperator, numTasks);
-      sourceOperator.open();
+      testHarness.setTimeCharacteristic(TimeCharacteristic.EventTime);
 
-      flinkWrapper.snapshotState(new StateSnapshotContextSynchronousImpl(0, 
0));
+      testHarness.open();
 
-      assertEquals(0, listState.getList().size());
+      OperatorStateHandles snapshot = testHarness.snapshot(0, 0);
 
       UnboundedSourceWrapper<
           KV<Integer, Integer>, TestCountingSource.CounterMark> 
restoredFlinkWrapper =
-          new UnboundedSourceWrapper<>("stepName", options, new 
TestCountingSource(numElements),
-              numSplits);
-
-      StreamSource restoredSourceOperator = new StreamSource<>(flinkWrapper);
-      setupSourceOperator(restoredSourceOperator, numTasks);
-      sourceOperator.open();
-
-      restoredFlinkWrapper.initializeState(initializationContext);
-
-      assertEquals(Math.max(1, numSplits / numTasks), 
flinkWrapper.getLocalSplitSources().size());
-
-    }
+          new UnboundedSourceWrapper<>(
+              "stepName", options, new TestCountingSource(numElements), 
numSplits);
 
-    @SuppressWarnings("unchecked")
-    private static <T> void setupSourceOperator(StreamSource<T, ?> operator, 
int numSubTasks) {
-      ExecutionConfig executionConfig = new ExecutionConfig();
-      StreamConfig cfg = new StreamConfig(new Configuration());
+      StreamSource<
+          WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>,
+          UnboundedSourceWrapper<KV<Integer, Integer>, 
TestCountingSource.CounterMark>>
+          restoredSourceOperator =
+          new StreamSource<>(restoredFlinkWrapper);
 
-      cfg.setTimeCharacteristic(TimeCharacteristic.EventTime);
+      // set parallelism to 1 to ensure that our testing operator gets all checkpointed state
+      AbstractStreamOperatorTestHarness<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>>
+          restoredTestHarness =
+          new AbstractStreamOperatorTestHarness<>(
+              restoredSourceOperator,
+              numTasks /* max parallelism */,
+              1 /* parallelism */,
+              0 /* subtask index */);
 
-      Environment env = new DummyEnvironment("MockTwoInputTask", numSubTasks, 
0);
+      restoredTestHarness.setup();
+      restoredTestHarness.initializeState(snapshot);
+      restoredTestHarness.open();
 
-      StreamTask<?, ?> mockTask = mock(StreamTask.class);
-      when(mockTask.getName()).thenReturn("Mock Task");
-      when(mockTask.getCheckpointLock()).thenReturn(new Object());
-      when(mockTask.getConfiguration()).thenReturn(cfg);
-      when(mockTask.getEnvironment()).thenReturn(env);
-      when(mockTask.getExecutionConfig()).thenReturn(executionConfig);
-      when(mockTask.getAccumulatorMap())
-          .thenReturn(Collections.<String, Accumulator<?, ?>>emptyMap());
-      TestProcessingTimeService testProcessingTimeService = new 
TestProcessingTimeService();
-      
when(mockTask.getProcessingTimeService()).thenReturn(testProcessingTimeService);
+      // when the source checkpointed a null we don't re-initialize the splits, that is we
+      // will have no splits.
+      assertEquals(0, restoredFlinkWrapper.getLocalSplitSources().size());
 
-      operator.setup(mockTask, cfg, (Output<StreamRecord<T>>) 
mock(Output.class));
     }
 
     /**
@@ -458,31 +433,6 @@ public class UnboundedSourceWrapperTest {
 
   }
 
-  private static final class TestingListState<T> implements ListState<T> {
-
-    private final List<T> list = new ArrayList<>();
-
-    @Override
-    public void clear() {
-      list.clear();
-    }
-
-    @Override
-    public Iterable<T> get() throws Exception {
-      return list;
-    }
-
-    @Override
-    public void add(T value) throws Exception {
-      list.add(value);
-    }
-
-    public List<T> getList() {
-      return list;
-    }
-
-  }
-
   private static final class TestStreamStatusMaintainer implements StreamStatusMaintainer {
     StreamStatus currentStreamStatus = StreamStatus.ACTIVE;
 

Reply via email to