Repository: incubator-beam Updated Branches: refs/heads/apex-runner 9197d1e05 -> c08ebbe79
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java deleted file mode 100644 index 2379a9e..0000000 --- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java +++ /dev/null @@ -1,340 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.runners.apex.translators; - -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -import com.datatorrent.api.DAG; -import com.datatorrent.api.Sink; -import com.datatorrent.lib.util.KryoCloneUtils; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.regex.Pattern; - -import org.apache.beam.runners.apex.ApexPipelineOptions; -import org.apache.beam.runners.apex.ApexRunner; -import org.apache.beam.runners.apex.ApexRunnerResult; -import org.apache.beam.runners.apex.TestApexRunner; -import org.apache.beam.runners.apex.translators.functions.ApexParDoOperator; -import org.apache.beam.runners.apex.translators.io.ApexReadUnboundedInputOperator; -import org.apache.beam.runners.apex.translators.utils.ApexStateInternals; -import org.apache.beam.runners.apex.translators.utils.ApexStreamTuple; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.SerializableCoder; -import org.apache.beam.sdk.coders.VarIntCoder; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.OldDoFn; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.Sum; -import org.apache.beam.sdk.transforms.View; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.util.WindowingStrategy; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionTuple; -import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.TupleTag; -import org.apache.beam.sdk.values.TupleTagList; -import org.junit.Assert; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * integration test for {@link ParDoBoundTranslator}. - */ -@RunWith(JUnit4.class) -public class ParDoBoundTranslatorTest { - private static final Logger LOG = LoggerFactory.getLogger(ParDoBoundTranslatorTest.class); - private static final long SLEEP_MILLIS = 500; - private static final long TIMEOUT_MILLIS = 30000; - - @Test - public void test() throws Exception { - ApexPipelineOptions options = PipelineOptionsFactory.create() - .as(ApexPipelineOptions.class); - options.setApplicationName("ParDoBound"); - options.setRunner(ApexRunner.class); - - Pipeline p = Pipeline.create(options); - - List<Integer> collection = Lists.newArrayList(1, 2, 3, 4, 5); - List<Integer> expected = Lists.newArrayList(6, 7, 8, 9, 10); - p.apply(Create.of(collection).withCoder(SerializableCoder.of(Integer.class))) - .apply(ParDo.of(new Add(5))) - .apply(ParDo.of(new EmbeddedCollector())); - - ApexRunnerResult result = (ApexRunnerResult) p.run(); - DAG dag = result.getApexDAG(); - - DAG.OperatorMeta om = dag.getOperatorMeta("Create.Values"); - Assert.assertNotNull(om); - Assert.assertEquals(om.getOperator().getClass(), ApexReadUnboundedInputOperator.class); - - om = dag.getOperatorMeta("ParDo(Add)"); - Assert.assertNotNull(om); - Assert.assertEquals(om.getOperator().getClass(), ApexParDoOperator.class); - - long timeout = System.currentTimeMillis() + TIMEOUT_MILLIS; - while (System.currentTimeMillis() < timeout) { - if (EmbeddedCollector.RESULTS.containsAll(expected)) { - break; - } - LOG.info("Waiting for expected results."); - Thread.sleep(SLEEP_MILLIS); - } - Assert.assertEquals(Sets.newHashSet(expected), EmbeddedCollector.RESULTS); - } - - @SuppressWarnings("serial") - private static class Add extends OldDoFn<Integer, Integer> { - private Integer number; - private PCollectionView<Integer> sideInputView; - - private Add(Integer number) { - this.number = number; - } - - private Add(PCollectionView<Integer> sideInputView) { - this.sideInputView = sideInputView; - } - - @Override - public void processElement(ProcessContext c) throws Exception { - if (sideInputView != null) { - number = c.sideInput(sideInputView); - } - c.output(c.element() + number); - } - } - - private static class EmbeddedCollector extends OldDoFn<Object, Void> { - private static final long serialVersionUID = 1L; - protected static final HashSet<Object> RESULTS = new HashSet<>(); - - public EmbeddedCollector() { - RESULTS.clear(); - } - - @Override - public void processElement(ProcessContext c) throws Exception { - RESULTS.add(c.element()); - } - } - - private static Throwable runExpectingAssertionFailure(Pipeline pipeline) { - // We cannot use thrown.expect(AssertionError.class) because the AssertionError - // is first caught by JUnit and causes a test failure. - try { - pipeline.run(); - } catch (AssertionError exc) { - return exc; - } - fail("assertion should have failed"); - throw new RuntimeException("unreachable"); - } - - @Test - public void testAssertionFailure() throws Exception { - ApexPipelineOptions options = PipelineOptionsFactory.create() - .as(ApexPipelineOptions.class); - options.setRunner(TestApexRunner.class); - Pipeline pipeline = Pipeline.create(options); - - PCollection<Integer> pcollection = pipeline - .apply(Create.of(1, 2, 3, 4)); - PAssert.that(pcollection).containsInAnyOrder(2, 1, 4, 3, 7); - - Throwable exc = runExpectingAssertionFailure(pipeline); - Pattern expectedPattern = Pattern.compile( - "Expected: iterable over \\[((<4>|<7>|<3>|<2>|<1>)(, )?){5}\\] in any order"); - // A loose pattern, but should get the job done. - assertTrue( - "Expected error message from PAssert with substring matching " - + expectedPattern - + " but the message was \"" - + exc.getMessage() - + "\"", - expectedPattern.matcher(exc.getMessage()).find()); - } - - @Test - public void testContainsInAnyOrder() throws Exception { - ApexPipelineOptions options = PipelineOptionsFactory.create().as(ApexPipelineOptions.class); - options.setRunner(TestApexRunner.class); - Pipeline pipeline = Pipeline.create(options); - PCollection<Integer> pcollection = pipeline.apply(Create.of(1, 2, 3, 4)); - PAssert.that(pcollection).containsInAnyOrder(2, 1, 4, 3); - // TODO: terminate faster based on processed assertion vs. auto-shutdown - pipeline.run(); - } - - @Test - public void testSerialization() throws Exception { - ApexPipelineOptions options = PipelineOptionsFactory.create() - .as(ApexPipelineOptions.class); - options.setRunner(TestApexRunner.class); - Pipeline pipeline = Pipeline.create(options); - Coder<WindowedValue<Integer>> coder = WindowedValue.getValueOnlyCoder(VarIntCoder.of()); - - PCollectionView<Integer> singletonView = pipeline.apply(Create.of(1)) - .apply(Sum.integersGlobally().asSingletonView()); - - ApexParDoOperator<Integer, Integer> operator = new ApexParDoOperator<>(options, - new Add(singletonView), new TupleTag<Integer>(), TupleTagList.empty().getAll(), - WindowingStrategy.globalDefault(), - Collections.<PCollectionView<?>>singletonList(singletonView), - coder, - new ApexStateInternals.ApexStateInternalsFactory<Void>() - ); - operator.setup(null); - operator.beginWindow(0); - WindowedValue<Integer> wv1 = WindowedValue.valueInGlobalWindow(1); - WindowedValue<Iterable<?>> sideInput = WindowedValue.<Iterable<?>>valueInGlobalWindow( - Lists.<Integer>newArrayList(22)); - operator.input.process(ApexStreamTuple.DataTuple.of(wv1)); // pushed back input - - final List<Object> results = Lists.newArrayList(); - Sink<Object> sink = new Sink<Object>() { - @Override - public void put(Object tuple) { - results.add(tuple); - } - @Override - public int getCount(boolean reset) { - return 0; - } - }; - - // verify pushed back input checkpointing - Assert.assertNotNull("Serialization", operator = KryoCloneUtils.cloneObject(operator)); - operator.output.setSink(sink); - operator.setup(null); - operator.beginWindow(1); - WindowedValue<Integer> wv2 = WindowedValue.valueInGlobalWindow(2); - operator.sideInput1.process(ApexStreamTuple.DataTuple.of(sideInput)); - Assert.assertEquals("number outputs", 1, results.size()); - Assert.assertEquals("result", WindowedValue.valueInGlobalWindow(23), - ((ApexStreamTuple.DataTuple) results.get(0)).getValue()); - - // verify side input checkpointing - results.clear(); - Assert.assertNotNull("Serialization", operator = KryoCloneUtils.cloneObject(operator)); - operator.output.setSink(sink); - operator.setup(null); - operator.beginWindow(2); - operator.input.process(ApexStreamTuple.DataTuple.of(wv2)); - Assert.assertEquals("number outputs", 1, results.size()); - Assert.assertEquals("result", WindowedValue.valueInGlobalWindow(24), - ((ApexStreamTuple.DataTuple) results.get(0)).getValue()); - } - - @Test - public void testMultiOutputParDoWithSideInputs() throws Exception { - ApexPipelineOptions options = PipelineOptionsFactory.create().as(ApexPipelineOptions.class); - options.setRunner(ApexRunner.class); // non-blocking run - Pipeline pipeline = Pipeline.create(options); - - List<Integer> inputs = Arrays.asList(3, -42, 666); - final TupleTag<String> mainOutputTag = new TupleTag<>("main"); - final TupleTag<Void> sideOutputTag = new TupleTag<>("sideOutput"); - - PCollectionView<Integer> sideInput1 = pipeline - .apply("CreateSideInput1", Create.of(11)) - .apply("ViewSideInput1", View.<Integer>asSingleton()); - PCollectionView<Integer> sideInputUnread = pipeline - .apply("CreateSideInputUnread", Create.of(-3333)) - .apply("ViewSideInputUnread", View.<Integer>asSingleton()); - PCollectionView<Integer> sideInput2 = pipeline - .apply("CreateSideInput2", Create.of(222)) - .apply("ViewSideInput2", View.<Integer>asSingleton()); - - PCollectionTuple outputs = pipeline - .apply(Create.of(inputs)) - .apply(ParDo.withSideInputs(sideInput1) - .withSideInputs(sideInputUnread) - .withSideInputs(sideInput2) - .withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag)) - .of(new TestMultiOutputWithSideInputsFn( - Arrays.asList(sideInput1, sideInput2), - Arrays.<TupleTag<String>>asList()))); - - outputs.get(mainOutputTag).apply(ParDo.of(new EmbeddedCollector())); - ApexRunnerResult result = (ApexRunnerResult) pipeline.run(); - - HashSet<String> expected = Sets.newHashSet("processing: 3: [11, 222]", - "processing: -42: [11, 222]", "processing: 666: [11, 222]"); - long timeout = System.currentTimeMillis() + TIMEOUT_MILLIS; - while (System.currentTimeMillis() < timeout) { - if (EmbeddedCollector.RESULTS.containsAll(expected)) { - break; - } - LOG.info("Waiting for expected results."); - Thread.sleep(SLEEP_MILLIS); - } - result.cancel(); - Assert.assertEquals(Sets.newHashSet(expected), EmbeddedCollector.RESULTS); - } - - private static class TestMultiOutputWithSideInputsFn extends OldDoFn<Integer, String> { - private static final long serialVersionUID = 1L; - - final List<PCollectionView<Integer>> sideInputViews = new ArrayList<>(); - final List<TupleTag<String>> sideOutputTupleTags = new ArrayList<>(); - - public TestMultiOutputWithSideInputsFn(List<PCollectionView<Integer>> sideInputViews, - List<TupleTag<String>> sideOutputTupleTags) { - this.sideInputViews.addAll(sideInputViews); - this.sideOutputTupleTags.addAll(sideOutputTupleTags); - } - - @Override - public void processElement(ProcessContext c) throws Exception { - outputToAllWithSideInputs(c, "processing: " + c.element()); - } - - private void outputToAllWithSideInputs(ProcessContext c, String value) { - if (!sideInputViews.isEmpty()) { - List<Integer> sideInputValues = new ArrayList<>(); - for (PCollectionView<Integer> sideInputView : sideInputViews) { - sideInputValues.add(c.sideInput(sideInputView)); - } - value += ": " + sideInputValues; - } - c.output(value); - for (TupleTag<String> sideOutputTupleTag : sideOutputTupleTags) { - c.sideOutput(sideOutputTupleTag, - sideOutputTupleTag.getId() + ": " + value); - } - } - - } - -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ReadUnboundTranslatorTest.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ReadUnboundTranslatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ReadUnboundTranslatorTest.java deleted file mode 100644 index 71c5354..0000000 --- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ReadUnboundTranslatorTest.java +++ /dev/null @@ -1,129 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.runners.apex.translators; - -import com.datatorrent.api.DAG; -import com.google.common.collect.ContiguousSet; -import com.google.common.collect.DiscreteDomain; -import com.google.common.collect.Lists; -import com.google.common.collect.Range; -import com.google.common.collect.Sets; - -import java.util.HashSet; -import java.util.List; -import java.util.Set; - -import org.apache.beam.runners.apex.ApexPipelineOptions; -import org.apache.beam.runners.apex.ApexRunner; -import org.apache.beam.runners.apex.ApexRunnerResult; -import org.apache.beam.runners.apex.translators.io.ApexReadUnboundedInputOperator; -import org.apache.beam.runners.apex.translators.utils.CollectionSource; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.io.CountingSource; -import org.apache.beam.sdk.io.Read; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.transforms.OldDoFn; -import org.apache.beam.sdk.transforms.ParDo; -import org.junit.Assert; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * integration test for {@link ReadUnboundedTranslator}. - */ -public class ReadUnboundTranslatorTest { - private static final Logger LOG = LoggerFactory.getLogger(ReadUnboundTranslatorTest.class); - - @Test - public void test() throws Exception { - ApexPipelineOptions options = PipelineOptionsFactory.create() - .as(ApexPipelineOptions.class); - EmbeddedCollector.RESULTS.clear(); - options.setApplicationName("ReadUnbound"); - options.setRunner(ApexRunner.class); - Pipeline p = Pipeline.create(options); - - List<String> collection = Lists.newArrayList("1", "2", "3", "4", "5"); - CollectionSource<String> source = new CollectionSource<>(collection, StringUtf8Coder.of()); - p.apply(Read.from(source)) - .apply(ParDo.of(new EmbeddedCollector())); - - ApexRunnerResult result = (ApexRunnerResult) p.run(); - DAG dag = result.getApexDAG(); - DAG.OperatorMeta om = dag.getOperatorMeta("Read(CollectionSource)"); - Assert.assertNotNull(om); - Assert.assertEquals(om.getOperator().getClass(), ApexReadUnboundedInputOperator.class); - - long timeout = System.currentTimeMillis() + 30000; - while (System.currentTimeMillis() < timeout) { - if (EmbeddedCollector.RESULTS.containsAll(collection)) { - break; - } - LOG.info("Waiting for expected results."); - Thread.sleep(1000); - } - Assert.assertEquals(Sets.newHashSet(collection), EmbeddedCollector.RESULTS); - } - - @Test - public void testReadBounded() throws Exception { - ApexPipelineOptions options = PipelineOptionsFactory.create() - .as(ApexPipelineOptions.class); - EmbeddedCollector.RESULTS.clear(); - options.setApplicationName("ReadBounded"); - options.setRunner(ApexRunner.class); - Pipeline p = Pipeline.create(options); - - Set<Long> expected = ContiguousSet.create(Range.closedOpen(0L, 10L), DiscreteDomain.longs()); - p.apply(Read.from(CountingSource.upTo(10))) - .apply(ParDo.of(new EmbeddedCollector())); - - ApexRunnerResult result = (ApexRunnerResult) p.run(); - DAG dag = result.getApexDAG(); - DAG.OperatorMeta om = dag.getOperatorMeta("Read(BoundedCountingSource)"); - Assert.assertNotNull(om); - Assert.assertEquals(om.getOperator().getClass(), ApexReadUnboundedInputOperator.class); - - long timeout = System.currentTimeMillis() + 30000; - while (System.currentTimeMillis() < timeout) { - if (EmbeddedCollector.RESULTS.containsAll(expected)) { - break; - } - LOG.info("Waiting for expected results."); - Thread.sleep(1000); - } - Assert.assertEquals(Sets.newHashSet(expected), EmbeddedCollector.RESULTS); - } - - @SuppressWarnings("serial") - private static class EmbeddedCollector extends OldDoFn<Object, Void> { - protected static final HashSet<Object> RESULTS = new HashSet<>(); - - public EmbeddedCollector() { - } - - @Override - public void processElement(ProcessContext c) throws Exception { - RESULTS.add(c.element()); - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/utils/ApexStateInternalsTest.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/utils/ApexStateInternalsTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/utils/ApexStateInternalsTest.java deleted file mode 100644 index 055d98c..0000000 --- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/utils/ApexStateInternalsTest.java +++ /dev/null @@ -1,361 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.apex.translators.utils; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertThat; - -import com.datatorrent.lib.util.KryoCloneUtils; - -import java.util.Arrays; - -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.coders.VarIntCoder; -import org.apache.beam.sdk.transforms.Sum; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.IntervalWindow; -import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; -import org.apache.beam.sdk.util.state.AccumulatorCombiningState; -import org.apache.beam.sdk.util.state.BagState; -import org.apache.beam.sdk.util.state.CombiningState; -import org.apache.beam.sdk.util.state.ReadableState; -import org.apache.beam.sdk.util.state.StateMerging; -import org.apache.beam.sdk.util.state.StateNamespace; -import org.apache.beam.sdk.util.state.StateNamespaceForTest; -import org.apache.beam.sdk.util.state.StateTag; -import org.apache.beam.sdk.util.state.StateTags; -import org.apache.beam.sdk.util.state.ValueState; -import org.apache.beam.sdk.util.state.WatermarkHoldState; -import org.hamcrest.Matchers; -import org.joda.time.Instant; -import org.junit.Before; -import org.junit.Test; - -/** - * Tests for {@link ApexStateInternals}. This is based on the tests for - * {@code InMemoryStateInternals}. - */ -public class ApexStateInternalsTest { - private static final BoundedWindow WINDOW_1 = new IntervalWindow(new Instant(0), new Instant(10)); - private static final StateNamespace NAMESPACE_1 = new StateNamespaceForTest("ns1"); - private static final StateNamespace NAMESPACE_2 = new StateNamespaceForTest("ns2"); - private static final StateNamespace NAMESPACE_3 = new StateNamespaceForTest("ns3"); - - private static final StateTag<Object, ValueState<String>> STRING_VALUE_ADDR = - StateTags.value("stringValue", StringUtf8Coder.of()); - private static final StateTag<Object, AccumulatorCombiningState<Integer, int[], Integer>> - SUM_INTEGER_ADDR = StateTags.combiningValueFromInputInternal( - "sumInteger", VarIntCoder.of(), new Sum.SumIntegerFn()); - private static final StateTag<Object, BagState<String>> STRING_BAG_ADDR = - StateTags.bag("stringBag", StringUtf8Coder.of()); - private static final StateTag<Object, WatermarkHoldState<BoundedWindow>> - WATERMARK_EARLIEST_ADDR = - StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtEarliestInputTimestamp()); - private static final StateTag<Object, WatermarkHoldState<BoundedWindow>> - WATERMARK_LATEST_ADDR = - StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtLatestInputTimestamp()); - private static final StateTag<Object, WatermarkHoldState<BoundedWindow>> WATERMARK_EOW_ADDR = - StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtEndOfWindow()); - - private ApexStateInternals<String> underTest; - - @Before - public void initStateInternals() { - underTest = new ApexStateInternals<>(null); - } - - @Test - public void testBag() throws Exception { - BagState<String> value = underTest.state(NAMESPACE_1, STRING_BAG_ADDR); - - assertEquals(value, underTest.state(NAMESPACE_1, STRING_BAG_ADDR)); - assertFalse(value.equals(underTest.state(NAMESPACE_2, STRING_BAG_ADDR))); - - assertThat(value.read(), Matchers.emptyIterable()); - value.add("hello"); - assertThat(value.read(), Matchers.containsInAnyOrder("hello")); - - value.add("world"); - assertThat(value.read(), Matchers.containsInAnyOrder("hello", "world")); - - value.clear(); - assertThat(value.read(), Matchers.emptyIterable()); - assertEquals(underTest.state(NAMESPACE_1, STRING_BAG_ADDR), value); - - } - - @Test - public void testBagIsEmpty() throws Exception { - BagState<String> value = underTest.state(NAMESPACE_1, STRING_BAG_ADDR); - - assertThat(value.isEmpty().read(), Matchers.is(true)); - ReadableState<Boolean> readFuture = value.isEmpty(); - value.add("hello"); - assertThat(readFuture.read(), Matchers.is(false)); - - value.clear(); - assertThat(readFuture.read(), Matchers.is(true)); - } - - @Test - public void testMergeBagIntoSource() throws Exception { - BagState<String> bag1 = underTest.state(NAMESPACE_1, STRING_BAG_ADDR); - BagState<String> bag2 = underTest.state(NAMESPACE_2, STRING_BAG_ADDR); - - bag1.add("Hello"); - bag2.add("World"); - bag1.add("!"); - - StateMerging.mergeBags(Arrays.asList(bag1, bag2), bag1); - - // Reading the merged bag gets both the contents - assertThat(bag1.read(), Matchers.containsInAnyOrder("Hello", "World", "!")); - assertThat(bag2.read(), Matchers.emptyIterable()); - } - - @Test - public void testMergeBagIntoNewNamespace() throws Exception { - BagState<String> bag1 = underTest.state(NAMESPACE_1, STRING_BAG_ADDR); - BagState<String> bag2 = underTest.state(NAMESPACE_2, STRING_BAG_ADDR); - BagState<String> bag3 = underTest.state(NAMESPACE_3, STRING_BAG_ADDR); - - bag1.add("Hello"); - bag2.add("World"); - bag1.add("!"); - - StateMerging.mergeBags(Arrays.asList(bag1, bag2, bag3), bag3); - - // Reading the merged bag gets both the contents - assertThat(bag3.read(), Matchers.containsInAnyOrder("Hello", "World", "!")); - assertThat(bag1.read(), Matchers.emptyIterable()); - assertThat(bag2.read(), Matchers.emptyIterable()); - } - - @Test - public void testCombiningValue() throws Exception { - CombiningState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR); - - // State instances are cached, but depend on the namespace. - assertEquals(value, underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR)); - assertFalse(value.equals(underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR))); - - assertThat(value.read(), Matchers.equalTo(0)); - value.add(2); - assertThat(value.read(), Matchers.equalTo(2)); - - value.add(3); - assertThat(value.read(), Matchers.equalTo(5)); - - value.clear(); - assertThat(value.read(), Matchers.equalTo(0)); - assertEquals(underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR), value); - } - - @Test - public void testCombiningIsEmpty() throws Exception { - CombiningState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR); - - assertThat(value.isEmpty().read(), Matchers.is(true)); - ReadableState<Boolean> readFuture = value.isEmpty(); - value.add(5); - assertThat(readFuture.read(), Matchers.is(false)); - - value.clear(); - assertThat(readFuture.read(), Matchers.is(true)); - } - - @Test - public void testMergeCombiningValueIntoSource() throws Exception { - AccumulatorCombiningState<Integer, int[], Integer> value1 = - underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR); - AccumulatorCombiningState<Integer, int[], Integer> value2 = - underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR); - - value1.add(5); - value2.add(10); - value1.add(6); - - assertThat(value1.read(), Matchers.equalTo(11)); - assertThat(value2.read(), Matchers.equalTo(10)); - - // Merging clears the old values and updates the result value. - StateMerging.mergeCombiningValues(Arrays.asList(value1, value2), value1); - - assertThat(value1.read(), Matchers.equalTo(21)); - assertThat(value2.read(), Matchers.equalTo(0)); - } - - @Test - public void testMergeCombiningValueIntoNewNamespace() throws Exception { - AccumulatorCombiningState<Integer, int[], Integer> value1 = - underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR); - AccumulatorCombiningState<Integer, int[], Integer> value2 = - underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR); - AccumulatorCombiningState<Integer, int[], Integer> value3 = - underTest.state(NAMESPACE_3, SUM_INTEGER_ADDR); - - value1.add(5); - value2.add(10); - value1.add(6); - - StateMerging.mergeCombiningValues(Arrays.asList(value1, value2), value3); - - // Merging clears the old values and updates the result value. - assertThat(value1.read(), Matchers.equalTo(0)); - assertThat(value2.read(), Matchers.equalTo(0)); - assertThat(value3.read(), Matchers.equalTo(21)); - } - - @Test - public void testWatermarkEarliestState() throws Exception { - WatermarkHoldState<BoundedWindow> value = - underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR); - - // State instances are cached, but depend on the namespace. - assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR)); - assertFalse(value.equals(underTest.state(NAMESPACE_2, WATERMARK_EARLIEST_ADDR))); - - assertThat(value.read(), Matchers.nullValue()); - value.add(new Instant(2000)); - assertThat(value.read(), Matchers.equalTo(new Instant(2000))); - - value.add(new Instant(3000)); - assertThat(value.read(), Matchers.equalTo(new Instant(2000))); - - value.add(new Instant(1000)); - assertThat(value.read(), Matchers.equalTo(new Instant(1000))); - - value.clear(); - assertThat(value.read(), Matchers.equalTo(null)); - assertEquals(underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR), value); - } - - @Test - public void testWatermarkLatestState() throws Exception { - WatermarkHoldState<BoundedWindow> value = - underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR); - - // State instances are cached, but depend on the namespace. - assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR)); - assertFalse(value.equals(underTest.state(NAMESPACE_2, WATERMARK_LATEST_ADDR))); - - assertThat(value.read(), Matchers.nullValue()); - value.add(new Instant(2000)); - assertThat(value.read(), Matchers.equalTo(new Instant(2000))); - - value.add(new Instant(3000)); - assertThat(value.read(), Matchers.equalTo(new Instant(3000))); - - value.add(new Instant(1000)); - assertThat(value.read(), Matchers.equalTo(new Instant(3000))); - - value.clear(); - assertThat(value.read(), Matchers.equalTo(null)); - assertEquals(underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR), value); - } - - @Test - public void testWatermarkEndOfWindowState() throws Exception { - WatermarkHoldState<BoundedWindow> value = underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR); - - // State instances are cached, but depend on the namespace. - assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR)); - assertFalse(value.equals(underTest.state(NAMESPACE_2, WATERMARK_EOW_ADDR))); - - assertThat(value.read(), Matchers.nullValue()); - value.add(new Instant(2000)); - assertThat(value.read(), Matchers.equalTo(new Instant(2000))); - - value.clear(); - assertThat(value.read(), Matchers.equalTo(null)); - assertEquals(underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR), value); - } - - @Test - public void testWatermarkStateIsEmpty() throws Exception { - WatermarkHoldState<BoundedWindow> value = - underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR); - - assertThat(value.isEmpty().read(), Matchers.is(true)); - ReadableState<Boolean> readFuture = value.isEmpty(); - value.add(new Instant(1000)); - assertThat(readFuture.read(), Matchers.is(false)); - - value.clear(); - assertThat(readFuture.read(), Matchers.is(true)); - } - - @Test - public void testMergeEarliestWatermarkIntoSource() throws Exception { - WatermarkHoldState<BoundedWindow> value1 = - underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR); - WatermarkHoldState<BoundedWindow> value2 = - underTest.state(NAMESPACE_2, WATERMARK_EARLIEST_ADDR); - - value1.add(new Instant(3000)); - value2.add(new Instant(5000)); - value1.add(new Instant(4000)); - value2.add(new Instant(2000)); - - // Merging clears the old values and updates the merged value. - StateMerging.mergeWatermarks(Arrays.asList(value1, value2), value1, WINDOW_1); - - assertThat(value1.read(), Matchers.equalTo(new Instant(2000))); - assertThat(value2.read(), Matchers.equalTo(null)); - } - - @Test - public void testMergeLatestWatermarkIntoSource() throws Exception { - WatermarkHoldState<BoundedWindow> value1 = - underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR); - WatermarkHoldState<BoundedWindow> value2 = - underTest.state(NAMESPACE_2, WATERMARK_LATEST_ADDR); - WatermarkHoldState<BoundedWindow> value3 = - underTest.state(NAMESPACE_3, WATERMARK_LATEST_ADDR); - - value1.add(new Instant(3000)); - value2.add(new Instant(5000)); - value1.add(new Instant(4000)); - value2.add(new Instant(2000)); - - // Merging clears the old values and updates the result value. - StateMerging.mergeWatermarks(Arrays.asList(value1, value2), value3, WINDOW_1); - - // Merging clears the old values and updates the result value. - assertThat(value3.read(), Matchers.equalTo(new Instant(5000))); - assertThat(value1.read(), Matchers.equalTo(null)); - assertThat(value2.read(), Matchers.equalTo(null)); - } - - @Test - public void testSerialization() throws Exception { - ApexStateInternals<String> original = new ApexStateInternals<String>(null); - ValueState<String> value = original.state(NAMESPACE_1, STRING_VALUE_ADDR); - assertEquals(original.state(NAMESPACE_1, STRING_VALUE_ADDR), value); - value.write("hello"); - - ApexStateInternals<String> cloned; - assertNotNull("Serialization", cloned = KryoCloneUtils.cloneObject(original)); - ValueState<String> clonedValue = cloned.state(NAMESPACE_1, STRING_VALUE_ADDR); - assertThat(clonedValue.read(), Matchers.equalTo("hello")); - assertEquals(cloned.state(NAMESPACE_1, STRING_VALUE_ADDR), value); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/utils/CollectionSource.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/utils/CollectionSource.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/utils/CollectionSource.java deleted file mode 100644 index c368bb2..0000000 --- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/utils/CollectionSource.java +++ /dev/null @@ -1,136 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.runners.apex.translators.utils; - -import java.io.IOException; -import java.io.Serializable; -import java.util.Collection; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.NoSuchElementException; - -import javax.annotation.Nullable; - -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.io.UnboundedSource; -import org.apache.beam.sdk.options.PipelineOptions; -import org.joda.time.Instant; - -/** - * collection as {@link UnboundedSource}, used for tests. - */ -public class CollectionSource<T> extends UnboundedSource<T, UnboundedSource.CheckpointMark> { - private static final long serialVersionUID = 1L; - private final Collection<T> collection; - private final Coder<T> coder; - - public CollectionSource(Collection<T> collection, Coder<T> coder) { - this.collection = collection; - this.coder = coder; - } - - @Override - public List<? extends UnboundedSource<T, CheckpointMark>> generateInitialSplits( - int desiredNumSplits, PipelineOptions options) throws Exception { - return Collections.singletonList(this); - } - - @Override - public UnboundedReader<T> createReader(PipelineOptions options, - @Nullable UnboundedSource.CheckpointMark checkpointMark) { - return new CollectionReader<>(collection, this); - } - - @Nullable - @Override - public Coder<CheckpointMark> getCheckpointMarkCoder() { - return null; - } - - @Override - public void validate() { - } - - @Override - public Coder<T> getDefaultOutputCoder() { - return coder; - } - - private static class CollectionReader<T> extends UnboundedSource.UnboundedReader<T> - implements Serializable { - - private T current; - private final CollectionSource<T> source; - private final Collection<T> collection; - private Iterator<T> iterator; - - public CollectionReader(Collection<T> collection, CollectionSource<T> source) { - this.collection = collection; - this.source = source; - } - - @Override - public boolean start() throws IOException { - if (null == iterator) { - iterator = collection.iterator(); - } - return advance(); - } - - @Override - public boolean advance() throws IOException { - if (iterator.hasNext()) { - current = iterator.next(); - return true; - } else { - return false; - } - } - - @Override - public Instant getWatermark() { - return Instant.now(); - } - - @Override - public UnboundedSource.CheckpointMark getCheckpointMark() { - return null; - } - - @Override - public UnboundedSource<T, ?> getCurrentSource() { - return source; - } - - @Override - public T getCurrent() throws NoSuchElementException { - return current; - } - - @Override - public Instant getCurrentTimestamp() throws NoSuchElementException { - return Instant.now(); - } - - @Override - public void close() throws IOException { - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/utils/PipelineOptionsTest.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/utils/PipelineOptionsTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/utils/PipelineOptionsTest.java deleted file mode 100644 index e67efa9..0000000 --- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/utils/PipelineOptionsTest.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.apex.translators.utils; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; - -import com.datatorrent.common.util.FSStorageAgent; -import com.esotericsoftware.kryo.serializers.FieldSerializer.Bind; -import com.esotericsoftware.kryo.serializers.JavaSerializer; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; - -import org.apache.beam.runners.apex.ApexPipelineOptions; -import org.apache.beam.sdk.options.Default; -import org.apache.beam.sdk.options.Description; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.junit.BeforeClass; -import org.junit.Test; - -/** - * Tests the serialization of PipelineOptions. - */ -public class PipelineOptionsTest { - - /** - * Interface for testing. - */ - public interface MyOptions extends ApexPipelineOptions { - @Description("Bla bla bla") - @Default.String("Hello") - String getTestOption(); - void setTestOption(String value); - } - - private static class MyOptionsWrapper { - private MyOptionsWrapper() { - this(null); // required for Kryo - } - private MyOptionsWrapper(ApexPipelineOptions options) { - this.options = new SerializablePipelineOptions(options); - } - @Bind(JavaSerializer.class) - private final SerializablePipelineOptions options; - } - - private static MyOptions options; - - private static final String[] args = new String[]{"--testOption=nothing"}; - - @BeforeClass - public static void beforeTest() { - options = PipelineOptionsFactory.fromArgs(args).as(MyOptions.class); - } - - @Test - public void testSerialization() { - MyOptionsWrapper wrapper = new MyOptionsWrapper(PipelineOptionsTest.options); - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - FSStorageAgent.store(bos, wrapper); - - ByteArrayInputStream bis = new ByteArrayInputStream(bos.toByteArray()); - MyOptionsWrapper wrapperCopy = (MyOptionsWrapper) FSStorageAgent.retrieve(bis); - assertNotNull(wrapperCopy.options); - assertEquals("nothing", wrapperCopy.options.get().as(MyOptions.class).getTestOption()); - } - -}