http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/SerializablePipelineOptions.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/SerializablePipelineOptions.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/SerializablePipelineOptions.java deleted file mode 100644 index d32b869..0000000 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/SerializablePipelineOptions.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.apex.translators.utils; - -import com.fasterxml.jackson.databind.ObjectMapper; - -import java.io.Externalizable; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; - -import org.apache.beam.runners.apex.ApexPipelineOptions; -import org.apache.beam.sdk.options.PipelineOptions; - -/** - * A wrapper to enable serialization of {@link PipelineOptions}. 
- */ -public class SerializablePipelineOptions implements Externalizable { - - private transient ApexPipelineOptions pipelineOptions; - - public SerializablePipelineOptions(ApexPipelineOptions pipelineOptions) { - this.pipelineOptions = pipelineOptions; - } - - public SerializablePipelineOptions() { - } - - public ApexPipelineOptions get() { - return this.pipelineOptions; - } - - @Override - public void writeExternal(ObjectOutput out) throws IOException { - out.writeUTF(new ObjectMapper().writeValueAsString(pipelineOptions)); - } - - @Override - public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - String s = in.readUTF(); - this.pipelineOptions = new ObjectMapper().readValue(s, PipelineOptions.class) - .as(ApexPipelineOptions.class); - } - -}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ValueAndCoderKryoSerializable.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ValueAndCoderKryoSerializable.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ValueAndCoderKryoSerializable.java deleted file mode 100644 index c06c500..0000000 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ValueAndCoderKryoSerializable.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.runners.apex.translators.utils; - -import com.esotericsoftware.kryo.Kryo; -import com.esotericsoftware.kryo.KryoSerializable; -import com.esotericsoftware.kryo.io.Input; -import com.esotericsoftware.kryo.io.Output; -import com.esotericsoftware.kryo.serializers.JavaSerializer; - -import java.io.IOException; - -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.Coder.Context; - -
/**
 * Pairs a value with the {@link Coder} used to encode it so that both can be written and read
 * through the {@link KryoSerializable} hooks.
 *
 * <p>The written form is: the coder's class, then the coder itself (handled by Kryo's
 * {@link JavaSerializer}), then the value as encoded by that coder with {@code Context.OUTER}.
 *
 * @param <T> type of the wrapped value
 */
public class ValueAndCoderKryoSerializable<T> implements KryoSerializable {
  private static final JavaSerializer JAVA_SERIALIZER = new JavaSerializer();
  private T value;
  private Coder<T> coder;

  /**
   * Creates a holder for {@code value} that will be encoded with {@code coder}.
   *
   * @param value the value to carry
   * @param coder the coder used to encode and decode {@code value}
   */
  public ValueAndCoderKryoSerializable(T value, Coder<T> coder) {
    this.coder = coder;
    this.value = value;
  }

  @SuppressWarnings("unused") // for Kryo
  private ValueAndCoderKryoSerializable() {
  }

  /**
   * Returns the wrapped value.
   *
   * @return the value given at construction or restored by {@link #read}
   */
  public T get() {
    return this.value;
  }

  /**
   * Writes the coder class, the coder and the coder-encoded value, in that order.
   *
   * @throws RuntimeException wrapping any {@link IOException} raised while encoding the value
   */
  @Override
  public void write(Kryo kryo, Output output) {
    final Coder<T> valueCoder = this.coder;
    kryo.writeClass(output, valueCoder.getClass());
    kryo.writeObject(output, valueCoder, JAVA_SERIALIZER);
    // Only the coder call can raise the checked exception.
    try {
      valueCoder.encode(this.value, output, Context.OUTER);
    } catch (IOException failure) {
      throw new RuntimeException(failure);
    }
  }

  /**
   * Reads back what {@link #write} produced: coder class, coder, then the value.
   *
   * @throws RuntimeException wrapping any {@link IOException} raised while decoding the value
   */
  @Override
  public void read(Kryo kryo, Input input) {
    @SuppressWarnings("unchecked")
    final Class<Coder<T>> coderClass = kryo.readClass(input).getType();
    this.coder = kryo.readObject(input, coderClass, JAVA_SERIALIZER);
    try {
      this.value = this.coder.decode(input, Context.OUTER);
    } catch (IOException failure) {
      throw new RuntimeException(failure);
    }
  }

}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/package-info.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/package-info.java
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/package-info.java deleted file mode 100644 index 4aeba35..0000000 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Implementation of the Beam runner for Apache Apex. - */ -package org.apache.beam.runners.apex.translators.utils; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ApexGroupByKeyOperatorTest.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ApexGroupByKeyOperatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ApexGroupByKeyOperatorTest.java new file mode 100644 index 0000000..c0ddb83 --- /dev/null +++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ApexGroupByKeyOperatorTest.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.apex.translation; + +import com.datatorrent.api.Sink; +import com.google.common.collect.Lists; + +import java.util.List; + +import org.apache.beam.runners.apex.ApexPipelineOptions; +import org.apache.beam.runners.apex.TestApexRunner; +import org.apache.beam.runners.apex.translation.operators.ApexGroupByKeyOperator; +import org.apache.beam.runners.apex.translation.utils.ApexStateInternals; +import org.apache.beam.runners.apex.translation.utils.ApexStreamTuple; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.WindowingStrategy; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollection.IsBounded; +import org.joda.time.Duration; +import 
org.joda.time.Instant; +import org.junit.Assert; +import org.junit.Test; +
/**
 * Test for {@link ApexGroupByKeyOperator}.
 */
public class ApexGroupByKeyOperatorTest {

