Repository: incubator-beam
Updated Branches:
  refs/heads/apex-runner 9197d1e05 -> c08ebbe79


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java
----------------------------------------------------------------------
diff --git 
a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java
 
b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java
deleted file mode 100644
index 2379a9e..0000000
--- 
a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java
+++ /dev/null
@@ -1,340 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.apex.translators;
-
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.Sink;
-import com.datatorrent.lib.util.KryoCloneUtils;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.regex.Pattern;
-
-import org.apache.beam.runners.apex.ApexPipelineOptions;
-import org.apache.beam.runners.apex.ApexRunner;
-import org.apache.beam.runners.apex.ApexRunnerResult;
-import org.apache.beam.runners.apex.TestApexRunner;
-import org.apache.beam.runners.apex.translators.functions.ApexParDoOperator;
-import 
org.apache.beam.runners.apex.translators.io.ApexReadUnboundedInputOperator;
-import org.apache.beam.runners.apex.translators.utils.ApexStateInternals;
-import org.apache.beam.runners.apex.translators.utils.ApexStreamTuple;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.SerializableCoder;
-import org.apache.beam.sdk.coders.VarIntCoder;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.OldDoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.Sum;
-import org.apache.beam.sdk.transforms.View;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.TupleTagList;
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * integration test for {@link ParDoBoundTranslator}.
- */
-@RunWith(JUnit4.class)
-public class ParDoBoundTranslatorTest {
-  private static final Logger LOG = 
LoggerFactory.getLogger(ParDoBoundTranslatorTest.class);
-  private static final long SLEEP_MILLIS = 500;
-  private static final long TIMEOUT_MILLIS = 30000;
-
-  @Test
-  public void test() throws Exception {
-    ApexPipelineOptions options = PipelineOptionsFactory.create()
-        .as(ApexPipelineOptions.class);
-    options.setApplicationName("ParDoBound");
-    options.setRunner(ApexRunner.class);
-
-    Pipeline p = Pipeline.create(options);
-
-    List<Integer> collection = Lists.newArrayList(1, 2, 3, 4, 5);
-    List<Integer> expected = Lists.newArrayList(6, 7, 8, 9, 10);
-    
p.apply(Create.of(collection).withCoder(SerializableCoder.of(Integer.class)))
-        .apply(ParDo.of(new Add(5)))
-        .apply(ParDo.of(new EmbeddedCollector()));
-
-    ApexRunnerResult result = (ApexRunnerResult) p.run();
-    DAG dag = result.getApexDAG();
-
-    DAG.OperatorMeta om = dag.getOperatorMeta("Create.Values");
-    Assert.assertNotNull(om);
-    Assert.assertEquals(om.getOperator().getClass(), 
ApexReadUnboundedInputOperator.class);
-
-    om = dag.getOperatorMeta("ParDo(Add)");
-    Assert.assertNotNull(om);
-    Assert.assertEquals(om.getOperator().getClass(), ApexParDoOperator.class);
-
-    long timeout = System.currentTimeMillis() + TIMEOUT_MILLIS;
-    while (System.currentTimeMillis() < timeout) {
-      if (EmbeddedCollector.RESULTS.containsAll(expected)) {
-        break;
-      }
-      LOG.info("Waiting for expected results.");
-      Thread.sleep(SLEEP_MILLIS);
-    }
-    Assert.assertEquals(Sets.newHashSet(expected), EmbeddedCollector.RESULTS);
-  }
-
-  @SuppressWarnings("serial")
-  private static class Add extends OldDoFn<Integer, Integer> {
-    private Integer number;
-    private PCollectionView<Integer> sideInputView;
-
-    private Add(Integer number) {
-      this.number = number;
-    }
-
-    private Add(PCollectionView<Integer> sideInputView) {
-      this.sideInputView = sideInputView;
-    }
-
-    @Override
-    public void processElement(ProcessContext c) throws Exception {
-      if (sideInputView != null) {
-        number = c.sideInput(sideInputView);
-      }
-      c.output(c.element() + number);
-    }
-  }
-
-  private static class EmbeddedCollector extends OldDoFn<Object, Void> {
-    private static final long serialVersionUID = 1L;
-    protected static final HashSet<Object> RESULTS = new HashSet<>();
-
-    public EmbeddedCollector() {
-      RESULTS.clear();
-    }
-
-    @Override
-    public void processElement(ProcessContext c) throws Exception {
-      RESULTS.add(c.element());
-    }
-  }
-
-  private static Throwable runExpectingAssertionFailure(Pipeline pipeline) {
-    // We cannot use thrown.expect(AssertionError.class) because the 
AssertionError
-    // is first caught by JUnit and causes a test failure.
-    try {
-      pipeline.run();
-    } catch (AssertionError exc) {
-      return exc;
-    }
-    fail("assertion should have failed");
-    throw new RuntimeException("unreachable");
-  }
-
-  @Test
-  public void testAssertionFailure() throws Exception {
-    ApexPipelineOptions options = PipelineOptionsFactory.create()
-        .as(ApexPipelineOptions.class);
-    options.setRunner(TestApexRunner.class);
-    Pipeline pipeline = Pipeline.create(options);
-
-    PCollection<Integer> pcollection = pipeline
-        .apply(Create.of(1, 2, 3, 4));
-    PAssert.that(pcollection).containsInAnyOrder(2, 1, 4, 3, 7);
-
-    Throwable exc = runExpectingAssertionFailure(pipeline);
-    Pattern expectedPattern = Pattern.compile(
-        "Expected: iterable over \\[((<4>|<7>|<3>|<2>|<1>)(, )?){5}\\] in any 
order");
-    // A loose pattern, but should get the job done.
-    assertTrue(
-        "Expected error message from PAssert with substring matching "
-            + expectedPattern
-            + " but the message was \""
-            + exc.getMessage()
-            + "\"",
-        expectedPattern.matcher(exc.getMessage()).find());
-  }
-
-  @Test
-  public void testContainsInAnyOrder() throws Exception {
-    ApexPipelineOptions options = 
PipelineOptionsFactory.create().as(ApexPipelineOptions.class);
-    options.setRunner(TestApexRunner.class);
-    Pipeline pipeline = Pipeline.create(options);
-    PCollection<Integer> pcollection = pipeline.apply(Create.of(1, 2, 3, 4));
-    PAssert.that(pcollection).containsInAnyOrder(2, 1, 4, 3);
-    // TODO: terminate faster based on processed assertion vs. auto-shutdown
-    pipeline.run();
-  }
-
-  @Test
-  public void testSerialization() throws Exception {
-    ApexPipelineOptions options = PipelineOptionsFactory.create()
-        .as(ApexPipelineOptions.class);
-    options.setRunner(TestApexRunner.class);
-    Pipeline pipeline = Pipeline.create(options);
-    Coder<WindowedValue<Integer>> coder = 
WindowedValue.getValueOnlyCoder(VarIntCoder.of());
-
-    PCollectionView<Integer> singletonView = pipeline.apply(Create.of(1))
-            .apply(Sum.integersGlobally().asSingletonView());
-
-    ApexParDoOperator<Integer, Integer> operator = new 
ApexParDoOperator<>(options,
-        new Add(singletonView), new TupleTag<Integer>(), 
TupleTagList.empty().getAll(),
-        WindowingStrategy.globalDefault(),
-        Collections.<PCollectionView<?>>singletonList(singletonView),
-        coder,
-        new ApexStateInternals.ApexStateInternalsFactory<Void>()
-        );
-    operator.setup(null);
-    operator.beginWindow(0);
-    WindowedValue<Integer> wv1 = WindowedValue.valueInGlobalWindow(1);
-    WindowedValue<Iterable<?>> sideInput = 
WindowedValue.<Iterable<?>>valueInGlobalWindow(
-        Lists.<Integer>newArrayList(22));
-    operator.input.process(ApexStreamTuple.DataTuple.of(wv1)); // pushed back 
input
-
-    final List<Object> results = Lists.newArrayList();
-    Sink<Object> sink =  new Sink<Object>() {
-      @Override
-      public void put(Object tuple) {
-        results.add(tuple);
-      }
-      @Override
-      public int getCount(boolean reset) {
-        return 0;
-      }
-    };
-
-    // verify pushed back input checkpointing
-    Assert.assertNotNull("Serialization", operator = 
KryoCloneUtils.cloneObject(operator));
-    operator.output.setSink(sink);
-    operator.setup(null);
-    operator.beginWindow(1);
-    WindowedValue<Integer> wv2 = WindowedValue.valueInGlobalWindow(2);
-    operator.sideInput1.process(ApexStreamTuple.DataTuple.of(sideInput));
-    Assert.assertEquals("number outputs", 1, results.size());
-    Assert.assertEquals("result", WindowedValue.valueInGlobalWindow(23),
-        ((ApexStreamTuple.DataTuple) results.get(0)).getValue());
-
-    // verify side input checkpointing
-    results.clear();
-    Assert.assertNotNull("Serialization", operator = 
KryoCloneUtils.cloneObject(operator));
-    operator.output.setSink(sink);
-    operator.setup(null);
-    operator.beginWindow(2);
-    operator.input.process(ApexStreamTuple.DataTuple.of(wv2));
-    Assert.assertEquals("number outputs", 1, results.size());
-    Assert.assertEquals("result", WindowedValue.valueInGlobalWindow(24),
-        ((ApexStreamTuple.DataTuple) results.get(0)).getValue());
-  }
-
-  @Test
-  public void testMultiOutputParDoWithSideInputs() throws Exception {
-    ApexPipelineOptions options = 
PipelineOptionsFactory.create().as(ApexPipelineOptions.class);
-    options.setRunner(ApexRunner.class); // non-blocking run
-    Pipeline pipeline = Pipeline.create(options);
-
-    List<Integer> inputs = Arrays.asList(3, -42, 666);
-    final TupleTag<String> mainOutputTag = new TupleTag<>("main");
-    final TupleTag<Void> sideOutputTag = new TupleTag<>("sideOutput");
-
-    PCollectionView<Integer> sideInput1 = pipeline
-        .apply("CreateSideInput1", Create.of(11))
-        .apply("ViewSideInput1", View.<Integer>asSingleton());
-    PCollectionView<Integer> sideInputUnread = pipeline
-        .apply("CreateSideInputUnread", Create.of(-3333))
-        .apply("ViewSideInputUnread", View.<Integer>asSingleton());
-    PCollectionView<Integer> sideInput2 = pipeline
-        .apply("CreateSideInput2", Create.of(222))
-        .apply("ViewSideInput2", View.<Integer>asSingleton());
-
-    PCollectionTuple outputs = pipeline
-        .apply(Create.of(inputs))
-        .apply(ParDo.withSideInputs(sideInput1)
-            .withSideInputs(sideInputUnread)
-            .withSideInputs(sideInput2)
-            .withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag))
-            .of(new TestMultiOutputWithSideInputsFn(
-                Arrays.asList(sideInput1, sideInput2),
-                Arrays.<TupleTag<String>>asList())));
-
-     outputs.get(mainOutputTag).apply(ParDo.of(new EmbeddedCollector()));
-     ApexRunnerResult result = (ApexRunnerResult) pipeline.run();
-
-     HashSet<String> expected = Sets.newHashSet("processing: 3: [11, 222]",
-         "processing: -42: [11, 222]", "processing: 666: [11, 222]");
-     long timeout = System.currentTimeMillis() + TIMEOUT_MILLIS;
-     while (System.currentTimeMillis() < timeout) {
-       if (EmbeddedCollector.RESULTS.containsAll(expected)) {
-         break;
-       }
-       LOG.info("Waiting for expected results.");
-       Thread.sleep(SLEEP_MILLIS);
-     }
-     result.cancel();
-     Assert.assertEquals(Sets.newHashSet(expected), EmbeddedCollector.RESULTS);
-  }
-
-  private static class TestMultiOutputWithSideInputsFn extends 
OldDoFn<Integer, String> {
-    private static final long serialVersionUID = 1L;
-
-    final List<PCollectionView<Integer>> sideInputViews = new ArrayList<>();
-    final List<TupleTag<String>> sideOutputTupleTags = new ArrayList<>();
-
-    public TestMultiOutputWithSideInputsFn(List<PCollectionView<Integer>> 
sideInputViews,
-        List<TupleTag<String>> sideOutputTupleTags) {
-      this.sideInputViews.addAll(sideInputViews);
-      this.sideOutputTupleTags.addAll(sideOutputTupleTags);
-    }
-
-    @Override
-    public void processElement(ProcessContext c) throws Exception {
-      outputToAllWithSideInputs(c, "processing: " + c.element());
-    }
-
-    private void outputToAllWithSideInputs(ProcessContext c, String value) {
-      if (!sideInputViews.isEmpty()) {
-        List<Integer> sideInputValues = new ArrayList<>();
-        for (PCollectionView<Integer> sideInputView : sideInputViews) {
-          sideInputValues.add(c.sideInput(sideInputView));
-        }
-        value += ": " + sideInputValues;
-      }
-      c.output(value);
-      for (TupleTag<String> sideOutputTupleTag : sideOutputTupleTags) {
-        c.sideOutput(sideOutputTupleTag,
-                     sideOutputTupleTag.getId() + ": " + value);
-      }
-    }
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ReadUnboundTranslatorTest.java
----------------------------------------------------------------------
diff --git 
a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ReadUnboundTranslatorTest.java
 
b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ReadUnboundTranslatorTest.java
deleted file mode 100644
index 71c5354..0000000
--- 
a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ReadUnboundTranslatorTest.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.apex.translators;
-
-import com.datatorrent.api.DAG;
-import com.google.common.collect.ContiguousSet;
-import com.google.common.collect.DiscreteDomain;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Range;
-import com.google.common.collect.Sets;
-
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.beam.runners.apex.ApexPipelineOptions;
-import org.apache.beam.runners.apex.ApexRunner;
-import org.apache.beam.runners.apex.ApexRunnerResult;
-import 
org.apache.beam.runners.apex.translators.io.ApexReadUnboundedInputOperator;
-import org.apache.beam.runners.apex.translators.utils.CollectionSource;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.io.CountingSource;
-import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.OldDoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * integration test for {@link ReadUnboundedTranslator}.
- */
-public class ReadUnboundTranslatorTest {
-  private static final Logger LOG = 
LoggerFactory.getLogger(ReadUnboundTranslatorTest.class);
-
-  @Test
-  public void test() throws Exception {
-    ApexPipelineOptions options = PipelineOptionsFactory.create()
-        .as(ApexPipelineOptions.class);
-    EmbeddedCollector.RESULTS.clear();
-    options.setApplicationName("ReadUnbound");
-    options.setRunner(ApexRunner.class);
-    Pipeline p = Pipeline.create(options);
-
-    List<String> collection = Lists.newArrayList("1", "2", "3", "4", "5");
-    CollectionSource<String> source = new CollectionSource<>(collection, 
StringUtf8Coder.of());
-    p.apply(Read.from(source))
-        .apply(ParDo.of(new EmbeddedCollector()));
-
-    ApexRunnerResult result = (ApexRunnerResult) p.run();
-    DAG dag = result.getApexDAG();
-    DAG.OperatorMeta om = dag.getOperatorMeta("Read(CollectionSource)");
-    Assert.assertNotNull(om);
-    Assert.assertEquals(om.getOperator().getClass(), 
ApexReadUnboundedInputOperator.class);
-
-    long timeout = System.currentTimeMillis() + 30000;
-    while (System.currentTimeMillis() < timeout) {
-      if (EmbeddedCollector.RESULTS.containsAll(collection)) {
-        break;
-      }
-      LOG.info("Waiting for expected results.");
-      Thread.sleep(1000);
-    }
-    Assert.assertEquals(Sets.newHashSet(collection), 
EmbeddedCollector.RESULTS);
-  }
-
-  @Test
-  public void testReadBounded() throws Exception {
-    ApexPipelineOptions options = PipelineOptionsFactory.create()
-        .as(ApexPipelineOptions.class);
-    EmbeddedCollector.RESULTS.clear();
-    options.setApplicationName("ReadBounded");
-    options.setRunner(ApexRunner.class);
-    Pipeline p = Pipeline.create(options);
-
-    Set<Long> expected = ContiguousSet.create(Range.closedOpen(0L, 10L), 
DiscreteDomain.longs());
-    p.apply(Read.from(CountingSource.upTo(10)))
-        .apply(ParDo.of(new EmbeddedCollector()));
-
-    ApexRunnerResult result = (ApexRunnerResult) p.run();
-    DAG dag = result.getApexDAG();
-    DAG.OperatorMeta om = dag.getOperatorMeta("Read(BoundedCountingSource)");
-    Assert.assertNotNull(om);
-    Assert.assertEquals(om.getOperator().getClass(), 
ApexReadUnboundedInputOperator.class);
-
-    long timeout = System.currentTimeMillis() + 30000;
-    while (System.currentTimeMillis() < timeout) {
-      if (EmbeddedCollector.RESULTS.containsAll(expected)) {
-        break;
-      }
-      LOG.info("Waiting for expected results.");
-      Thread.sleep(1000);
-    }
-    Assert.assertEquals(Sets.newHashSet(expected), EmbeddedCollector.RESULTS);
-  }
-
-  @SuppressWarnings("serial")
-  private static class EmbeddedCollector extends OldDoFn<Object, Void> {
-    protected static final HashSet<Object> RESULTS = new HashSet<>();
-
-    public EmbeddedCollector() {
-    }
-
-    @Override
-    public void processElement(ProcessContext c) throws Exception {
-      RESULTS.add(c.element());
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/utils/ApexStateInternalsTest.java
----------------------------------------------------------------------
diff --git 
a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/utils/ApexStateInternalsTest.java
 
b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/utils/ApexStateInternalsTest.java
deleted file mode 100644
index 055d98c..0000000
--- 
a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/utils/ApexStateInternalsTest.java
+++ /dev/null
@@ -1,361 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.apex.translators.utils;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertThat;
-
-import com.datatorrent.lib.util.KryoCloneUtils;
-
-import java.util.Arrays;
-
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.VarIntCoder;
-import org.apache.beam.sdk.transforms.Sum;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
-import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
-import org.apache.beam.sdk.util.state.BagState;
-import org.apache.beam.sdk.util.state.CombiningState;
-import org.apache.beam.sdk.util.state.ReadableState;
-import org.apache.beam.sdk.util.state.StateMerging;
-import org.apache.beam.sdk.util.state.StateNamespace;
-import org.apache.beam.sdk.util.state.StateNamespaceForTest;
-import org.apache.beam.sdk.util.state.StateTag;
-import org.apache.beam.sdk.util.state.StateTags;
-import org.apache.beam.sdk.util.state.ValueState;
-import org.apache.beam.sdk.util.state.WatermarkHoldState;
-import org.hamcrest.Matchers;
-import org.joda.time.Instant;
-import org.junit.Before;
-import org.junit.Test;
-
-/**
- * Tests for {@link ApexStateInternals}. This is based on the tests for
- * {@code InMemoryStateInternals}.
- */
-public class ApexStateInternalsTest {
-  private static final BoundedWindow WINDOW_1 = new IntervalWindow(new 
Instant(0), new Instant(10));
-  private static final StateNamespace NAMESPACE_1 = new 
StateNamespaceForTest("ns1");
-  private static final StateNamespace NAMESPACE_2 = new 
StateNamespaceForTest("ns2");
-  private static final StateNamespace NAMESPACE_3 = new 
StateNamespaceForTest("ns3");
-
-  private static final StateTag<Object, ValueState<String>> STRING_VALUE_ADDR =
-      StateTags.value("stringValue", StringUtf8Coder.of());
-  private static final StateTag<Object, AccumulatorCombiningState<Integer, 
int[], Integer>>
-      SUM_INTEGER_ADDR = StateTags.combiningValueFromInputInternal(
-          "sumInteger", VarIntCoder.of(), new Sum.SumIntegerFn());
-  private static final StateTag<Object, BagState<String>> STRING_BAG_ADDR =
-      StateTags.bag("stringBag", StringUtf8Coder.of());
-  private static final StateTag<Object, WatermarkHoldState<BoundedWindow>>
-      WATERMARK_EARLIEST_ADDR =
-      StateTags.watermarkStateInternal("watermark", 
OutputTimeFns.outputAtEarliestInputTimestamp());
-  private static final StateTag<Object, WatermarkHoldState<BoundedWindow>>
-      WATERMARK_LATEST_ADDR =
-      StateTags.watermarkStateInternal("watermark", 
OutputTimeFns.outputAtLatestInputTimestamp());
-  private static final StateTag<Object, WatermarkHoldState<BoundedWindow>> 
WATERMARK_EOW_ADDR =
-      StateTags.watermarkStateInternal("watermark", 
OutputTimeFns.outputAtEndOfWindow());
-
-  private ApexStateInternals<String> underTest;
-
-  @Before
-  public void initStateInternals() {
-    underTest = new ApexStateInternals<>(null);
-  }
-
-  @Test
-  public void testBag() throws Exception {
-    BagState<String> value = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
-
-    assertEquals(value, underTest.state(NAMESPACE_1, STRING_BAG_ADDR));
-    assertFalse(value.equals(underTest.state(NAMESPACE_2, STRING_BAG_ADDR)));
-
-    assertThat(value.read(), Matchers.emptyIterable());
-    value.add("hello");
-    assertThat(value.read(), Matchers.containsInAnyOrder("hello"));
-
-    value.add("world");
-    assertThat(value.read(), Matchers.containsInAnyOrder("hello", "world"));
-
-    value.clear();
-    assertThat(value.read(), Matchers.emptyIterable());
-    assertEquals(underTest.state(NAMESPACE_1, STRING_BAG_ADDR), value);
-
-  }
-
-  @Test
-  public void testBagIsEmpty() throws Exception {
-    BagState<String> value = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
-
-    assertThat(value.isEmpty().read(), Matchers.is(true));
-    ReadableState<Boolean> readFuture = value.isEmpty();
-    value.add("hello");
-    assertThat(readFuture.read(), Matchers.is(false));
-
-    value.clear();
-    assertThat(readFuture.read(), Matchers.is(true));
-  }
-
-  @Test
-  public void testMergeBagIntoSource() throws Exception {
-    BagState<String> bag1 = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
-    BagState<String> bag2 = underTest.state(NAMESPACE_2, STRING_BAG_ADDR);
-
-    bag1.add("Hello");
-    bag2.add("World");
-    bag1.add("!");
-
-    StateMerging.mergeBags(Arrays.asList(bag1, bag2), bag1);
-
-    // Reading the merged bag gets both the contents
-    assertThat(bag1.read(), Matchers.containsInAnyOrder("Hello", "World", 
"!"));
-    assertThat(bag2.read(), Matchers.emptyIterable());
-  }
-
-  @Test
-  public void testMergeBagIntoNewNamespace() throws Exception {
-    BagState<String> bag1 = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
-    BagState<String> bag2 = underTest.state(NAMESPACE_2, STRING_BAG_ADDR);
-    BagState<String> bag3 = underTest.state(NAMESPACE_3, STRING_BAG_ADDR);
-
-    bag1.add("Hello");
-    bag2.add("World");
-    bag1.add("!");
-
-    StateMerging.mergeBags(Arrays.asList(bag1, bag2, bag3), bag3);
-
-    // Reading the merged bag gets both the contents
-    assertThat(bag3.read(), Matchers.containsInAnyOrder("Hello", "World", 
"!"));
-    assertThat(bag1.read(), Matchers.emptyIterable());
-    assertThat(bag2.read(), Matchers.emptyIterable());
-  }
-
-  @Test
-  public void testCombiningValue() throws Exception {
-    CombiningState<Integer, Integer> value = underTest.state(NAMESPACE_1, 
SUM_INTEGER_ADDR);
-
-    // State instances are cached, but depend on the namespace.
-    assertEquals(value, underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR));
-    assertFalse(value.equals(underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR)));
-
-    assertThat(value.read(), Matchers.equalTo(0));
-    value.add(2);
-    assertThat(value.read(), Matchers.equalTo(2));
-
-    value.add(3);
-    assertThat(value.read(), Matchers.equalTo(5));
-
-    value.clear();
-    assertThat(value.read(), Matchers.equalTo(0));
-    assertEquals(underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR), value);
-  }
-
-  @Test
-  public void testCombiningIsEmpty() throws Exception {
-    CombiningState<Integer, Integer> value = underTest.state(NAMESPACE_1, 
SUM_INTEGER_ADDR);
-
-    assertThat(value.isEmpty().read(), Matchers.is(true));
-    ReadableState<Boolean> readFuture = value.isEmpty();
-    value.add(5);
-    assertThat(readFuture.read(), Matchers.is(false));
-
-    value.clear();
-    assertThat(readFuture.read(), Matchers.is(true));
-  }
-
-  @Test
-  public void testMergeCombiningValueIntoSource() throws Exception {
-    AccumulatorCombiningState<Integer, int[], Integer> value1 =
-        underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
-    AccumulatorCombiningState<Integer, int[], Integer> value2 =
-        underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR);
-
-    value1.add(5);
-    value2.add(10);
-    value1.add(6);
-
-    assertThat(value1.read(), Matchers.equalTo(11));
-    assertThat(value2.read(), Matchers.equalTo(10));
-
-    // Merging clears the old values and updates the result value.
-    StateMerging.mergeCombiningValues(Arrays.asList(value1, value2), value1);
-
-    assertThat(value1.read(), Matchers.equalTo(21));
-    assertThat(value2.read(), Matchers.equalTo(0));
-  }
-
-  @Test
-  public void testMergeCombiningValueIntoNewNamespace() throws Exception {
-    AccumulatorCombiningState<Integer, int[], Integer> value1 =
-        underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
-    AccumulatorCombiningState<Integer, int[], Integer> value2 =
-        underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR);
-    AccumulatorCombiningState<Integer, int[], Integer> value3 =
-        underTest.state(NAMESPACE_3, SUM_INTEGER_ADDR);
-
-    value1.add(5);
-    value2.add(10);
-    value1.add(6);
-
-    StateMerging.mergeCombiningValues(Arrays.asList(value1, value2), value3);
-
-    // Merging clears the old values and updates the result value.
-    assertThat(value1.read(), Matchers.equalTo(0));
-    assertThat(value2.read(), Matchers.equalTo(0));
-    assertThat(value3.read(), Matchers.equalTo(21));
-  }
-
-  @Test
-  public void testWatermarkEarliestState() throws Exception {
-    WatermarkHoldState<BoundedWindow> value =
-        underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
-
-    // State instances are cached, but depend on the namespace.
-    assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR));
-    assertFalse(value.equals(underTest.state(NAMESPACE_2, 
WATERMARK_EARLIEST_ADDR)));
-
-    assertThat(value.read(), Matchers.nullValue());
-    value.add(new Instant(2000));
-    assertThat(value.read(), Matchers.equalTo(new Instant(2000)));
-
-    value.add(new Instant(3000));
-    assertThat(value.read(), Matchers.equalTo(new Instant(2000)));
-
-    value.add(new Instant(1000));
-    assertThat(value.read(), Matchers.equalTo(new Instant(1000)));
-
-    value.clear();
-    assertThat(value.read(), Matchers.equalTo(null));
-    assertEquals(underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR), value);
-  }
-
-  @Test
-  public void testWatermarkLatestState() throws Exception {
-    WatermarkHoldState<BoundedWindow> value =
-        underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR);
-
-    // State instances are cached, but depend on the namespace.
-    assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR));
-    assertFalse(value.equals(underTest.state(NAMESPACE_2, 
WATERMARK_LATEST_ADDR)));
-
-    assertThat(value.read(), Matchers.nullValue());
-    value.add(new Instant(2000));
-    assertThat(value.read(), Matchers.equalTo(new Instant(2000)));
-
-    value.add(new Instant(3000));
-    assertThat(value.read(), Matchers.equalTo(new Instant(3000)));
-
-    value.add(new Instant(1000));
-    assertThat(value.read(), Matchers.equalTo(new Instant(3000)));
-
-    value.clear();
-    assertThat(value.read(), Matchers.equalTo(null));
-    assertEquals(underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR), value);
-  }
-
-  @Test
-  public void testWatermarkEndOfWindowState() throws Exception {
-    WatermarkHoldState<BoundedWindow> value = underTest.state(NAMESPACE_1, 
WATERMARK_EOW_ADDR);
-
-    // State instances are cached, but depend on the namespace.
-    assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR));
-    assertFalse(value.equals(underTest.state(NAMESPACE_2, 
WATERMARK_EOW_ADDR)));
-
-    assertThat(value.read(), Matchers.nullValue());
-    value.add(new Instant(2000));
-    assertThat(value.read(), Matchers.equalTo(new Instant(2000)));
-
-    value.clear();
-    assertThat(value.read(), Matchers.equalTo(null));
-    assertEquals(underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR), value);
-  }
-
-  @Test
-  public void testWatermarkStateIsEmpty() throws Exception {
-    WatermarkHoldState<BoundedWindow> value =
-        underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
-
-    assertThat(value.isEmpty().read(), Matchers.is(true));
-    ReadableState<Boolean> readFuture = value.isEmpty();
-    value.add(new Instant(1000));
-    assertThat(readFuture.read(), Matchers.is(false));
-
-    value.clear();
-    assertThat(readFuture.read(), Matchers.is(true));
-  }
-
-  @Test
-  public void testMergeEarliestWatermarkIntoSource() throws Exception {
-    WatermarkHoldState<BoundedWindow> value1 =
-        underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
-    WatermarkHoldState<BoundedWindow> value2 =
-        underTest.state(NAMESPACE_2, WATERMARK_EARLIEST_ADDR);
-
-    value1.add(new Instant(3000));
-    value2.add(new Instant(5000));
-    value1.add(new Instant(4000));
-    value2.add(new Instant(2000));
-
-    // Merging clears the old values and updates the merged value.
-    StateMerging.mergeWatermarks(Arrays.asList(value1, value2), value1, 
WINDOW_1);
-
-    assertThat(value1.read(), Matchers.equalTo(new Instant(2000)));
-    assertThat(value2.read(), Matchers.equalTo(null));
-  }
-
-  @Test
-  public void testMergeLatestWatermarkIntoSource() throws Exception {
-    WatermarkHoldState<BoundedWindow> value1 =
-        underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR);
-    WatermarkHoldState<BoundedWindow> value2 =
-        underTest.state(NAMESPACE_2, WATERMARK_LATEST_ADDR);
-    WatermarkHoldState<BoundedWindow> value3 =
-        underTest.state(NAMESPACE_3, WATERMARK_LATEST_ADDR);
-
-    value1.add(new Instant(3000));
-    value2.add(new Instant(5000));
-    value1.add(new Instant(4000));
-    value2.add(new Instant(2000));
-
-    // Merging clears the old values and updates the result value.
-    StateMerging.mergeWatermarks(Arrays.asList(value1, value2), value3, 
WINDOW_1);
-
-    // Merging clears the old values and updates the result value.
-    assertThat(value3.read(), Matchers.equalTo(new Instant(5000)));
-    assertThat(value1.read(), Matchers.equalTo(null));
-    assertThat(value2.read(), Matchers.equalTo(null));
-  }
-
-  @Test
-  public void testSerialization() throws Exception {
-    ApexStateInternals<String> original = new ApexStateInternals<String>(null);
-    ValueState<String> value = original.state(NAMESPACE_1, STRING_VALUE_ADDR);
-    assertEquals(original.state(NAMESPACE_1, STRING_VALUE_ADDR), value);
-    value.write("hello");
-
-    ApexStateInternals<String> cloned;
-    assertNotNull("Serialization", cloned = 
KryoCloneUtils.cloneObject(original));
-    ValueState<String> clonedValue = cloned.state(NAMESPACE_1, 
STRING_VALUE_ADDR);
-    assertThat(clonedValue.read(), Matchers.equalTo("hello"));
-    assertEquals(cloned.state(NAMESPACE_1, STRING_VALUE_ADDR), value);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/utils/CollectionSource.java
----------------------------------------------------------------------
diff --git 
a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/utils/CollectionSource.java
 
b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/utils/CollectionSource.java
deleted file mode 100644
index c368bb2..0000000
--- 
a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/utils/CollectionSource.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.apex.translators.utils;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.NoSuchElementException;
-
-import javax.annotation.Nullable;
-
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.io.UnboundedSource;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.joda.time.Instant;
-
-/**
- * collection as {@link UnboundedSource}, used for tests.
- */
-public class CollectionSource<T> extends UnboundedSource<T, 
UnboundedSource.CheckpointMark> {
-  private static final long serialVersionUID = 1L;
-  private final Collection<T> collection;
-  private final Coder<T> coder;
-
-  public CollectionSource(Collection<T> collection, Coder<T> coder) {
-    this.collection = collection;
-    this.coder = coder;
-  }
-
-  @Override
-  public List<? extends UnboundedSource<T, CheckpointMark>> 
generateInitialSplits(
-      int desiredNumSplits, PipelineOptions options) throws Exception {
-    return Collections.singletonList(this);
-  }
-
-  @Override
-  public UnboundedReader<T> createReader(PipelineOptions options,
-      @Nullable UnboundedSource.CheckpointMark checkpointMark) {
-    return new CollectionReader<>(collection, this);
-  }
-
-  @Nullable
-  @Override
-  public Coder<CheckpointMark> getCheckpointMarkCoder() {
-    return null;
-  }
-
-  @Override
-  public void validate() {
-  }
-
-  @Override
-  public Coder<T> getDefaultOutputCoder() {
-    return coder;
-  }
-
-  private static class CollectionReader<T> extends 
UnboundedSource.UnboundedReader<T>
-      implements Serializable {
-
-    private T current;
-    private final CollectionSource<T> source;
-    private final Collection<T> collection;
-    private Iterator<T> iterator;
-
-    public CollectionReader(Collection<T> collection, CollectionSource<T> 
source) {
-      this.collection = collection;
-      this.source = source;
-    }
-
-    @Override
-    public boolean start() throws IOException {
-      if (null == iterator) {
-        iterator = collection.iterator();
-      }
-      return advance();
-    }
-
-    @Override
-    public boolean advance() throws IOException {
-      if (iterator.hasNext()) {
-        current = iterator.next();
-        return true;
-      } else {
-        return false;
-      }
-    }
-
-    @Override
-    public Instant getWatermark() {
-      return Instant.now();
-    }
-
-    @Override
-    public UnboundedSource.CheckpointMark getCheckpointMark() {
-      return null;
-    }
-
-    @Override
-    public UnboundedSource<T, ?> getCurrentSource() {
-      return source;
-    }
-
-    @Override
-    public T getCurrent() throws NoSuchElementException {
-      return current;
-    }
-
-    @Override
-    public Instant getCurrentTimestamp() throws NoSuchElementException {
-      return Instant.now();
-    }
-
-    @Override
-    public void close() throws IOException {
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/utils/PipelineOptionsTest.java
----------------------------------------------------------------------
diff --git 
a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/utils/PipelineOptionsTest.java
 
b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/utils/PipelineOptionsTest.java
deleted file mode 100644
index e67efa9..0000000
--- 
a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/utils/PipelineOptionsTest.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.apex.translators.utils;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-
-import com.datatorrent.common.util.FSStorageAgent;
-import com.esotericsoftware.kryo.serializers.FieldSerializer.Bind;
-import com.esotericsoftware.kryo.serializers.JavaSerializer;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-
-import org.apache.beam.runners.apex.ApexPipelineOptions;
-import org.apache.beam.sdk.options.Default;
-import org.apache.beam.sdk.options.Description;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-/**
- * Tests the serialization of PipelineOptions.
- */
-public class PipelineOptionsTest {
-
-  /**
-   * Interface for testing.
-   */
-  public interface MyOptions extends ApexPipelineOptions {
-    @Description("Bla bla bla")
-    @Default.String("Hello")
-    String getTestOption();
-    void setTestOption(String value);
-  }
-
-  private static class MyOptionsWrapper {
-    private MyOptionsWrapper() {
-      this(null); // required for Kryo
-    }
-    private MyOptionsWrapper(ApexPipelineOptions options) {
-      this.options = new SerializablePipelineOptions(options);
-    }
-    @Bind(JavaSerializer.class)
-    private final SerializablePipelineOptions options;
-  }
-
-  private static MyOptions options;
-
-  private static final String[] args = new String[]{"--testOption=nothing"};
-
-  @BeforeClass
-  public static void beforeTest() {
-    options = PipelineOptionsFactory.fromArgs(args).as(MyOptions.class);
-  }
-
-  @Test
-  public void testSerialization() {
-    MyOptionsWrapper wrapper = new 
MyOptionsWrapper(PipelineOptionsTest.options);
-    ByteArrayOutputStream bos = new ByteArrayOutputStream();
-    FSStorageAgent.store(bos, wrapper);
-
-    ByteArrayInputStream bis = new ByteArrayInputStream(bos.toByteArray());
-    MyOptionsWrapper wrapperCopy = (MyOptionsWrapper) 
FSStorageAgent.retrieve(bis);
-    assertNotNull(wrapperCopy.options);
-    assertEquals("nothing", 
wrapperCopy.options.get().as(MyOptions.class).getTestOption());
-  }
-
-}


Reply via email to