  /**
   * Sends two identical {@code ("foo", 1)} elements, both stamped with the minimum timestamp and
   * assigned to one window starting there, into the operator. No output is expected until a
   * watermark carrying the maximum timestamp is processed; afterwards exactly two tuples must
   * have reached the sink: the grouped values, then the watermark.
   */
  @Test
  public void testGlobalWindowMinTimestamp() throws Exception {
    ApexPipelineOptions options = PipelineOptionsFactory.create().as(ApexPipelineOptions.class);
    options.setRunner(TestApexRunner.class);
    Pipeline pipeline = Pipeline.create(options);

    WindowingStrategy<?, ?> tenSecondWindows =
        WindowingStrategy.of(FixedWindows.of(Duration.standardSeconds(10)));
    PCollection<KV<String, Integer>> input =
        PCollection.createPrimitiveOutputInternal(pipeline, tenSecondWindows, IsBounded.BOUNDED);
    input.setCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()));

    ApexGroupByKeyOperator<String, Integer> operator = new ApexGroupByKeyOperator<>(
        options, input, new ApexStateInternals.ApexStateInternalsFactory<String>());

    // Everything the operator emits is recorded here, in emission order.
    final List<Object> emitted = Lists.newArrayList();
    Sink<Object> recorder = new Sink<Object>() {
      @Override
      public void put(Object tuple) {
        emitted.add(tuple);
      }

      @Override
      public int getCount(boolean reset) {
        return 0;
      }
    };
    operator.output.setSink(recorder);
    operator.setup(null);
    operator.beginWindow(1);

    Instant windowStart = BoundedWindow.TIMESTAMP_MIN_VALUE;
    BoundedWindow window = new IntervalWindow(windowStart, windowStart.plus(10000));
    PaneInfo paneInfo = PaneInfo.NO_FIRING;

    // Two equal elements for the same key, timestamp and window.
    for (int i = 0; i < 2; i++) {
      WindowedValue<KV<String, Integer>> element =
          WindowedValue.of(KV.of("foo", 1), windowStart, window, paneInfo);
      operator.input.process(ApexStreamTuple.DataTuple.of(element));
    }

    ApexStreamTuple<WindowedValue<KV<String, Integer>>> watermark =
        ApexStreamTuple.WatermarkTuple.of(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis());

    Assert.assertEquals("number outputs", 0, emitted.size());
    operator.input.process(watermark);
    Assert.assertEquals("number outputs", 2, emitted.size());

    @SuppressWarnings({ "unchecked", "rawtypes" })
    ApexStreamTuple.DataTuple<WindowedValue<KV<String, Iterable<Integer>>>> grouped =
        (ApexStreamTuple.DataTuple) emitted.get(0);
    List<Integer> expectedValues = Lists.newArrayList(1, 1);
    Assert.assertEquals("iterable", KV.of("foo", expectedValues),
        grouped.getValue().getValue());
    Assert.assertEquals("expected watermark", watermark, emitted.get(1));
  }

}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslatorTest.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslatorTest.java new file mode 100644 index 0000000..6b62a58 --- /dev/null +++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslatorTest.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.beam.runners.apex.translation; + +import com.google.common.collect.Sets; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Set; + +import org.apache.beam.runners.apex.ApexPipelineOptions; +import org.apache.beam.runners.apex.ApexRunner; +import org.apache.beam.runners.apex.ApexRunnerResult; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.Flatten; +import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionList; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Integration test for {@link FlattenPCollectionTranslator}. + */ +public class FlattenPCollectionTranslatorTest { + private static final Logger LOG = LoggerFactory.getLogger(FlattenPCollectionTranslatorTest.class); + + @Test + public void test() throws Exception { + ApexPipelineOptions options = PipelineOptionsFactory.as(ApexPipelineOptions.class); + options.setApplicationName("FlattenPCollection"); + options.setRunner(ApexRunner.class); + Pipeline p = Pipeline.create(options); + + String[][] collections = { + {"1"}, {"2"}, {"3"}, {"4"}, {"5"} + }; + + Set<String> expected = Sets.newHashSet(); + List<PCollection<String>> pcList = new ArrayList<PCollection<String>>(); + for (String[] collection : collections) { + pcList.add(p.apply(Create.of(collection).withCoder(StringUtf8Coder.of()))); + expected.addAll(Arrays.asList(collection)); + } + + PCollection<String> actual = PCollectionList.of(pcList).apply(Flatten.<String>pCollections()); + actual.apply(ParDo.of(new EmbeddedCollector())); + + ApexRunnerResult result = (ApexRunnerResult) p.run(); + // TODO: verify 
translation + result.getApexDAG(); + long timeout = System.currentTimeMillis() + 30000; + while (System.currentTimeMillis() < timeout + && EmbeddedCollector.RESULTS.size() < expected.size()) { + LOG.info("Waiting for expected results."); + Thread.sleep(500); + } + + Assert.assertEquals("number results", expected.size(), EmbeddedCollector.RESULTS.size()); + Assert.assertEquals(expected, Sets.newHashSet(EmbeddedCollector.RESULTS)); + } + + @SuppressWarnings("serial") + private static class EmbeddedCollector extends OldDoFn<Object, Void> { + protected static final ArrayList<Object> RESULTS = new ArrayList<>(); + + public EmbeddedCollector() { + } + + @Override + public void processElement(ProcessContext c) throws Exception { + RESULTS.add(c.element()); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslatorTest.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslatorTest.java new file mode 100644 index 0000000..d627cd9 --- /dev/null +++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslatorTest.java @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.apex.translation; + +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; + +import java.io.IOException; +import java.io.Serializable; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; + +import javax.annotation.Nullable; + +import org.apache.beam.runners.apex.ApexPipelineOptions; +import org.apache.beam.runners.apex.ApexRunner; +import org.apache.beam.runners.apex.ApexRunnerResult; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.io.Read; +import org.apache.beam.sdk.io.UnboundedSource; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.transforms.Count; +import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.values.KV; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.Assert; +import org.junit.Test; + +/** + * Integration test for {@link GroupByKeyTranslator}. 
+ */
/**
 * Runs a windowed {@code Count.perElement()} over a small test source with {@link ApexRunner}
 * and checks the per-window counts.
 *
 * <p>The test polls for results after {@code run()} returns, so the collector is filled while
 * the test thread is reading it; see {@link EmbeddedCollector#RESULTS}.
 */
public class GroupByKeyTranslatorTest {

  /** Longest time to wait for the pipeline to deliver all expected results. */
  private static final long TIMEOUT_MILLIS = 30000;
  /** Pause between two checks of the collected results. */
  private static final long SLEEP_MILLIS = 1000;

  /**
   * Counts "foo"/"bar" occurrences in one-second fixed windows and expects one
   * (timestamp, (key, count)) pair per key and window.
   */
  @SuppressWarnings({"unchecked"})
  @Test
  public void test() throws Exception {
    // The collector is static: drop anything left over from an earlier run in this JVM.
    EmbeddedCollector.RESULTS.clear();

    ApexPipelineOptions options =
        PipelineOptionsFactory.as(ApexPipelineOptions.class);
    options.setApplicationName("GroupByKey");
    options.setRunner(ApexRunner.class);
    Pipeline p = Pipeline.create(options);

    List<KV<String, Instant>> data =
        Lists.newArrayList(
            KV.of("foo", new Instant(1000)), KV.of("foo", new Instant(1000)),
            KV.of("foo", new Instant(2000)),
            KV.of("bar", new Instant(1000)), KV.of("bar", new Instant(2000)),
            KV.of("bar", new Instant(2000))
        );

    // expected results assume outputAtLatestInputTimestamp
    List<KV<Instant, KV<String, Long>>> expected =
        Lists.newArrayList(
            KV.of(new Instant(1000), KV.of("foo", 2L)),
            KV.of(new Instant(1000), KV.of("bar", 1L)),
            KV.of(new Instant(2000), KV.of("foo", 1L)),
            KV.of(new Instant(2000), KV.of("bar", 2L))
        );

    p.apply(Read.from(new TestSource(data, new Instant(5000))))
        .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(1)))
            .withOutputTimeFn(OutputTimeFns.outputAtLatestInputTimestamp()))
        .apply(Count.<String>perElement())
        .apply(ParDo.of(new KeyedByTimestamp<KV<String, Long>>()))
        .apply(ParDo.of(new EmbeddedCollector()))
        ;

    ApexRunnerResult result = (ApexRunnerResult) p.run();
    result.getApexDAG();

    long timeout = System.currentTimeMillis() + TIMEOUT_MILLIS;
    while (System.currentTimeMillis() < timeout) {
      if (EmbeddedCollector.RESULTS.containsAll(expected)) {
        break;
      }
      Thread.sleep(SLEEP_MILLIS);
    }

    // Compare against a snapshot: iterating a synchronized set needs the set's own lock held.
    HashSet<Object> collected;
    synchronized (EmbeddedCollector.RESULTS) {
      collected = new HashSet<Object>(EmbeddedCollector.RESULTS);
    }
    Assert.assertEquals(Sets.newHashSet(expected), collected);

  }

  /** Terminal {@code DoFn} that records every element it is given in a static set. */
  @SuppressWarnings("serial")
  private static class EmbeddedCollector extends OldDoFn<Object, Void> {
    /**
     * Elements seen so far. Written from {@link #processElement} and read by the polling test
     * thread, hence the synchronized wrapper.
     */
    protected static final java.util.Set<Object> RESULTS =
        Collections.synchronizedSet(new HashSet<Object>());

    public EmbeddedCollector() {
    }

    @Override
    public void processElement(ProcessContext c) throws Exception {
      RESULTS.add(c.element());
    }
  }

  /** Pairs every element with the timestamp it was processed with. */
  private static class KeyedByTimestamp<T> extends OldDoFn<T, KV<Instant, T>> {

    @Override
    public void processElement(ProcessContext c) throws Exception {
      c.output(KV.of(c.timestamp(), c.element()));
    }
  }

  /**
   * Unbounded source over a fixed list of (record, timestamp) pairs. It never splits and does
   * not support checkpointing: both checkpoint-related methods return {@code null}.
   */
  private static class TestSource extends UnboundedSource<String, UnboundedSource.CheckpointMark> {

    private final List<KV<String, Instant>> data;
    private final Instant watermark;

    /**
     * @param data records with their timestamps, in emission order
     * @param watermark watermark to report once all records have been handed out
     */
    public TestSource(List<KV<String, Instant>> data, Instant watermark) {
      this.data = data;
      this.watermark = watermark;
    }

    @Override
    public List<? extends UnboundedSource<String, CheckpointMark>> generateInitialSplits(
        int desiredNumSplits, PipelineOptions options) throws Exception {
      return Collections.<UnboundedSource<String, CheckpointMark>>singletonList(this);
    }

    @Override
    public UnboundedReader<String> createReader(PipelineOptions options,
        @Nullable CheckpointMark checkpointMark) {
      return new TestReader(data, watermark, this);
    }

    @Nullable
    @Override
    public Coder<CheckpointMark> getCheckpointMarkCoder() {
      return null;
    }

    @Override
    public void validate() {
    }

    @Override
    public Coder<String> getDefaultOutputCoder() {
      return StringUtf8Coder.of();
    }

    /** Reader that walks the source's list once and then reports the final watermark. */
    private static class TestReader extends UnboundedReader<String> implements Serializable {

      private static final long serialVersionUID = 7526472295622776147L;

      private final List<KV<String, Instant>> data;
      private final TestSource source;

      // Set by start(); null until then.
      private Iterator<KV<String, Instant>> iterator;
      private String currentRecord;
      private Instant currentTimestamp;
      private Instant watermark;
      // True once the current record has been fetched through getCurrent().
      private boolean collected;

      public TestReader(List<KV<String, Instant>> data, Instant watermark, TestSource source) {
        this.data = data;
        this.source = source;
        this.watermark = watermark;
      }

      @Override
      public boolean start() throws IOException {
        iterator = data.iterator();
        return advance();
      }

      @Override
      public boolean advance() throws IOException {
        if (iterator.hasNext()) {
          KV<String, Instant> kv = iterator.next();
          collected = false;
          currentRecord = kv.getKey();
          currentTimestamp = kv.getValue();
          return true;
        } else {
          return false;
        }
      }

      @Override
      public byte[] getCurrentRecordId() throws NoSuchElementException {
        return new byte[0];
      }

      @Override
      public String getCurrent() throws NoSuchElementException {
        collected = true;
        return this.currentRecord;
      }

      @Override
      public Instant getCurrentTimestamp() throws NoSuchElementException {
        return currentTimestamp;
      }

      @Override
      public void close() throws IOException {
      }

      /**
       * Reports the configured watermark only after the last record has been both reached and
       * fetched; until then, and before {@link #start()} has run, reports {@code Instant(0)}.
       */
      @Override
      public Instant getWatermark() {
        if (iterator != null && !iterator.hasNext() && collected) {
          return watermark;
        } else {
          return new Instant(0);
        }
      }

      @Override
      public CheckpointMark getCheckpointMark() {
        return null;
      }

      @Override
      public UnboundedSource<String, ?> getCurrentSource() {
        return this.source;
      }
    }
  }

}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslatorTest.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslatorTest.java new file mode 100644 index 0000000..2e86152 --- /dev/null +++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslatorTest.java @@ -0,0 +1,340 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.apex.translation; + +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import com.datatorrent.api.DAG; +import com.datatorrent.api.Sink; +import com.datatorrent.lib.util.KryoCloneUtils; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.regex.Pattern; + +import org.apache.beam.runners.apex.ApexPipelineOptions; +import org.apache.beam.runners.apex.ApexRunner; +import org.apache.beam.runners.apex.ApexRunnerResult; +import org.apache.beam.runners.apex.TestApexRunner; +import org.apache.beam.runners.apex.translation.operators.ApexParDoOperator; +import org.apache.beam.runners.apex.translation.operators.ApexReadUnboundedInputOperator; +import org.apache.beam.runners.apex.translation.utils.ApexStateInternals; +import org.apache.beam.runners.apex.translation.utils.ApexStreamTuple; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.SerializableCoder; +import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.Sum; +import org.apache.beam.sdk.transforms.View; +import 
org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.WindowingStrategy; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * integration test for {@link ParDoBoundTranslator}. + */ +@RunWith(JUnit4.class) +public class ParDoBoundTranslatorTest { + private static final Logger LOG = LoggerFactory.getLogger(ParDoBoundTranslatorTest.class); + private static final long SLEEP_MILLIS = 500; + private static final long TIMEOUT_MILLIS = 30000; + + @Test + public void test() throws Exception { + ApexPipelineOptions options = PipelineOptionsFactory.create() + .as(ApexPipelineOptions.class); + options.setApplicationName("ParDoBound"); + options.setRunner(ApexRunner.class); + + Pipeline p = Pipeline.create(options); + + List<Integer> collection = Lists.newArrayList(1, 2, 3, 4, 5); + List<Integer> expected = Lists.newArrayList(6, 7, 8, 9, 10); + p.apply(Create.of(collection).withCoder(SerializableCoder.of(Integer.class))) + .apply(ParDo.of(new Add(5))) + .apply(ParDo.of(new EmbeddedCollector())); + + ApexRunnerResult result = (ApexRunnerResult) p.run(); + DAG dag = result.getApexDAG(); + + DAG.OperatorMeta om = dag.getOperatorMeta("Create.Values"); + Assert.assertNotNull(om); + Assert.assertEquals(om.getOperator().getClass(), ApexReadUnboundedInputOperator.class); + + om = dag.getOperatorMeta("ParDo(Add)"); + Assert.assertNotNull(om); + Assert.assertEquals(om.getOperator().getClass(), ApexParDoOperator.class); + + long timeout = System.currentTimeMillis() + TIMEOUT_MILLIS; + while (System.currentTimeMillis() < timeout) { + if (EmbeddedCollector.RESULTS.containsAll(expected)) { + 
break; + } + LOG.info("Waiting for expected results."); + Thread.sleep(SLEEP_MILLIS); + } + Assert.assertEquals(Sets.newHashSet(expected), EmbeddedCollector.RESULTS); + } + + @SuppressWarnings("serial") + private static class Add extends OldDoFn<Integer, Integer> { + private Integer number; + private PCollectionView<Integer> sideInputView; + + private Add(Integer number) { + this.number = number; + } + + private Add(PCollectionView<Integer> sideInputView) { + this.sideInputView = sideInputView; + } + + @Override + public void processElement(ProcessContext c) throws Exception { + if (sideInputView != null) { + number = c.sideInput(sideInputView); + } + c.output(c.element() + number); + } + } + + private static class EmbeddedCollector extends OldDoFn<Object, Void> { + private static final long serialVersionUID = 1L; + protected static final HashSet<Object> RESULTS = new HashSet<>(); + + public EmbeddedCollector() { + RESULTS.clear(); + } + + @Override + public void processElement(ProcessContext c) throws Exception { + RESULTS.add(c.element()); + } + } + + private static Throwable runExpectingAssertionFailure(Pipeline pipeline) { + // We cannot use thrown.expect(AssertionError.class) because the AssertionError + // is first caught by JUnit and causes a test failure. 
+ try { + pipeline.run(); + } catch (AssertionError exc) { + return exc; + } + fail("assertion should have failed"); + throw new RuntimeException("unreachable"); + } + + @Test + public void testAssertionFailure() throws Exception { + ApexPipelineOptions options = PipelineOptionsFactory.create() + .as(ApexPipelineOptions.class); + options.setRunner(TestApexRunner.class); + Pipeline pipeline = Pipeline.create(options); + + PCollection<Integer> pcollection = pipeline + .apply(Create.of(1, 2, 3, 4)); + PAssert.that(pcollection).containsInAnyOrder(2, 1, 4, 3, 7); + + Throwable exc = runExpectingAssertionFailure(pipeline); + Pattern expectedPattern = Pattern.compile( + "Expected: iterable over \\[((<4>|<7>|<3>|<2>|<1>)(, )?){5}\\] in any order"); + // A loose pattern, but should get the job done. + assertTrue( + "Expected error message from PAssert with substring matching " + + expectedPattern + + " but the message was \"" + + exc.getMessage() + + "\"", + expectedPattern.matcher(exc.getMessage()).find()); + } + + @Test + public void testContainsInAnyOrder() throws Exception { + ApexPipelineOptions options = PipelineOptionsFactory.create().as(ApexPipelineOptions.class); + options.setRunner(TestApexRunner.class); + Pipeline pipeline = Pipeline.create(options); + PCollection<Integer> pcollection = pipeline.apply(Create.of(1, 2, 3, 4)); + PAssert.that(pcollection).containsInAnyOrder(2, 1, 4, 3); + // TODO: terminate faster based on processed assertion vs. 
auto-shutdown + pipeline.run(); + } + + @Test + public void testSerialization() throws Exception { + ApexPipelineOptions options = PipelineOptionsFactory.create() + .as(ApexPipelineOptions.class); + options.setRunner(TestApexRunner.class); + Pipeline pipeline = Pipeline.create(options); + Coder<WindowedValue<Integer>> coder = WindowedValue.getValueOnlyCoder(VarIntCoder.of()); + + PCollectionView<Integer> singletonView = pipeline.apply(Create.of(1)) + .apply(Sum.integersGlobally().asSingletonView()); + + ApexParDoOperator<Integer, Integer> operator = new ApexParDoOperator<>(options, + new Add(singletonView), new TupleTag<Integer>(), TupleTagList.empty().getAll(), + WindowingStrategy.globalDefault(), + Collections.<PCollectionView<?>>singletonList(singletonView), + coder, + new ApexStateInternals.ApexStateInternalsFactory<Void>() + ); + operator.setup(null); + operator.beginWindow(0); + WindowedValue<Integer> wv1 = WindowedValue.valueInGlobalWindow(1); + WindowedValue<Iterable<?>> sideInput = WindowedValue.<Iterable<?>>valueInGlobalWindow( + Lists.<Integer>newArrayList(22)); + operator.input.process(ApexStreamTuple.DataTuple.of(wv1)); // pushed back input + + final List<Object> results = Lists.newArrayList(); + Sink<Object> sink = new Sink<Object>() { + @Override + public void put(Object tuple) { + results.add(tuple); + } + @Override + public int getCount(boolean reset) { + return 0; + } + }; + + // verify pushed back input checkpointing + Assert.assertNotNull("Serialization", operator = KryoCloneUtils.cloneObject(operator)); + operator.output.setSink(sink); + operator.setup(null); + operator.beginWindow(1); + WindowedValue<Integer> wv2 = WindowedValue.valueInGlobalWindow(2); + operator.sideInput1.process(ApexStreamTuple.DataTuple.of(sideInput)); + Assert.assertEquals("number outputs", 1, results.size()); + Assert.assertEquals("result", WindowedValue.valueInGlobalWindow(23), + ((ApexStreamTuple.DataTuple) results.get(0)).getValue()); + + // verify side input 
checkpointing + results.clear(); + Assert.assertNotNull("Serialization", operator = KryoCloneUtils.cloneObject(operator)); + operator.output.setSink(sink); + operator.setup(null); + operator.beginWindow(2); + operator.input.process(ApexStreamTuple.DataTuple.of(wv2)); + Assert.assertEquals("number outputs", 1, results.size()); + Assert.assertEquals("result", WindowedValue.valueInGlobalWindow(24), + ((ApexStreamTuple.DataTuple) results.get(0)).getValue()); + } + + @Test + public void testMultiOutputParDoWithSideInputs() throws Exception { + ApexPipelineOptions options = PipelineOptionsFactory.create().as(ApexPipelineOptions.class); + options.setRunner(ApexRunner.class); // non-blocking run + Pipeline pipeline = Pipeline.create(options); + + List<Integer> inputs = Arrays.asList(3, -42, 666); + final TupleTag<String> mainOutputTag = new TupleTag<>("main"); + final TupleTag<Void> sideOutputTag = new TupleTag<>("sideOutput"); + + PCollectionView<Integer> sideInput1 = pipeline + .apply("CreateSideInput1", Create.of(11)) + .apply("ViewSideInput1", View.<Integer>asSingleton()); + PCollectionView<Integer> sideInputUnread = pipeline + .apply("CreateSideInputUnread", Create.of(-3333)) + .apply("ViewSideInputUnread", View.<Integer>asSingleton()); + PCollectionView<Integer> sideInput2 = pipeline + .apply("CreateSideInput2", Create.of(222)) + .apply("ViewSideInput2", View.<Integer>asSingleton()); + + PCollectionTuple outputs = pipeline + .apply(Create.of(inputs)) + .apply(ParDo.withSideInputs(sideInput1) + .withSideInputs(sideInputUnread) + .withSideInputs(sideInput2) + .withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag)) + .of(new TestMultiOutputWithSideInputsFn( + Arrays.asList(sideInput1, sideInput2), + Arrays.<TupleTag<String>>asList()))); + + outputs.get(mainOutputTag).apply(ParDo.of(new EmbeddedCollector())); + ApexRunnerResult result = (ApexRunnerResult) pipeline.run(); + + HashSet<String> expected = Sets.newHashSet("processing: 3: [11, 222]", + "processing: -42: 
[11, 222]", "processing: 666: [11, 222]"); + long timeout = System.currentTimeMillis() + TIMEOUT_MILLIS; + while (System.currentTimeMillis() < timeout) { + if (EmbeddedCollector.RESULTS.containsAll(expected)) { + break; + } + LOG.info("Waiting for expected results."); + Thread.sleep(SLEEP_MILLIS); + } + result.cancel(); + Assert.assertEquals(Sets.newHashSet(expected), EmbeddedCollector.RESULTS); + } + + private static class TestMultiOutputWithSideInputsFn extends OldDoFn<Integer, String> { + private static final long serialVersionUID = 1L; + + final List<PCollectionView<Integer>> sideInputViews = new ArrayList<>(); + final List<TupleTag<String>> sideOutputTupleTags = new ArrayList<>(); + + public TestMultiOutputWithSideInputsFn(List<PCollectionView<Integer>> sideInputViews, + List<TupleTag<String>> sideOutputTupleTags) { + this.sideInputViews.addAll(sideInputViews); + this.sideOutputTupleTags.addAll(sideOutputTupleTags); + } + + @Override + public void processElement(ProcessContext c) throws Exception { + outputToAllWithSideInputs(c, "processing: " + c.element()); + } + + private void outputToAllWithSideInputs(ProcessContext c, String value) { + if (!sideInputViews.isEmpty()) { + List<Integer> sideInputValues = new ArrayList<>(); + for (PCollectionView<Integer> sideInputView : sideInputViews) { + sideInputValues.add(c.sideInput(sideInputView)); + } + value += ": " + sideInputValues; + } + c.output(value); + for (TupleTag<String> sideOutputTupleTag : sideOutputTupleTags) { + c.sideOutput(sideOutputTupleTag, + sideOutputTupleTag.getId() + ": " + value); + } + } + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ReadUnboundTranslatorTest.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ReadUnboundTranslatorTest.java 
b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ReadUnboundTranslatorTest.java new file mode 100644 index 0000000..96ba663 --- /dev/null +++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ReadUnboundTranslatorTest.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.runners.apex.translation; + +import com.datatorrent.api.DAG; +import com.google.common.collect.ContiguousSet; +import com.google.common.collect.DiscreteDomain; +import com.google.common.collect.Lists; +import com.google.common.collect.Range; +import com.google.common.collect.Sets; + +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import org.apache.beam.runners.apex.ApexPipelineOptions; +import org.apache.beam.runners.apex.ApexRunner; +import org.apache.beam.runners.apex.ApexRunnerResult; +import org.apache.beam.runners.apex.translation.operators.ApexReadUnboundedInputOperator; +import org.apache.beam.runners.apex.translation.utils.CollectionSource; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.io.CountingSource; +import org.apache.beam.sdk.io.Read; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * integration test for {@link ReadUnboundedTranslator}. 
+ */ +public class ReadUnboundTranslatorTest { + private static final Logger LOG = LoggerFactory.getLogger(ReadUnboundTranslatorTest.class); + + @Test + public void test() throws Exception { + ApexPipelineOptions options = PipelineOptionsFactory.create() + .as(ApexPipelineOptions.class); + EmbeddedCollector.RESULTS.clear(); + options.setApplicationName("ReadUnbound"); + options.setRunner(ApexRunner.class); + Pipeline p = Pipeline.create(options); + + List<String> collection = Lists.newArrayList("1", "2", "3", "4", "5"); + CollectionSource<String> source = new CollectionSource<>(collection, StringUtf8Coder.of()); + p.apply(Read.from(source)) + .apply(ParDo.of(new EmbeddedCollector())); + + ApexRunnerResult result = (ApexRunnerResult) p.run(); + DAG dag = result.getApexDAG(); + DAG.OperatorMeta om = dag.getOperatorMeta("Read(CollectionSource)"); + Assert.assertNotNull(om); + Assert.assertEquals(om.getOperator().getClass(), ApexReadUnboundedInputOperator.class); + + long timeout = System.currentTimeMillis() + 30000; + while (System.currentTimeMillis() < timeout) { + if (EmbeddedCollector.RESULTS.containsAll(collection)) { + break; + } + LOG.info("Waiting for expected results."); + Thread.sleep(1000); + } + Assert.assertEquals(Sets.newHashSet(collection), EmbeddedCollector.RESULTS); + } + + @Test + public void testReadBounded() throws Exception { + ApexPipelineOptions options = PipelineOptionsFactory.create() + .as(ApexPipelineOptions.class); + EmbeddedCollector.RESULTS.clear(); + options.setApplicationName("ReadBounded"); + options.setRunner(ApexRunner.class); + Pipeline p = Pipeline.create(options); + + Set<Long> expected = ContiguousSet.create(Range.closedOpen(0L, 10L), DiscreteDomain.longs()); + p.apply(Read.from(CountingSource.upTo(10))) + .apply(ParDo.of(new EmbeddedCollector())); + + ApexRunnerResult result = (ApexRunnerResult) p.run(); + DAG dag = result.getApexDAG(); + DAG.OperatorMeta om = dag.getOperatorMeta("Read(BoundedCountingSource)"); + 
Assert.assertNotNull(om); + Assert.assertEquals(om.getOperator().getClass(), ApexReadUnboundedInputOperator.class); + + long timeout = System.currentTimeMillis() + 30000; + while (System.currentTimeMillis() < timeout) { + if (EmbeddedCollector.RESULTS.containsAll(expected)) { + break; + } + LOG.info("Waiting for expected results."); + Thread.sleep(1000); + } + Assert.assertEquals(Sets.newHashSet(expected), EmbeddedCollector.RESULTS); + } + + @SuppressWarnings("serial") + private static class EmbeddedCollector extends OldDoFn<Object, Void> { + protected static final HashSet<Object> RESULTS = new HashSet<>(); + + public EmbeddedCollector() { + } + + @Override + public void processElement(ProcessContext c) throws Exception { + RESULTS.add(c.element()); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java new file mode 100644 index 0000000..1801358 --- /dev/null +++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java @@ -0,0 +1,361 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.apex.translation.utils; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; + +import com.datatorrent.lib.util.KryoCloneUtils; + +import java.util.Arrays; + +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.transforms.Sum; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; +import org.apache.beam.sdk.util.state.AccumulatorCombiningState; +import org.apache.beam.sdk.util.state.BagState; +import org.apache.beam.sdk.util.state.CombiningState; +import org.apache.beam.sdk.util.state.ReadableState; +import org.apache.beam.sdk.util.state.StateMerging; +import org.apache.beam.sdk.util.state.StateNamespace; +import org.apache.beam.sdk.util.state.StateNamespaceForTest; +import org.apache.beam.sdk.util.state.StateTag; +import org.apache.beam.sdk.util.state.StateTags; +import org.apache.beam.sdk.util.state.ValueState; +import org.apache.beam.sdk.util.state.WatermarkHoldState; +import org.hamcrest.Matchers; +import org.joda.time.Instant; +import org.junit.Before; +import org.junit.Test; + +/** + * Tests for {@link ApexStateInternals}. This is based on the tests for + * {@code InMemoryStateInternals}. 
+ */ +public class ApexStateInternalsTest { + private static final BoundedWindow WINDOW_1 = new IntervalWindow(new Instant(0), new Instant(10)); + private static final StateNamespace NAMESPACE_1 = new StateNamespaceForTest("ns1"); + private static final StateNamespace NAMESPACE_2 = new StateNamespaceForTest("ns2"); + private static final StateNamespace NAMESPACE_3 = new StateNamespaceForTest("ns3"); + + private static final StateTag<Object, ValueState<String>> STRING_VALUE_ADDR = + StateTags.value("stringValue", StringUtf8Coder.of()); + private static final StateTag<Object, AccumulatorCombiningState<Integer, int[], Integer>> + SUM_INTEGER_ADDR = StateTags.combiningValueFromInputInternal( + "sumInteger", VarIntCoder.of(), new Sum.SumIntegerFn()); + private static final StateTag<Object, BagState<String>> STRING_BAG_ADDR = + StateTags.bag("stringBag", StringUtf8Coder.of()); + private static final StateTag<Object, WatermarkHoldState<BoundedWindow>> + WATERMARK_EARLIEST_ADDR = + StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtEarliestInputTimestamp()); + private static final StateTag<Object, WatermarkHoldState<BoundedWindow>> + WATERMARK_LATEST_ADDR = + StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtLatestInputTimestamp()); + private static final StateTag<Object, WatermarkHoldState<BoundedWindow>> WATERMARK_EOW_ADDR = + StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtEndOfWindow()); + + private ApexStateInternals<String> underTest; + + @Before + public void initStateInternals() { + underTest = new ApexStateInternals<>(null); + } + + @Test + public void testBag() throws Exception { + BagState<String> value = underTest.state(NAMESPACE_1, STRING_BAG_ADDR); + + assertEquals(value, underTest.state(NAMESPACE_1, STRING_BAG_ADDR)); + assertFalse(value.equals(underTest.state(NAMESPACE_2, STRING_BAG_ADDR))); + + assertThat(value.read(), Matchers.emptyIterable()); + value.add("hello"); + assertThat(value.read(), 
Matchers.containsInAnyOrder("hello")); + + value.add("world"); + assertThat(value.read(), Matchers.containsInAnyOrder("hello", "world")); + + value.clear(); + assertThat(value.read(), Matchers.emptyIterable()); + assertEquals(underTest.state(NAMESPACE_1, STRING_BAG_ADDR), value); + + } + + @Test + public void testBagIsEmpty() throws Exception { + BagState<String> value = underTest.state(NAMESPACE_1, STRING_BAG_ADDR); + + assertThat(value.isEmpty().read(), Matchers.is(true)); + ReadableState<Boolean> readFuture = value.isEmpty(); + value.add("hello"); + assertThat(readFuture.read(), Matchers.is(false)); + + value.clear(); + assertThat(readFuture.read(), Matchers.is(true)); + } + + @Test + public void testMergeBagIntoSource() throws Exception { + BagState<String> bag1 = underTest.state(NAMESPACE_1, STRING_BAG_ADDR); + BagState<String> bag2 = underTest.state(NAMESPACE_2, STRING_BAG_ADDR); + + bag1.add("Hello"); + bag2.add("World"); + bag1.add("!"); + + StateMerging.mergeBags(Arrays.asList(bag1, bag2), bag1); + + // Reading the merged bag gets both the contents + assertThat(bag1.read(), Matchers.containsInAnyOrder("Hello", "World", "!")); + assertThat(bag2.read(), Matchers.emptyIterable()); + } + + @Test + public void testMergeBagIntoNewNamespace() throws Exception { + BagState<String> bag1 = underTest.state(NAMESPACE_1, STRING_BAG_ADDR); + BagState<String> bag2 = underTest.state(NAMESPACE_2, STRING_BAG_ADDR); + BagState<String> bag3 = underTest.state(NAMESPACE_3, STRING_BAG_ADDR); + + bag1.add("Hello"); + bag2.add("World"); + bag1.add("!"); + + StateMerging.mergeBags(Arrays.asList(bag1, bag2, bag3), bag3); + + // Reading the merged bag gets both the contents + assertThat(bag3.read(), Matchers.containsInAnyOrder("Hello", "World", "!")); + assertThat(bag1.read(), Matchers.emptyIterable()); + assertThat(bag2.read(), Matchers.emptyIterable()); + } + + @Test + public void testCombiningValue() throws Exception { + CombiningState<Integer, Integer> value = 
underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR); + + // State instances are cached, but depend on the namespace. + assertEquals(value, underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR)); + assertFalse(value.equals(underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR))); + + assertThat(value.read(), Matchers.equalTo(0)); + value.add(2); + assertThat(value.read(), Matchers.equalTo(2)); + + value.add(3); + assertThat(value.read(), Matchers.equalTo(5)); + + value.clear(); + assertThat(value.read(), Matchers.equalTo(0)); + assertEquals(underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR), value); + } + + @Test + public void testCombiningIsEmpty() throws Exception { + CombiningState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR); + + assertThat(value.isEmpty().read(), Matchers.is(true)); + ReadableState<Boolean> readFuture = value.isEmpty(); + value.add(5); + assertThat(readFuture.read(), Matchers.is(false)); + + value.clear(); + assertThat(readFuture.read(), Matchers.is(true)); + } + + @Test + public void testMergeCombiningValueIntoSource() throws Exception { + AccumulatorCombiningState<Integer, int[], Integer> value1 = + underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR); + AccumulatorCombiningState<Integer, int[], Integer> value2 = + underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR); + + value1.add(5); + value2.add(10); + value1.add(6); + + assertThat(value1.read(), Matchers.equalTo(11)); + assertThat(value2.read(), Matchers.equalTo(10)); + + // Merging clears the old values and updates the result value. 
+ StateMerging.mergeCombiningValues(Arrays.asList(value1, value2), value1); + + assertThat(value1.read(), Matchers.equalTo(21)); + assertThat(value2.read(), Matchers.equalTo(0)); + } + + @Test + public void testMergeCombiningValueIntoNewNamespace() throws Exception { + AccumulatorCombiningState<Integer, int[], Integer> value1 = + underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR); + AccumulatorCombiningState<Integer, int[], Integer> value2 = + underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR); + AccumulatorCombiningState<Integer, int[], Integer> value3 = + underTest.state(NAMESPACE_3, SUM_INTEGER_ADDR); + + value1.add(5); + value2.add(10); + value1.add(6); + + StateMerging.mergeCombiningValues(Arrays.asList(value1, value2), value3); + + // Merging clears the old values and updates the result value. + assertThat(value1.read(), Matchers.equalTo(0)); + assertThat(value2.read(), Matchers.equalTo(0)); + assertThat(value3.read(), Matchers.equalTo(21)); + } + + @Test + public void testWatermarkEarliestState() throws Exception { + WatermarkHoldState<BoundedWindow> value = + underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR); + + // State instances are cached, but depend on the namespace. 
+ assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR)); + assertFalse(value.equals(underTest.state(NAMESPACE_2, WATERMARK_EARLIEST_ADDR))); + + assertThat(value.read(), Matchers.nullValue()); + value.add(new Instant(2000)); + assertThat(value.read(), Matchers.equalTo(new Instant(2000))); + + value.add(new Instant(3000)); + assertThat(value.read(), Matchers.equalTo(new Instant(2000))); + + value.add(new Instant(1000)); + assertThat(value.read(), Matchers.equalTo(new Instant(1000))); + + value.clear(); + assertThat(value.read(), Matchers.equalTo(null)); + assertEquals(underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR), value); + } + + @Test + public void testWatermarkLatestState() throws Exception { + WatermarkHoldState<BoundedWindow> value = + underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR); + + // State instances are cached, but depend on the namespace. + assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR)); + assertFalse(value.equals(underTest.state(NAMESPACE_2, WATERMARK_LATEST_ADDR))); + + assertThat(value.read(), Matchers.nullValue()); + value.add(new Instant(2000)); + assertThat(value.read(), Matchers.equalTo(new Instant(2000))); + + value.add(new Instant(3000)); + assertThat(value.read(), Matchers.equalTo(new Instant(3000))); + + value.add(new Instant(1000)); + assertThat(value.read(), Matchers.equalTo(new Instant(3000))); + + value.clear(); + assertThat(value.read(), Matchers.equalTo(null)); + assertEquals(underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR), value); + } + + @Test + public void testWatermarkEndOfWindowState() throws Exception { + WatermarkHoldState<BoundedWindow> value = underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR); + + // State instances are cached, but depend on the namespace. 
+ assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR)); + assertFalse(value.equals(underTest.state(NAMESPACE_2, WATERMARK_EOW_ADDR))); + + assertThat(value.read(), Matchers.nullValue()); + value.add(new Instant(2000)); + assertThat(value.read(), Matchers.equalTo(new Instant(2000))); + + value.clear(); + assertThat(value.read(), Matchers.equalTo(null)); + assertEquals(underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR), value); + } + + @Test + public void testWatermarkStateIsEmpty() throws Exception { + WatermarkHoldState<BoundedWindow> value = + underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR); + + assertThat(value.isEmpty().read(), Matchers.is(true)); + ReadableState<Boolean> readFuture = value.isEmpty(); + value.add(new Instant(1000)); + assertThat(readFuture.read(), Matchers.is(false)); + + value.clear(); + assertThat(readFuture.read(), Matchers.is(true)); + } + + @Test + public void testMergeEarliestWatermarkIntoSource() throws Exception { + WatermarkHoldState<BoundedWindow> value1 = + underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR); + WatermarkHoldState<BoundedWindow> value2 = + underTest.state(NAMESPACE_2, WATERMARK_EARLIEST_ADDR); + + value1.add(new Instant(3000)); + value2.add(new Instant(5000)); + value1.add(new Instant(4000)); + value2.add(new Instant(2000)); + + // Merging clears the old values and updates the merged value. 
+ StateMerging.mergeWatermarks(Arrays.asList(value1, value2), value1, WINDOW_1); + + assertThat(value1.read(), Matchers.equalTo(new Instant(2000))); + assertThat(value2.read(), Matchers.equalTo(null)); + } + + @Test + public void testMergeLatestWatermarkIntoSource() throws Exception { + WatermarkHoldState<BoundedWindow> value1 = + underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR); + WatermarkHoldState<BoundedWindow> value2 = + underTest.state(NAMESPACE_2, WATERMARK_LATEST_ADDR); + WatermarkHoldState<BoundedWindow> value3 = + underTest.state(NAMESPACE_3, WATERMARK_LATEST_ADDR); + + value1.add(new Instant(3000)); + value2.add(new Instant(5000)); + value1.add(new Instant(4000)); + value2.add(new Instant(2000)); + + // Merging clears the old values and updates the result value. + StateMerging.mergeWatermarks(Arrays.asList(value1, value2), value3, WINDOW_1); + + // Merging clears the old values and updates the result value. + assertThat(value3.read(), Matchers.equalTo(new Instant(5000))); + assertThat(value1.read(), Matchers.equalTo(null)); + assertThat(value2.read(), Matchers.equalTo(null)); + } + + @Test + public void testSerialization() throws Exception { + ApexStateInternals<String> original = new ApexStateInternals<String>(null); + ValueState<String> value = original.state(NAMESPACE_1, STRING_VALUE_ADDR); + assertEquals(original.state(NAMESPACE_1, STRING_VALUE_ADDR), value); + value.write("hello"); + + ApexStateInternals<String> cloned; + assertNotNull("Serialization", cloned = KryoCloneUtils.cloneObject(original)); + ValueState<String> clonedValue = cloned.state(NAMESPACE_1, STRING_VALUE_ADDR); + assertThat(clonedValue.read(), Matchers.equalTo("hello")); + assertEquals(cloned.state(NAMESPACE_1, STRING_VALUE_ADDR), value); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/CollectionSource.java ---------------------------------------------------------------------- 
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/CollectionSource.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/CollectionSource.java new file mode 100644 index 0000000..c3b35f9 --- /dev/null +++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/CollectionSource.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.apex.translation.utils; + +import java.io.IOException; +import java.io.Serializable; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; + +import javax.annotation.Nullable; + +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.io.UnboundedSource; +import org.apache.beam.sdk.options.PipelineOptions; +import org.joda.time.Instant; + +/** + * collection as {@link UnboundedSource}, used for tests. 
+ */ +public class CollectionSource<T> extends UnboundedSource<T, UnboundedSource.CheckpointMark> { + private static final long serialVersionUID = 1L; + private final Collection<T> collection; + private final Coder<T> coder; + + public CollectionSource(Collection<T> collection, Coder<T> coder) { + this.collection = collection; + this.coder = coder; + } + + @Override + public List<? extends UnboundedSource<T, CheckpointMark>> generateInitialSplits( + int desiredNumSplits, PipelineOptions options) throws Exception { + return Collections.singletonList(this); + } + + @Override + public UnboundedReader<T> createReader(PipelineOptions options, + @Nullable UnboundedSource.CheckpointMark checkpointMark) { + return new CollectionReader<>(collection, this); + } + + @Nullable + @Override + public Coder<CheckpointMark> getCheckpointMarkCoder() { + return null; + } + + @Override + public void validate() { + } + + @Override + public Coder<T> getDefaultOutputCoder() { + return coder; + } + + private static class CollectionReader<T> extends UnboundedSource.UnboundedReader<T> + implements Serializable { + + private T current; + private final CollectionSource<T> source; + private final Collection<T> collection; + private Iterator<T> iterator; + + public CollectionReader(Collection<T> collection, CollectionSource<T> source) { + this.collection = collection; + this.source = source; + } + + @Override + public boolean start() throws IOException { + if (null == iterator) { + iterator = collection.iterator(); + } + return advance(); + } + + @Override + public boolean advance() throws IOException { + if (iterator.hasNext()) { + current = iterator.next(); + return true; + } else { + return false; + } + } + + @Override + public Instant getWatermark() { + return Instant.now(); + } + + @Override + public UnboundedSource.CheckpointMark getCheckpointMark() { + return null; + } + + @Override + public UnboundedSource<T, ?> getCurrentSource() { + return source; + } + + @Override + public T 
getCurrent() throws NoSuchElementException { + return current; + } + + @Override + public Instant getCurrentTimestamp() throws NoSuchElementException { + return Instant.now(); + } + + @Override + public void close() throws IOException { + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/PipelineOptionsTest.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/PipelineOptionsTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/PipelineOptionsTest.java new file mode 100644 index 0000000..d5eb9a9 --- /dev/null +++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/PipelineOptionsTest.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.apex.translation.utils; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +import com.datatorrent.common.util.FSStorageAgent; +import com.esotericsoftware.kryo.serializers.FieldSerializer.Bind; +import com.esotericsoftware.kryo.serializers.JavaSerializer; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; + +import org.apache.beam.runners.apex.ApexPipelineOptions; +import org.apache.beam.sdk.options.Default; +import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.junit.BeforeClass; +import org.junit.Test; + +/** + * Tests the serialization of PipelineOptions. + */ +public class PipelineOptionsTest { + + /** + * Interface for testing. + */ + public interface MyOptions extends ApexPipelineOptions { + @Description("Bla bla bla") + @Default.String("Hello") + String getTestOption(); + void setTestOption(String value); + } + + private static class MyOptionsWrapper { + private MyOptionsWrapper() { + this(null); // required for Kryo + } + private MyOptionsWrapper(ApexPipelineOptions options) { + this.options = new SerializablePipelineOptions(options); + } + @Bind(JavaSerializer.class) + private final SerializablePipelineOptions options; + } + + private static MyOptions options; + + private static final String[] args = new String[]{"--testOption=nothing"}; + + @BeforeClass + public static void beforeTest() { + options = PipelineOptionsFactory.fromArgs(args).as(MyOptions.class); + } + + @Test + public void testSerialization() { + MyOptionsWrapper wrapper = new MyOptionsWrapper(PipelineOptionsTest.options); + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + FSStorageAgent.store(bos, wrapper); + + ByteArrayInputStream bis = new ByteArrayInputStream(bos.toByteArray()); + MyOptionsWrapper wrapperCopy = (MyOptionsWrapper) FSStorageAgent.retrieve(bis); + assertNotNull(wrapperCopy.options); + 
assertEquals("nothing", wrapperCopy.options.get().as(MyOptions.class).getTestOption()); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ApexGroupByKeyOperatorTest.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ApexGroupByKeyOperatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ApexGroupByKeyOperatorTest.java deleted file mode 100644 index 3e8d575..0000000 --- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ApexGroupByKeyOperatorTest.java +++ /dev/null @@ -1,112 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.runners.apex.translators; - -import com.datatorrent.api.Sink; -import com.google.common.collect.Lists; - -import java.util.List; - -import org.apache.beam.runners.apex.ApexPipelineOptions; -import org.apache.beam.runners.apex.TestApexRunner; -import org.apache.beam.runners.apex.translators.functions.ApexGroupByKeyOperator; -import org.apache.beam.runners.apex.translators.utils.ApexStateInternals; -import org.apache.beam.runners.apex.translators.utils.ApexStreamTuple; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.coders.VarIntCoder; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.FixedWindows; -import org.apache.beam.sdk.transforms.windowing.IntervalWindow; -import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.util.WindowingStrategy; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollection.IsBounded; -import org.joda.time.Duration; -import org.joda.time.Instant; -import org.junit.Assert; -import org.junit.Test; - -/** - * Test for {@link ApexGroupByKeyOperator}. 
- */ -public class ApexGroupByKeyOperatorTest { - - @Test - public void testGlobalWindowMinTimestamp() throws Exception { - ApexPipelineOptions options = PipelineOptionsFactory.create() - .as(ApexPipelineOptions.class); - options.setRunner(TestApexRunner.class); - Pipeline pipeline = Pipeline.create(options); - - WindowingStrategy<?, ?> ws = WindowingStrategy.of(FixedWindows.of( - Duration.standardSeconds(10))); - PCollection<KV<String, Integer>> input = PCollection.createPrimitiveOutputInternal(pipeline, - ws, IsBounded.BOUNDED); - input.setCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of())); - - ApexGroupByKeyOperator<String, Integer> operator = new ApexGroupByKeyOperator<>(options, - input, new ApexStateInternals.ApexStateInternalsFactory<String>() - ); - - final List<Object> results = Lists.newArrayList(); - Sink<Object> sink = new Sink<Object>() { - @Override - public void put(Object tuple) { - results.add(tuple); - } - @Override - public int getCount(boolean reset) { - return 0; - } - }; - operator.output.setSink(sink); - operator.setup(null); - operator.beginWindow(1); - - Instant windowStart = BoundedWindow.TIMESTAMP_MIN_VALUE; - BoundedWindow window = new IntervalWindow(windowStart, windowStart.plus(10000)); - PaneInfo paneInfo = PaneInfo.NO_FIRING; - - WindowedValue<KV<String, Integer>> wv1 = - WindowedValue.of(KV.of("foo", 1), windowStart, window, paneInfo); - operator.input.process(ApexStreamTuple.DataTuple.of(wv1)); - - WindowedValue<KV<String, Integer>> wv2 = - WindowedValue.of(KV.of("foo", 1), windowStart, window, paneInfo); - operator.input.process(ApexStreamTuple.DataTuple.of(wv2)); - - ApexStreamTuple<WindowedValue<KV<String, Integer>>> watermark = - ApexStreamTuple.WatermarkTuple.of(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()); - - Assert.assertEquals("number outputs", 0, results.size()); - operator.input.process(watermark); - Assert.assertEquals("number outputs", 2, results.size()); - @SuppressWarnings({ "unchecked", "rawtypes" }) - 
ApexStreamTuple.DataTuple<WindowedValue<KV<String, Iterable<Integer>>>> dataTuple = - (ApexStreamTuple.DataTuple) results.get(0); - List<Integer> counts = Lists.newArrayList(1, 1); - Assert.assertEquals("iterable", KV.of("foo", counts), dataTuple.getValue().getValue()); - Assert.assertEquals("expected watermark", watermark, results.get(1)); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslatorTest.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslatorTest.java deleted file mode 100644 index 7defc77..0000000 --- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslatorTest.java +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.beam.runners.apex.translators; - -import com.google.common.collect.Sets; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Set; - -import org.apache.beam.runners.apex.ApexPipelineOptions; -import org.apache.beam.runners.apex.ApexRunner; -import org.apache.beam.runners.apex.ApexRunnerResult; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.Flatten; -import org.apache.beam.sdk.transforms.OldDoFn; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionList; -import org.junit.Assert; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Integration test for {@link FlattenPCollectionTranslator}. - */ -public class FlattenPCollectionTranslatorTest { - private static final Logger LOG = LoggerFactory.getLogger(FlattenPCollectionTranslatorTest.class); - - @Test - public void test() throws Exception { - ApexPipelineOptions options = PipelineOptionsFactory.as(ApexPipelineOptions.class); - options.setApplicationName("FlattenPCollection"); - options.setRunner(ApexRunner.class); - Pipeline p = Pipeline.create(options); - - String[][] collections = { - {"1"}, {"2"}, {"3"}, {"4"}, {"5"} - }; - - Set<String> expected = Sets.newHashSet(); - List<PCollection<String>> pcList = new ArrayList<PCollection<String>>(); - for (String[] collection : collections) { - pcList.add(p.apply(Create.of(collection).withCoder(StringUtf8Coder.of()))); - expected.addAll(Arrays.asList(collection)); - } - - PCollection<String> actual = PCollectionList.of(pcList).apply(Flatten.<String>pCollections()); - actual.apply(ParDo.of(new EmbeddedCollector())); - - ApexRunnerResult result = (ApexRunnerResult) p.run(); - // TODO: verify 
translation - result.getApexDAG(); - long timeout = System.currentTimeMillis() + 30000; - while (System.currentTimeMillis() < timeout - && EmbeddedCollector.RESULTS.size() < expected.size()) { - LOG.info("Waiting for expected results."); - Thread.sleep(500); - } - - Assert.assertEquals("number results", expected.size(), EmbeddedCollector.RESULTS.size()); - Assert.assertEquals(expected, Sets.newHashSet(EmbeddedCollector.RESULTS)); - } - - @SuppressWarnings("serial") - private static class EmbeddedCollector extends OldDoFn<Object, Void> { - protected static final ArrayList<Object> RESULTS = new ArrayList<>(); - - public EmbeddedCollector() { - } - - @Override - public void processElement(ProcessContext c) throws Exception { - RESULTS.add(c.element()); - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/GroupByKeyTranslatorTest.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/GroupByKeyTranslatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/GroupByKeyTranslatorTest.java deleted file mode 100644 index e67e29e..0000000 --- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/GroupByKeyTranslatorTest.java +++ /dev/null @@ -1,246 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.runners.apex.translators; - -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; - -import java.io.IOException; -import java.io.Serializable; -import java.util.Collections; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.NoSuchElementException; - -import javax.annotation.Nullable; - -import org.apache.beam.runners.apex.ApexPipelineOptions; -import org.apache.beam.runners.apex.ApexRunner; -import org.apache.beam.runners.apex.ApexRunnerResult; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.io.Read; -import org.apache.beam.sdk.io.UnboundedSource; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.transforms.Count; -import org.apache.beam.sdk.transforms.OldDoFn; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.windowing.FixedWindows; -import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; -import org.apache.beam.sdk.transforms.windowing.Window; -import org.apache.beam.sdk.values.KV; -import org.joda.time.Duration; -import org.joda.time.Instant; -import org.junit.Assert; -import org.junit.Test; - -/** - * Integration test for {@link GroupByKeyTranslator}. 
- */ -public class GroupByKeyTranslatorTest { - - @SuppressWarnings({"unchecked"}) - @Test - public void test() throws Exception { - ApexPipelineOptions options = - PipelineOptionsFactory.as(ApexPipelineOptions.class); - options.setApplicationName("GroupByKey"); - options.setRunner(ApexRunner.class); - Pipeline p = Pipeline.create(options); - - List<KV<String, Instant>> data = - Lists.newArrayList( - KV.of("foo", new Instant(1000)), KV.of("foo", new Instant(1000)), - KV.of("foo", new Instant(2000)), - KV.of("bar", new Instant(1000)), KV.of("bar", new Instant(2000)), - KV.of("bar", new Instant(2000)) - ); - - // expected results assume outputAtLatestInputTimestamp - List<KV<Instant, KV<String, Long>>> expected = - Lists.newArrayList( - KV.of(new Instant(1000), KV.of("foo", 2L)), - KV.of(new Instant(1000), KV.of("bar", 1L)), - KV.of(new Instant(2000), KV.of("foo", 1L)), - KV.of(new Instant(2000), KV.of("bar", 2L)) - ); - - p.apply(Read.from(new TestSource(data, new Instant(5000)))) - .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(1))) - .withOutputTimeFn(OutputTimeFns.outputAtLatestInputTimestamp())) - .apply(Count.<String>perElement()) - .apply(ParDo.of(new KeyedByTimestamp<KV<String, Long>>())) - .apply(ParDo.of(new EmbeddedCollector())) - ; - - ApexRunnerResult result = (ApexRunnerResult) p.run(); - result.getApexDAG(); - - long timeout = System.currentTimeMillis() + 30000; - while (System.currentTimeMillis() < timeout) { - if (EmbeddedCollector.RESULTS.containsAll(expected)) { - break; - } - Thread.sleep(1000); - } - Assert.assertEquals(Sets.newHashSet(expected), EmbeddedCollector.RESULTS); - - } - - @SuppressWarnings("serial") - private static class EmbeddedCollector extends OldDoFn<Object, Void> { - protected static final HashSet<Object> RESULTS = new HashSet<>(); - - public EmbeddedCollector() { - } - - @Override - public void processElement(ProcessContext c) throws Exception { - RESULTS.add(c.element()); - } - } - - private static class 
KeyedByTimestamp<T> extends OldDoFn<T, KV<Instant, T>> { - - @Override - public void processElement(ProcessContext c) throws Exception { - c.output(KV.of(c.timestamp(), c.element())); - } - } - - private static class TestSource extends UnboundedSource<String, UnboundedSource.CheckpointMark> { - - private final List<KV<String, Instant>> data; - private final Instant watermark; - - public TestSource(List<KV<String, Instant>> data, Instant watermark) { - this.data = data; - this.watermark = watermark; - } - - @Override - public List<? extends UnboundedSource<String, CheckpointMark>> generateInitialSplits( - int desiredNumSplits, PipelineOptions options) throws Exception { - return Collections.<UnboundedSource<String, CheckpointMark>>singletonList(this); - } - - @Override - public UnboundedReader<String> createReader(PipelineOptions options, - @Nullable CheckpointMark checkpointMark) { - return new TestReader(data, watermark, this); - } - - @Nullable - @Override - public Coder<CheckpointMark> getCheckpointMarkCoder() { - return null; - } - - @Override - public void validate() { - } - - @Override - public Coder<String> getDefaultOutputCoder() { - return StringUtf8Coder.of(); - } - - private static class TestReader extends UnboundedReader<String> implements Serializable { - - private static final long serialVersionUID = 7526472295622776147L; - - private final List<KV<String, Instant>> data; - private final TestSource source; - - private Iterator<KV<String, Instant>> iterator; - private String currentRecord; - private Instant currentTimestamp; - private Instant watermark; - private boolean collected; - - public TestReader(List<KV<String, Instant>> data, Instant watermark, TestSource source) { - this.data = data; - this.source = source; - this.watermark = watermark; - } - - @Override - public boolean start() throws IOException { - iterator = data.iterator(); - return advance(); - } - - @Override - public boolean advance() throws IOException { - if (iterator.hasNext()) { - 
KV<String, Instant> kv = iterator.next(); - collected = false; - currentRecord = kv.getKey(); - currentTimestamp = kv.getValue(); - return true; - } else { - return false; - } - } - - @Override - public byte[] getCurrentRecordId() throws NoSuchElementException { - return new byte[0]; - } - - @Override - public String getCurrent() throws NoSuchElementException { - collected = true; - return this.currentRecord; - } - - @Override - public Instant getCurrentTimestamp() throws NoSuchElementException { - return currentTimestamp; - } - - @Override - public void close() throws IOException { - } - - @Override - public Instant getWatermark() { - if (!iterator.hasNext() && collected) { - return watermark; - } else { - return new Instant(0); - } - } - - @Override - public CheckpointMark getCheckpointMark() { - return null; - } - - @Override - public UnboundedSource<String, ?> getCurrentSource() { - return this.source; - } - } - } - -}