http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/DefaultStepContext.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/DefaultStepContext.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/DefaultStepContext.java new file mode 100644 index 0000000..481b7fb --- /dev/null +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/DefaultStepContext.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.jstorm.translation.util; + +import org.apache.beam.runners.core.ExecutionContext; +import org.apache.beam.runners.core.TimerInternals; +import org.apache.beam.runners.core.StateInternals; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.TupleTag; + +import java.io.IOException; + +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * Default StepContext for running DoFn This does not allow accessing state or timer internals. + */ +public class DefaultStepContext implements ExecutionContext.StepContext { + + private TimerInternals timerInternals; + + private StateInternals stateInternals; + + public DefaultStepContext(TimerInternals timerInternals, StateInternals stateInternals) { + this.timerInternals = checkNotNull(timerInternals, "timerInternals"); + this.stateInternals = checkNotNull(stateInternals, "stateInternals"); + } + + @Override + public String getStepName() { + return null; + } + + @Override + public String getTransformName() { + return null; + } + + @Override + public void noteOutput(WindowedValue<?> windowedValue) { + + } + + @Override + public void noteOutput(TupleTag<?> tupleTag, WindowedValue<?> windowedValue) { + + } + + @Override + public <T, W extends BoundedWindow> void writePCollectionViewData(TupleTag<?> tag, Iterable<WindowedValue<T>> data, + Coder<Iterable<WindowedValue<T>>> dataCoder, W window, Coder<W> windowCoder) throws IOException { + throw new UnsupportedOperationException("Writing side-input data is not supported."); + } + + @Override + public StateInternals stateInternals() { + return stateInternals; + } + + @Override + public TimerInternals timerInternals() { + return timerInternals; + } + + public void setStateInternals(StateInternals stateInternals) { + this.stateInternals = stateInternals; + } + + public void 
setTimerInternals(TimerInternals timerInternals) { + this.timerInternals = timerInternals; + } +}
// http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/RunnerUtils.java
// ----------------------------------------------------------------------
// diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/RunnerUtils.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/RunnerUtils.java
// new file mode 100644
// index 0000000..cbf815a
// --- /dev/null
// +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/RunnerUtils.java
// @@ -0,0 +1,53 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.runners.jstorm.util;

import org.apache.beam.runners.core.KeyedWorkItem;
import org.apache.beam.runners.jstorm.translation.runtime.Executor;
import org.apache.beam.runners.jstorm.translation.runtime.GroupByWindowExecutor;
import org.apache.beam.runners.jstorm.translation.runtime.MultiStatefulDoFnExecutor;
import org.apache.beam.runners.jstorm.translation.runtime.StatefulDoFnExecutor;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;

/**
 * Static helper methods used by the JStorm runner.
 */
public class RunnerUtils {

  /**
   * Converts a {@code WindowedValue<KV<K, V>>} into a single-element {@link KeyedWorkItem}.
   *
   * <p>The work item is keyed by the element's key. Its only element is
   * {@code elem.withValue(value)}, where {@code value} is the element's value.
   *
   * @param elem the keyed element to convert
   * @return a {@code KeyedWorkItem<K, V>} holding {@code elem}'s key and value
   */
  public static <K, V> KeyedWorkItem<K, V> toKeyedWorkItem(WindowedValue<KV<K, V>> elem) {
    KV<K, V> kv = elem.getValue();
    return SingletonKeyedWorkItem.of(kv.getKey(), elem.withValue(kv.getValue()));
  }

  /**
   * Returns {@code true} if {@code executor} is a {@link GroupByWindowExecutor}, a
   * {@link StatefulDoFnExecutor} or a {@link MultiStatefulDoFnExecutor}; {@code false}
   * otherwise, including for {@code null}.
   */
  public static boolean isGroupByKeyExecutor(Executor executor) {
    return executor instanceof GroupByWindowExecutor
        || executor instanceof StatefulDoFnExecutor
        || executor instanceof MultiStatefulDoFnExecutor;
  }
}
// \ No newline at end of file
// http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/SerializedPipelineOptions.java
// ----------------------------------------------------------------------
// diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/SerializedPipelineOptions.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/SerializedPipelineOptions.java
// new file mode 100644
// index 0000000..391699b
// --- /dev/null
// +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/SerializedPipelineOptions.java
// @@ -0,0 +1,64 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.beam.runners.jstorm.util;

import static com.google.common.base.Preconditions.checkNotNull;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.io.Serializable;
import org.apache.beam.sdk.options.PipelineOptions;

/**
 * Encapsulates the PipelineOptions in serialized form to ship them to the cluster.
 *
 * <p>The options are written to JSON bytes with Jackson when this object is constructed. The
 * {@link PipelineOptions} are rebuilt lazily on the first call to {@link #getPipelineOptions()}
 * and cached in a transient field.
 */
public class SerializedPipelineOptions implements Serializable {

  /**
   * Shared Jackson mapper. It is never reconfigured after creation, which Jackson documents as
   * the condition for safe concurrent use. It is static, so it is not part of the serialized form.
   */
  private static final ObjectMapper MAPPER = new ObjectMapper();

  private final byte[] serializedOptions;

  /** Lazily initialized copy of the deserialized options. */
  private transient PipelineOptions pipelineOptions;

  /**
   * Serializes {@code options} eagerly.
   *
   * @param options the options to ship
   * @throws NullPointerException if {@code options} is {@code null}
   * @throws RuntimeException if the options cannot be serialized
   */
  public SerializedPipelineOptions(PipelineOptions options) {
    checkNotNull(options, "PipelineOptions must not be null.");

    try {
      this.serializedOptions = MAPPER.writeValueAsBytes(options);
    } catch (IOException e) {
      throw new RuntimeException("Couldn't serialize PipelineOptions.", e);
    }
  }

  /**
   * Returns the deserialized options, deserializing them on first use.
   *
   * <p>Not synchronized: concurrent first calls may each deserialize, and the last write wins.
   *
   * @throws RuntimeException if the stored bytes cannot be deserialized
   */
  public PipelineOptions getPipelineOptions() {
    if (pipelineOptions == null) {
      try {
        pipelineOptions = MAPPER.readValue(serializedOptions, PipelineOptions.class);
      } catch (IOException e) {
        throw new RuntimeException("Couldn't deserialize the PipelineOptions.", e);
      }
    }

    return pipelineOptions;
  }

}
// http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/SingletonKeyedWorkItem.java
// ----------------------------------------------------------------------
// diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/SingletonKeyedWorkItem.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/SingletonKeyedWorkItem.java
// new file mode 100644
// index 0000000..dee5f1a
// --- /dev/null
// +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/SingletonKeyedWorkItem.java
// @@ -0,0 +1,62 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.runners.jstorm.util;

import java.util.Collections;
import org.apache.beam.runners.core.KeyedWorkItem;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.sdk.util.WindowedValue;

/**
 * A {@link KeyedWorkItem} that holds exactly one element and no timers.
 *
 * @param <K> the key type
 * @param <ElemT> the element value type
 */
public class SingletonKeyedWorkItem<K, ElemT> implements KeyedWorkItem<K, ElemT> {

  final K key;
  final WindowedValue<ElemT> value;

  private SingletonKeyedWorkItem(K key, WindowedValue<ElemT> value) {
    this.key = key;
    this.value = value;
  }

  /** Creates a work item for {@code key} whose single element is {@code value}. */
  public static <K, ElemT> SingletonKeyedWorkItem<K, ElemT> of(K key, WindowedValue<ElemT> value) {
    return new SingletonKeyedWorkItem<>(key, value);
  }

  @Override
  public K key() {
    return key;
  }

  /** Returns the single element held by this work item. */
  public WindowedValue<ElemT> value() {
    return value;
  }

  /** Always empty: a singleton work item never carries timers. */
  @Override
  public Iterable<TimerInternals.TimerData> timersIterable() {
    // Typed empty list instead of the raw Collections.EMPTY_LIST (no unchecked conversion).
    return Collections.emptyList();
  }

  /** Returns a one-element iterable containing {@link #value()}. */
  @Override
  public Iterable<WindowedValue<ElemT>> elementsIterable() {
    return Collections.singletonList(value);
  }
}
// \ No newline at end of file
// http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/test/java/com/alibaba/jstorm/beam/translation/runtime/state/JStormStateInternalsTest.java
// ----------------------------------------------------------------------
// diff --git a/runners/jstorm/src/test/java/com/alibaba/jstorm/beam/translation/runtime/state/JStormStateInternalsTest.java
b/runners/jstorm/src/test/java/com/alibaba/jstorm/beam/translation/runtime/state/JStormStateInternalsTest.java deleted file mode 100644 index 0ecffff..0000000 --- a/runners/jstorm/src/test/java/com/alibaba/jstorm/beam/translation/runtime/state/JStormStateInternalsTest.java +++ /dev/null @@ -1,219 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.alibaba.jstorm.beam.translation.runtime.state; - -import avro.shaded.com.google.common.collect.Maps; -import com.alibaba.jstorm.beam.translation.runtime.TimerServiceImpl; -import com.alibaba.jstorm.cache.IKvStoreManager; -import com.alibaba.jstorm.cache.rocksdb.RocksDbKvStoreManagerFactory; -import com.alibaba.jstorm.utils.KryoSerializer; - -import org.apache.beam.runners.core.StateNamespaces; -import org.apache.beam.runners.core.StateTags; -import org.apache.beam.sdk.coders.BigEndianIntegerCoder; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CoderRegistry; -import org.apache.beam.sdk.state.*; -import org.apache.beam.sdk.transforms.Combine; -import org.apache.beam.sdk.transforms.Max; -import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; -import org.joda.time.Instant; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -import java.util.Iterator; -import java.util.Map; - -import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.hamcrest.Matchers.hasEntry; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertThat; - -/** - * Tests for {@link JStormStateInternals}. 
- */ -@RunWith(JUnit4.class) -public class JStormStateInternalsTest { - - @Rule - public final TemporaryFolder tmp = new TemporaryFolder(); - - private JStormStateInternals<String> jstormStateInternals; - - @Before - public void setup() throws Exception { - IKvStoreManager kvStoreManager = RocksDbKvStoreManagerFactory.getManager( - Maps.newHashMap(), - "test", - tmp.toString(), - new KryoSerializer(Maps.newHashMap())); - jstormStateInternals = new JStormStateInternals("key-1", kvStoreManager, new TimerServiceImpl(), 0); - } - - @Test - public void testValueState() throws Exception { - ValueState<Integer> valueState = jstormStateInternals.state( - StateNamespaces.global(), StateTags.value("state-id-a", BigEndianIntegerCoder.of())); - valueState.write(Integer.MIN_VALUE); - assertEquals(Integer.MIN_VALUE, valueState.read().longValue()); - valueState.write(Integer.MAX_VALUE); - assertEquals(Integer.MAX_VALUE, valueState.read().longValue()); - } - - @Test - public void testValueStateIdenticalId() throws Exception { - ValueState<Integer> valueState = jstormStateInternals.state( - StateNamespaces.global(), StateTags.value("state-id-a", BigEndianIntegerCoder.of())); - ValueState<Integer> valueStateIdentical = jstormStateInternals.state( - StateNamespaces.global(), StateTags.value("state-id-a", BigEndianIntegerCoder.of())); - - valueState.write(Integer.MIN_VALUE); - assertEquals(Integer.MIN_VALUE, valueState.read().longValue()); - assertEquals(Integer.MIN_VALUE, valueStateIdentical.read().longValue()); - valueState.write(Integer.MAX_VALUE); - assertEquals(Integer.MAX_VALUE, valueState.read().longValue()); - assertEquals(Integer.MAX_VALUE, valueStateIdentical.read().longValue()); - } - - @Test - public void testBagState() throws Exception { - BagState<Integer> bagStateA = jstormStateInternals.state( - StateNamespaces.global(), StateTags.bag("state-id-a", BigEndianIntegerCoder.of())); - BagState<Integer> bagStateB = jstormStateInternals.state( - StateNamespaces.global(), 
StateTags.bag("state-id-b", BigEndianIntegerCoder.of())); - - bagStateA.add(1); - bagStateA.add(0); - bagStateA.add(Integer.MAX_VALUE); - - bagStateB.add(0); - bagStateB.add(Integer.MIN_VALUE); - - Iterable<Integer> bagA = bagStateA.read(); - Iterable<Integer> bagB = bagStateB.read(); - assertThat(bagA, containsInAnyOrder(1, 0, Integer.MAX_VALUE)); - assertThat(bagB, containsInAnyOrder(0, Integer.MIN_VALUE)); - - bagStateA.clear(); - bagStateA.add(1); - bagStateB.add(0); - assertThat(bagStateA.read(), containsInAnyOrder(1)); - assertThat(bagStateB.read(), containsInAnyOrder(0, 0, Integer.MIN_VALUE)); - } - - @Test - public void testCombiningState() throws Exception { - Combine.CombineFn<Integer, int[], Integer> combineFn = Max.ofIntegers(); - Coder<int[]> accumCoder = combineFn.getAccumulatorCoder( - CoderRegistry.createDefault(), BigEndianIntegerCoder.of()); - - CombiningState<Integer, int[], Integer> combiningState = jstormStateInternals.state( - StateNamespaces.global(), - StateTags.combiningValue( - "state-id-a", - accumCoder, - combineFn)); - assertEquals(Integer.MIN_VALUE, combiningState.read().longValue()); - combiningState.add(10); - assertEquals(10, combiningState.read().longValue()); - combiningState.add(1); - assertEquals(10, combiningState.read().longValue()); - combiningState.add(Integer.MAX_VALUE); - assertEquals(Integer.MAX_VALUE, combiningState.read().longValue()); - } - - @Test - public void testWatermarkHoldState() throws Exception { - WatermarkHoldState watermarkHoldState = jstormStateInternals.state( - StateNamespaces.global(), - StateTags.watermarkStateInternal( - "state-id-a", - TimestampCombiner.EARLIEST)); - watermarkHoldState.add(new Instant(1)); - assertEquals(1, watermarkHoldState.read().getMillis()); - watermarkHoldState.add(new Instant(Integer.MIN_VALUE)); - assertEquals(Integer.MIN_VALUE, watermarkHoldState.read().getMillis()); - watermarkHoldState.add(new Instant(Integer.MAX_VALUE)); - assertEquals(Integer.MIN_VALUE, 
watermarkHoldState.read().getMillis()); - } - - @Test - public void testMapState() throws Exception { - MapState<Integer, Integer> mapStateA = jstormStateInternals.state( - StateNamespaces.global(), StateTags.map("state-id-a", BigEndianIntegerCoder.of(), BigEndianIntegerCoder.of())); - mapStateA.put(1, 1); - mapStateA.put(2, 22); - mapStateA.put(1, 12); - - Iterable<Integer> keys = mapStateA.keys().read(); - Iterable<Integer> values = mapStateA.values().read(); - assertThat(keys, containsInAnyOrder(1, 2)); - assertThat(values, containsInAnyOrder(12, 22)); - - Iterable<Map.Entry<Integer, Integer>> entries = mapStateA.entries().read(); - Iterator<Map.Entry<Integer, Integer>> itr = entries.iterator(); - Map.Entry<Integer, Integer> entry = itr.next(); - assertEquals((long) entry.getKey(), 1l); - assertEquals((long) entry.getValue(), 12l); - entry = itr.next(); - assertEquals((long) entry.getKey(), 2l); - assertEquals((long) entry.getValue(), 22l); - assertEquals(false, itr.hasNext()); - - mapStateA.remove(1); - keys = mapStateA.keys().read(); - values = mapStateA.values().read(); - assertThat(keys, containsInAnyOrder(2)); - assertThat(values, containsInAnyOrder(22)); - - entries = mapStateA.entries().read(); - itr = entries.iterator(); - entry = itr.next(); - assertEquals((long) entry.getKey(), 2l); - assertEquals((long) entry.getValue(), 22l); - assertEquals(false, itr.hasNext()); - } - - @Test - public void testMassiveDataOfBagState() { - BagState<Integer> bagStateA = jstormStateInternals.state( - StateNamespaces.global(), StateTags.bag("state-id-a", BigEndianIntegerCoder.of())); - - int count = 10000; - int n = 1; - while(n <= count) { - bagStateA.add(n); - n++; - } - - int readCount = 0; - int readN = 0; - Iterator<Integer> itr = bagStateA.read().iterator(); - while(itr.hasNext()) { - readN += itr.next(); - readCount++; - } - - assertEquals((long) readN, ((1 + count) * count) / 2); - assertEquals((long) readCount, count); - } -} 
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/test/java/com/alibaba/jstorm/beam/translation/translator/CoGroupByKeyTest.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/test/java/com/alibaba/jstorm/beam/translation/translator/CoGroupByKeyTest.java b/runners/jstorm/src/test/java/com/alibaba/jstorm/beam/translation/translator/CoGroupByKeyTest.java deleted file mode 100644 index 4f69c93..0000000 --- a/runners/jstorm/src/test/java/com/alibaba/jstorm/beam/translation/translator/CoGroupByKeyTest.java +++ /dev/null @@ -1,302 +0,0 @@ -package com.alibaba.jstorm.beam.translation.translator; - -import com.alibaba.jstorm.beam.StormPipelineOptions; -import com.alibaba.jstorm.beam.TestJStormRunner; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.coders.BigEndianIntegerCoder; -import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.testing.ValidatesRunner; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.SerializableFunction; -import org.apache.beam.sdk.transforms.join.CoGbkResult; -import org.apache.beam.sdk.transforms.join.CoGroupByKey; -import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple; -import org.apache.beam.sdk.transforms.windowing.FixedWindows; -import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; -import org.apache.beam.sdk.transforms.windowing.Window; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.TupleTag; -import org.joda.time.Duration; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.junit.runner.RunWith; -import 
org.junit.runners.JUnit4; - -import java.io.Serializable; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Map; - -import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertThat; - -@RunWith(JUnit4.class) -public class CoGroupByKeyTest implements Serializable { - /** - * Converts the given list into a PCollection belonging to the provided - * Pipeline in such a way that coder inference needs to be performed. - */ - private PCollection<KV<Integer, String>> createInput(String name, - Pipeline p, List<KV<Integer, String>> list) { - return createInput(name, p, list, new ArrayList<Long>()); - } - - /** - * Converts the given list with timestamps into a PCollection. - */ - private PCollection<KV<Integer, String>> createInput(String name, - Pipeline p, List<KV<Integer, String>> list, List<Long> timestamps) { - PCollection<KV<Integer, String>> input; - if (timestamps.isEmpty()) { - input = p.apply("Create" + name, Create.of(list) - .withCoder(KvCoder.of(BigEndianIntegerCoder.of(), StringUtf8Coder.of()))); - } else { - input = p.apply("Create" + name, Create.timestamped(list, timestamps) - .withCoder(KvCoder.of(BigEndianIntegerCoder.of(), StringUtf8Coder.of()))); - } - return input.apply( - "Identity" + name, - ParDo.of( - new DoFn<KV<Integer, String>, KV<Integer, String>>() { - @ProcessElement - public void processElement(ProcessContext c) { - c.output(c.element()); - } - })); - } - - /** - * Returns a {@code PCollection<KV<Integer, CoGbkResult>>} containing the result - * of a {@link CoGroupByKey} over 2 {@code PCollection<KV<Integer, String>>}, - * where each {@link PCollection} has no duplicate keys and the key sets of - * each {@link PCollection} are intersecting but neither is a subset of the other. 
- */ - private PCollection<KV<Integer, CoGbkResult>> buildGetOnlyGbk( - Pipeline p, - TupleTag<String> tag1, - TupleTag<String> tag2) { - List<KV<Integer, String>> list1 = - Arrays.asList( - KV.of(1, "collection1-1"), - KV.of(2, "collection1-2")); - List<KV<Integer, String>> list2 = - Arrays.asList( - KV.of(2, "collection2-2"), - KV.of(3, "collection2-3")); - PCollection<KV<Integer, String>> collection1 = createInput("CreateList1", p, list1); - PCollection<KV<Integer, String>> collection2 = createInput("CreateList2", p, list2); - PCollection<KV<Integer, CoGbkResult>> coGbkResults = - KeyedPCollectionTuple.of(tag1, collection1) - .and(tag2, collection2) - .apply(CoGroupByKey.<Integer>create()); - return coGbkResults; - } - - @Test - @Category(ValidatesRunner.class) - public void testCoGroupByKeyGetOnly() { - StormPipelineOptions options = PipelineOptionsFactory.as(StormPipelineOptions.class); - options.setRunner(TestJStormRunner.class); - options.setLocalMode(true); - - Pipeline p = Pipeline.create(options); - - final TupleTag<String> tag1 = new TupleTag<>(); - final TupleTag<String> tag2 = new TupleTag<>(); - - PCollection<KV<Integer, CoGbkResult>> coGbkResults = - buildGetOnlyGbk(p, tag1, tag2); - - PAssert.thatMap(coGbkResults).satisfies( - new SerializableFunction<Map<Integer, CoGbkResult>, Void>() { - @Override - public Void apply(Map<Integer, CoGbkResult> results) { - assertEquals("collection1-1", results.get(1).getOnly(tag1)); - assertEquals("collection1-2", results.get(2).getOnly(tag1)); - assertEquals("collection2-2", results.get(2).getOnly(tag2)); - assertEquals("collection2-3", results.get(3).getOnly(tag2)); - return null; - } - }); - - p.run(); - } - - /** - * Returns a {@code PCollection<KV<Integer, CoGbkResult>>} containing the - * results of the {@code CoGroupByKey} over three - * {@code PCollection<KV<Integer, String>>}, each of which correlates - * a customer id to purchases, addresses, or names, respectively. 
- */ - private PCollection<KV<Integer, CoGbkResult>> buildPurchasesCoGbk( - Pipeline p, - TupleTag<String> purchasesTag, - TupleTag<String> addressesTag, - TupleTag<String> namesTag) { - List<KV<Integer, String>> idToPurchases = - Arrays.asList( - KV.of(2, "Boat"), - KV.of(1, "Shoes"), - KV.of(3, "Car"), - KV.of(1, "Book"), - KV.of(10, "Pens"), - KV.of(8, "House"), - KV.of(4, "Suit"), - KV.of(11, "House"), - KV.of(14, "Shoes"), - KV.of(2, "Suit"), - KV.of(8, "Suit Case"), - KV.of(3, "House")); - - List<KV<Integer, String>> idToAddress = - Arrays.asList( - KV.of(2, "53 S. 3rd"), - KV.of(10, "383 Jackson Street"), - KV.of(20, "3 W. Arizona"), - KV.of(3, "29 School Rd"), - KV.of(8, "6 Watling Rd")); - - List<KV<Integer, String>> idToName = - Arrays.asList( - KV.of(1, "John Smith"), - KV.of(2, "Sally James"), - KV.of(8, "Jeffery Spalding"), - KV.of(20, "Joan Lichtfield")); - - PCollection<KV<Integer, String>> purchasesTable = - createInput("CreateIdToPurchases", p, idToPurchases); - - PCollection<KV<Integer, String>> addressTable = - createInput("CreateIdToAddress", p, idToAddress); - - PCollection<KV<Integer, String>> nameTable = - createInput("CreateIdToName", p, idToName); - - PCollection<KV<Integer, CoGbkResult>> coGbkResults = - KeyedPCollectionTuple.of(namesTag, nameTable) - .and(addressesTag, addressTable) - .and(purchasesTag, purchasesTable) - .apply(CoGroupByKey.<Integer>create()); - return coGbkResults; - } - - /** - * Returns a {@code PCollection<KV<Integer, CoGbkResult>>} containing the - * results of the {@code CoGroupByKey} over 2 {@code PCollection<KV<Integer, String>>}, - * each of which correlates a customer id to clicks, purchases, respectively. 
- */ - private PCollection<KV<Integer, CoGbkResult>> buildPurchasesCoGbkWithWindowing( - Pipeline p, - TupleTag<String> clicksTag, - TupleTag<String> purchasesTag) { - List<KV<Integer, String>> idToClick = - Arrays.asList( - KV.of(1, "Click t0"), - KV.of(2, "Click t2"), - KV.of(1, "Click t4"), - KV.of(1, "Click t6"), - KV.of(2, "Click t8")); - - List<KV<Integer, String>> idToPurchases = - Arrays.asList( - KV.of(1, "Boat t1"), - KV.of(1, "Shoesi t2"), - KV.of(1, "Pens t3"), - KV.of(2, "House t4"), - KV.of(2, "Suit t5"), - KV.of(1, "Car t6"), - KV.of(1, "Book t7"), - KV.of(2, "House t8"), - KV.of(2, "Shoes t9"), - KV.of(2, "House t10")); - - PCollection<KV<Integer, String>> clicksTable = - createInput("CreateClicks", - p, - idToClick, - Arrays.asList(0L, 2L, 4L, 6L, 8L)) - .apply("WindowClicks", Window.<KV<Integer, String>>into( - FixedWindows.of(new Duration(4))) - .withTimestampCombiner(TimestampCombiner.EARLIEST)); - - PCollection<KV<Integer, String>> purchasesTable = - createInput("CreatePurchases", - p, - idToPurchases, - Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L)) - .apply("WindowPurchases", Window.<KV<Integer, String>>into( - FixedWindows.of(new Duration(4))) - .withTimestampCombiner(TimestampCombiner.EARLIEST)); - - PCollection<KV<Integer, CoGbkResult>> coGbkResults = - KeyedPCollectionTuple.of(clicksTag, clicksTable) - .and(purchasesTag, purchasesTable) - .apply(CoGroupByKey.<Integer>create()); - return coGbkResults; - } - - @Test - @Category(ValidatesRunner.class) - public void testCoGroupByKey() { - StormPipelineOptions options = PipelineOptionsFactory.as(StormPipelineOptions.class); - options.setRunner(TestJStormRunner.class); - options.setLocalMode(true); - - Pipeline p = Pipeline.create(options); - - final TupleTag<String> namesTag = new TupleTag<>(); - final TupleTag<String> addressesTag = new TupleTag<>(); - final TupleTag<String> purchasesTag = new TupleTag<>(); - - - PCollection<KV<Integer, CoGbkResult>> coGbkResults = - 
buildPurchasesCoGbk(p, purchasesTag, addressesTag, namesTag); - - PAssert.thatMap(coGbkResults).satisfies( - new SerializableFunction<Map<Integer, CoGbkResult>, Void>() { - @Override - public Void apply(Map<Integer, CoGbkResult> results) { - CoGbkResult result1 = results.get(1); - assertEquals("John Smith", result1.getOnly(namesTag)); - assertThat(result1.getAll(purchasesTag), containsInAnyOrder("Shoes", "Book")); - - CoGbkResult result2 = results.get(2); - assertEquals("Sally James", result2.getOnly(namesTag)); - assertEquals("53 S. 3rd", result2.getOnly(addressesTag)); - assertThat(result2.getAll(purchasesTag), containsInAnyOrder("Suit", "Boat")); - - CoGbkResult result3 = results.get(3); - assertEquals("29 School Rd", result3.getOnly(addressesTag), "29 School Rd"); - assertThat(result3.getAll(purchasesTag), containsInAnyOrder("Car", "House")); - - CoGbkResult result8 = results.get(8); - assertEquals("Jeffery Spalding", result8.getOnly(namesTag)); - assertEquals("6 Watling Rd", result8.getOnly(addressesTag)); - assertThat(result8.getAll(purchasesTag), containsInAnyOrder("House", "Suit Case")); - - CoGbkResult result20 = results.get(20); - assertEquals("Joan Lichtfield", result20.getOnly(namesTag)); - assertEquals("3 W. 
Arizona", result20.getOnly(addressesTag)); - - assertEquals("383 Jackson Street", results.get(10).getOnly(addressesTag)); - - assertThat(results.get(4).getAll(purchasesTag), containsInAnyOrder("Suit")); - assertThat(results.get(10).getAll(purchasesTag), containsInAnyOrder("Pens")); - assertThat(results.get(11).getAll(purchasesTag), containsInAnyOrder("House")); - assertThat(results.get(14).getAll(purchasesTag), containsInAnyOrder("Shoes")); - - return null; - } - }); - - p.run(); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/test/java/com/alibaba/jstorm/beam/translation/translator/GroupByKeyTest.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/test/java/com/alibaba/jstorm/beam/translation/translator/GroupByKeyTest.java b/runners/jstorm/src/test/java/com/alibaba/jstorm/beam/translation/translator/GroupByKeyTest.java deleted file mode 100644 index 5ec6636..0000000 --- a/runners/jstorm/src/test/java/com/alibaba/jstorm/beam/translation/translator/GroupByKeyTest.java +++ /dev/null @@ -1,159 +0,0 @@ - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.alibaba.jstorm.beam.translation.translator; - -import com.alibaba.jstorm.beam.StormPipelineOptions; - -import com.alibaba.jstorm.beam.TestJStormRunner; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.coders.BigEndianIntegerCoder; -import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.testing.ValidatesRunner; -import org.apache.beam.sdk.transforms.Count; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.GroupByKey; -import org.apache.beam.sdk.transforms.SerializableFunction; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.hamcrest.Description; -import org.hamcrest.Matcher; -import org.hamcrest.TypeSafeMatcher; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -import java.util.Arrays; -import java.util.List; - -import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder; -import static org.hamcrest.core.Is.is; -import static org.junit.Assert.assertThat; - -/** - * Tests for {@link GroupByKey} with {@link com.alibaba.jstorm.beam.StormRunner}. 
- */ -@RunWith(JUnit4.class) -public class GroupByKeyTest { - - static final String[] WORDS_ARRAY = new String[] { - "hi", "there", "hi", "hi", "sue", "bob", - "hi", "sue", "", "", "ZOW", "bob", "" }; - - static final List<String> WORDS = Arrays.asList(WORDS_ARRAY); - - @Test - public void testGroupByKey() { - StormPipelineOptions options = PipelineOptionsFactory.as(StormPipelineOptions.class); - options.setRunner(TestJStormRunner.class); - options.setLocalMode(true); - - Pipeline p = Pipeline.create(options); - - List<KV<String, Integer>> ungroupedPairs = Arrays.asList( - KV.of("k1", 3), - KV.of("k5", Integer.MAX_VALUE), - KV.of("k5", Integer.MIN_VALUE), - KV.of("k2", 66), - KV.of("k1", 4), - KV.of("k2", -33), - KV.of("k3", 0)); - - PCollection<KV<String, Integer>> input = - p.apply(Create.of(ungroupedPairs) - .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()))); - - PCollection<KV<String, Iterable<Integer>>> output = - input.apply(GroupByKey.<String, Integer>create()); - - PAssert.that(output) - .satisfies(new AssertThatHasExpectedContentsForTestGroupByKey()); - - p.run(); - } - - @Test - public void testCountGloballyBasic() { - StormPipelineOptions options = PipelineOptionsFactory.as(StormPipelineOptions.class); - options.setRunner(TestJStormRunner.class); - options.setLocalMode(true); - - Pipeline p = Pipeline.create(options); - PCollection<String> input = p.apply(Create.of(WORDS)); - - PCollection<Long> output = - input.apply(Count.<String>globally()); - - PAssert.that(output) - .containsInAnyOrder(13L); - p.run(); - } - - static class AssertThatHasExpectedContentsForTestGroupByKey - implements SerializableFunction<Iterable<KV<String, Iterable<Integer>>>, - Void> { - @Override - public Void apply(Iterable<KV<String, Iterable<Integer>>> actual) { - assertThat(actual, containsInAnyOrder( - KvMatcher.isKv(is("k1"), containsInAnyOrder(3, 4)), - KvMatcher.isKv(is("k5"), containsInAnyOrder(Integer.MAX_VALUE, - Integer.MIN_VALUE)), - 
KvMatcher.isKv(is("k2"), containsInAnyOrder(66, -33)), - KvMatcher.isKv(is("k3"), containsInAnyOrder(0)))); - return null; - } - } - - /** - * Matcher for KVs. - */ - public static class KvMatcher<K, V> - extends TypeSafeMatcher<KV<? extends K, ? extends V>> { - final Matcher<? super K> keyMatcher; - final Matcher<? super V> valueMatcher; - - public static <K, V> KvMatcher<K, V> isKv(Matcher<K> keyMatcher, - Matcher<V> valueMatcher) { - return new KvMatcher<>(keyMatcher, valueMatcher); - } - - public KvMatcher(Matcher<? super K> keyMatcher, - Matcher<? super V> valueMatcher) { - this.keyMatcher = keyMatcher; - this.valueMatcher = valueMatcher; - } - - @Override - public boolean matchesSafely(KV<? extends K, ? extends V> kv) { - return keyMatcher.matches(kv.getKey()) - && valueMatcher.matches(kv.getValue()); - } - - @Override - public void describeTo(Description description) { - description - .appendText("a KV(").appendValue(keyMatcher) - .appendText(", ").appendValue(valueMatcher) - .appendText(")"); - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/test/java/com/alibaba/jstorm/beam/translation/translator/ParDoTest.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/test/java/com/alibaba/jstorm/beam/translation/translator/ParDoTest.java b/runners/jstorm/src/test/java/com/alibaba/jstorm/beam/translation/translator/ParDoTest.java deleted file mode 100644 index da0aafe..0000000 --- a/runners/jstorm/src/test/java/com/alibaba/jstorm/beam/translation/translator/ParDoTest.java +++ /dev/null @@ -1,626 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.jstorm.beam.translation.translator; - -import com.alibaba.jstorm.beam.StormPipelineOptions; - -import com.alibaba.jstorm.beam.TestJStormRunner; -import com.google.common.base.MoreObjects; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.coders.*; -import org.apache.beam.sdk.io.GenerateSequence; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.state.*; -import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.testing.UsesMapState; -import org.apache.beam.sdk.testing.UsesStatefulParDo; -import org.apache.beam.sdk.testing.ValidatesRunner; -import org.apache.beam.sdk.transforms.*; -import org.apache.beam.sdk.transforms.windowing.*; -import org.apache.beam.sdk.values.*; -import org.joda.time.Duration; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.io.Serializable; -import java.util.*; - -import static org.hamcrest.Matchers.anyOf; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder; -import static org.hamcrest.collection.IsIterableContainingInOrder.contains; -import static org.junit.Assert.assertEquals; -import static 
org.junit.Assert.assertThat; - -/** - * Tests for {@link ParDo} with {@link com.alibaba.jstorm.beam.StormRunner}. - */ -@RunWith(JUnit4.class) -public class ParDoTest implements Serializable { - - @Test - public void testParDo() throws IOException { - StormPipelineOptions options = PipelineOptionsFactory.as(StormPipelineOptions.class); - options.setRunner(TestJStormRunner.class); - Pipeline pipeline = Pipeline.create(options); - - List<Integer> inputs = Arrays.asList(3, -42, 666); - - PCollection<String> output = pipeline - .apply(Create.of(inputs)) - .apply(ParDo.of(new TestDoFn())); - - PAssert.that(output) - .satisfies(ParDoTest.HasExpectedOutput.forInput(inputs)); - - pipeline.run(); - } - - @Test - public void testParDoWithSideInputs() throws IOException { - StormPipelineOptions options = PipelineOptionsFactory.as(StormPipelineOptions.class); - options.setRunner(TestJStormRunner.class); - Pipeline pipeline = Pipeline.create(options); - - List<Integer> inputs = Arrays.asList(3, -42, 666); - - PCollectionView<Integer> sideInput1 = pipeline - .apply("CreateSideInput1", Create.of(11)) - .apply("ViewSideInput1", View.<Integer>asSingleton()); - PCollectionView<Integer> sideInputUnread = pipeline - .apply("CreateSideInputUnread", Create.of(-3333)) - .apply("ViewSideInputUnread", View.<Integer>asSingleton()); - - PCollectionView<Integer> sideInput2 = pipeline - .apply("CreateSideInput2", Create.of(222)) - .apply("ViewSideInput2", View.<Integer>asSingleton()); - PCollection<String> output = pipeline - .apply(Create.of(inputs)) - .apply(ParDo.of(new TestDoFn( - Arrays.asList(sideInput1, sideInput2), - Arrays.<TupleTag<String>>asList())) - .withSideInputs(sideInput1, sideInputUnread, sideInput2)); - - PAssert.that(output) - .satisfies(ParDoTest.HasExpectedOutput - .forInput(inputs) - .andSideInputs(11, 222)); - - pipeline.run(); - } - - @Test - public void testParDoWithTaggedOutput() { - List<Integer> inputs = Arrays.asList(3, -42, 666); - - TupleTag<String> 
mainOutputTag = new TupleTag<String>("main"){}; - TupleTag<String> additionalOutputTag1 = new TupleTag<String>("additional1"){}; - TupleTag<String> additionalOutputTag2 = new TupleTag<String>("additional2"){}; - TupleTag<String> additionalOutputTag3 = new TupleTag<String>("additional3"){}; - TupleTag<String> additionalOutputTagUnwritten = new TupleTag<String>("unwrittenOutput"){}; - - StormPipelineOptions options = PipelineOptionsFactory.as(StormPipelineOptions.class); - options.setRunner(TestJStormRunner.class); - Pipeline pipeline = Pipeline.create(options); - - PCollectionTuple outputs = pipeline - .apply(Create.of(inputs)) - .apply(ParDo - .of(new TestDoFn( - Arrays.<PCollectionView<Integer>>asList(), - Arrays.asList(additionalOutputTag1, additionalOutputTag2, additionalOutputTag3))) - .withOutputTags( - mainOutputTag, - TupleTagList.of(additionalOutputTag3) - .and(additionalOutputTag1) - .and(additionalOutputTagUnwritten) - .and(additionalOutputTag2))); - - PAssert.that(outputs.get(mainOutputTag)) - .satisfies(ParDoTest.HasExpectedOutput.forInput(inputs)); - - PAssert.that(outputs.get(additionalOutputTag1)) - .satisfies(ParDoTest.HasExpectedOutput.forInput(inputs) - .fromOutput(additionalOutputTag1)); - PAssert.that(outputs.get(additionalOutputTag2)) - .satisfies(ParDoTest.HasExpectedOutput.forInput(inputs) - .fromOutput(additionalOutputTag2)); - PAssert.that(outputs.get(additionalOutputTag3)) - .satisfies(ParDoTest.HasExpectedOutput.forInput(inputs) - .fromOutput(additionalOutputTag3)); - PAssert.that(outputs.get(additionalOutputTagUnwritten)).empty(); - - pipeline.run(); - } - - @Test - public void testNoWindowFnDoesNotReassignWindows() { - StormPipelineOptions options = PipelineOptionsFactory.as(StormPipelineOptions.class); - options.setRunner(TestJStormRunner.class); - Pipeline pipeline = Pipeline.create(options); - - final PCollection<Long> initialWindows = - pipeline - .apply(GenerateSequence.from(0).to(10)) - .apply("AssignWindows", Window.into(new 
WindowOddEvenBuckets())); - - // Sanity check the window assignment to demonstrate the baseline - PAssert.that(initialWindows) - .inWindow(WindowOddEvenBuckets.EVEN_WINDOW) - .containsInAnyOrder(0L, 2L, 4L, 6L, 8L); - PAssert.that(initialWindows) - .inWindow(WindowOddEvenBuckets.ODD_WINDOW) - .containsInAnyOrder(1L, 3L, 5L, 7L, 9L); - - PCollection<Boolean> upOne = - initialWindows.apply( - "ModifyTypes", - MapElements.<Long, Boolean>via( - new SimpleFunction<Long, Boolean>() { - @Override - public Boolean apply(Long input) { - return input % 2 == 0; - } - })); - PAssert.that(upOne) - .inWindow(WindowOddEvenBuckets.EVEN_WINDOW) - .containsInAnyOrder(true, true, true, true, true); - PAssert.that(upOne) - .inWindow(WindowOddEvenBuckets.ODD_WINDOW) - .containsInAnyOrder(false, false, false, false, false); - - // The elements should be in the same windows, even though they would not be assigned to the - // same windows with the updated timestamps. If we try to apply the original WindowFn, the type - // will not be appropriate and the runner should crash, as a Boolean cannot be converted into - // a long. 
- PCollection<Boolean> updatedTrigger = - upOne.apply( - "UpdateWindowingStrategy", - Window.<Boolean>configure().triggering(Never.ever()) - .withAllowedLateness(Duration.ZERO) - .accumulatingFiredPanes()); - pipeline.run(); - } - - @Test - public void testValueStateSameId() { - StormPipelineOptions options = PipelineOptionsFactory.as(StormPipelineOptions.class); - options.setRunner(TestJStormRunner.class); - Pipeline pipeline = Pipeline.create(options); - - final String stateId = "foo"; - - DoFn<KV<String, Integer>, KV<String, Integer>> fn = - new DoFn<KV<String, Integer>, KV<String, Integer>>() { - - @StateId(stateId) - private final StateSpec<ValueState<Integer>> intState = - StateSpecs.value(VarIntCoder.of()); - - @ProcessElement - public void processElement( - ProcessContext c, @StateId(stateId) ValueState<Integer> state) { - Integer currentValue = MoreObjects.firstNonNull(state.read(), 0); - c.output(KV.of("sizzle", currentValue)); - state.write(currentValue + 1); - } - }; - - DoFn<KV<String, Integer>, Integer> fn2 = - new DoFn<KV<String, Integer>, Integer>() { - - @StateId(stateId) - private final StateSpec<ValueState<Integer>> intState = - StateSpecs.value(VarIntCoder.of()); - - @ProcessElement - public void processElement( - ProcessContext c, @StateId(stateId) ValueState<Integer> state) { - Integer currentValue = MoreObjects.firstNonNull(state.read(), 13); - c.output(currentValue); - state.write(currentValue + 13); - } - }; - - PCollection<KV<String, Integer>> intermediate = - pipeline.apply(Create.of(KV.of("hello", 42), KV.of("hello", 97), KV.of("hello", 84))) - .apply("First stateful ParDo", ParDo.of(fn)); - - PCollection<Integer> output = - intermediate.apply("Second stateful ParDo", ParDo.of(fn2)); - - PAssert.that(intermediate) - .containsInAnyOrder(KV.of("sizzle", 0), KV.of("sizzle", 1), KV.of("sizzle", 2)); - PAssert.that(output).containsInAnyOrder(13, 26, 39); - pipeline.run(); - } - - @Test - @Category({ValidatesRunner.class, 
UsesStatefulParDo.class}) - public void testValueStateTaggedOutput() { - StormPipelineOptions options = PipelineOptionsFactory.as(StormPipelineOptions.class); - options.setRunner(TestJStormRunner.class); - Pipeline pipeline = Pipeline.create(options); - - final String stateId = "foo"; - - final TupleTag<Integer> evenTag = new TupleTag<Integer>() {}; - final TupleTag<Integer> oddTag = new TupleTag<Integer>() {}; - - DoFn<KV<String, Integer>, Integer> fn = - new DoFn<KV<String, Integer>, Integer>() { - - @StateId(stateId) - private final StateSpec<ValueState<Integer>> intState = - StateSpecs.value(VarIntCoder.of()); - - @ProcessElement - public void processElement( - ProcessContext c, @StateId(stateId) ValueState<Integer> state) { - Integer currentValue = MoreObjects.firstNonNull(state.read(), 0); - if (currentValue % 2 == 0) { - c.output(currentValue); - } else { - c.output(oddTag, currentValue); - } - state.write(currentValue + 1); - } - }; - - PCollectionTuple output = - pipeline.apply( - Create.of( - KV.of("hello", 42), - KV.of("hello", 97), - KV.of("hello", 84), - KV.of("goodbye", 33), - KV.of("hello", 859), - KV.of("goodbye", 83945))) - .apply(ParDo.of(fn).withOutputTags(evenTag, TupleTagList.of(oddTag))); - - PCollection<Integer> evens = output.get(evenTag); - PCollection<Integer> odds = output.get(oddTag); - - // There are 0 and 2 from "hello" and just 0 from "goodbye" - PAssert.that(evens).containsInAnyOrder(0, 2, 0); - - // There are 1 and 3 from "hello" and just "1" from "goodbye" - PAssert.that(odds).containsInAnyOrder(1, 3, 1); - pipeline.run(); - } - - @Test - @Category({ValidatesRunner.class, UsesStatefulParDo.class, UsesMapState.class}) - public void testMapStateCoderInference() { - StormPipelineOptions options = PipelineOptionsFactory.as(StormPipelineOptions.class); - options.setRunner(TestJStormRunner.class); - Pipeline pipeline = Pipeline.create(options); - - final String stateId = "foo"; - final String countStateId = "count"; - Coder<MyInteger> 
myIntegerCoder = MyIntegerCoder.of(); - pipeline.getCoderRegistry().registerCoderForClass(MyInteger.class, myIntegerCoder); - - DoFn<KV<String, KV<String, Integer>>, KV<String, MyInteger>> fn = - new DoFn<KV<String, KV<String, Integer>>, KV<String, MyInteger>>() { - - @StateId(stateId) - private final StateSpec<MapState<String, MyInteger>> mapState = StateSpecs.map(); - - @StateId(countStateId) - private final StateSpec<CombiningState<Integer, int[], Integer>> - countState = StateSpecs.combiningFromInputInternal(VarIntCoder.of(), - Sum.ofIntegers()); - - @ProcessElement - public void processElement( - ProcessContext c, @StateId(stateId) MapState<String, MyInteger> state, - @StateId(countStateId) CombiningState<Integer, int[], Integer> - count) { - KV<String, Integer> value = c.element().getValue(); - state.put(value.getKey(), new MyInteger(value.getValue())); - count.add(1); - if (count.read() >= 4) { - Iterable<Map.Entry<String, MyInteger>> iterate = state.entries().read(); - for (Map.Entry<String, MyInteger> entry : iterate) { - c.output(KV.of(entry.getKey(), entry.getValue())); - } - } - } - }; - - PCollection<KV<String, MyInteger>> output = - pipeline.apply( - Create.of( - KV.of("hello", KV.of("a", 97)), KV.of("hello", KV.of("b", 42)), - KV.of("hello", KV.of("b", 42)), KV.of("hello", KV.of("c", 12)))) - .apply(ParDo.of(fn)).setCoder(KvCoder.of(StringUtf8Coder.of(), myIntegerCoder)); - - PAssert.that(output).containsInAnyOrder(KV.of("a", new MyInteger(97)), - KV.of("b", new MyInteger(42)), KV.of("c", new MyInteger(12))); - pipeline.run(); - } - - - private static class WindowOddEvenBuckets extends NonMergingWindowFn<Long, IntervalWindow> { - private static final IntervalWindow EVEN_WINDOW = - new IntervalWindow( - BoundedWindow.TIMESTAMP_MIN_VALUE, GlobalWindow.INSTANCE.maxTimestamp()); - private static final IntervalWindow ODD_WINDOW = - new IntervalWindow( - BoundedWindow.TIMESTAMP_MIN_VALUE, GlobalWindow.INSTANCE.maxTimestamp().minus(1)); - - @Override - 
public Collection<IntervalWindow> assignWindows(AssignContext c) throws Exception { - if (c.element() % 2 == 0) { - return Collections.singleton(EVEN_WINDOW); - } - return Collections.singleton(ODD_WINDOW); - } - - @Override - public boolean isCompatible(WindowFn<?, ?> other) { - return other instanceof WindowOddEvenBuckets; - } - - @Override - public Coder<IntervalWindow> windowCoder() { - return new IntervalWindow.IntervalWindowCoder(); - } - - @Override - public WindowMappingFn<IntervalWindow> getDefaultWindowMappingFn() { - throw new UnsupportedOperationException( - String.format("Can't use %s for side inputs", getClass().getSimpleName())); - } - } - - - static class TestDoFn extends DoFn<Integer, String> { - enum State {NOT_SET_UP, UNSTARTED, STARTED, PROCESSING, FINISHED} - - State state = State.NOT_SET_UP; - - final List<PCollectionView<Integer>> sideInputViews = new ArrayList<>(); - final List<TupleTag<String>> additionalOutputTupleTags = new ArrayList<>(); - - public TestDoFn() { - } - - public TestDoFn(List<PCollectionView<Integer>> sideInputViews, - List<TupleTag<String>> additionalOutputTupleTags) { - this.sideInputViews.addAll(sideInputViews); - this.additionalOutputTupleTags.addAll(additionalOutputTupleTags); - } - - @Setup - public void prepare() { - assertEquals(State.NOT_SET_UP, state); - state = State.UNSTARTED; - } - - @StartBundle - public void startBundle() { - assertThat(state, - anyOf(equalTo(State.UNSTARTED), equalTo(State.FINISHED))); - - state = State.STARTED; - } - - @ProcessElement - public void processElement(ProcessContext c) { - System.out.println("Recv elem: " + c.element()); - assertThat(state, - anyOf(equalTo(State.STARTED), equalTo(State.PROCESSING))); - state = State.PROCESSING; - outputToAllWithSideInputs(c, "processing: " + c.element()); - } - - @FinishBundle - public void finishBundle(FinishBundleContext c) { - assertThat(state, - anyOf(equalTo(State.STARTED), equalTo(State.PROCESSING))); - state = State.FINISHED; - 
c.output("finished", BoundedWindow.TIMESTAMP_MIN_VALUE, GlobalWindow.INSTANCE); - for (TupleTag<String> additionalOutputTupleTag : additionalOutputTupleTags) { - c.output( - additionalOutputTupleTag, - additionalOutputTupleTag.getId() + ": " + "finished", - BoundedWindow.TIMESTAMP_MIN_VALUE, - GlobalWindow.INSTANCE); - } - } - - private void outputToAllWithSideInputs(ProcessContext c, String value) { - if (!sideInputViews.isEmpty()) { - List<Integer> sideInputValues = new ArrayList<>(); - for (PCollectionView<Integer> sideInputView : sideInputViews) { - sideInputValues.add(c.sideInput(sideInputView)); - } - value += ": " + sideInputValues; - } - c.output(value); - for (TupleTag<String> additionalOutputTupleTag : additionalOutputTupleTags) { - c.output(additionalOutputTupleTag, - additionalOutputTupleTag.getId() + ": " + value); - } - } - } - - private static class MyInteger implements Comparable<MyInteger> { - private final int value; - - MyInteger(int value) { - this.value = value; - } - - public int getValue() { - return value; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - - if (!(o instanceof MyInteger)) { - return false; - } - - MyInteger myInteger = (MyInteger) o; - - return value == myInteger.value; - - } - - @Override - public int hashCode() { - return value; - } - - @Override - public int compareTo(MyInteger o) { - return Integer.compare(this.getValue(), o.getValue()); - } - - @Override - public String toString() { - return "MyInteger{" + "value=" + value + '}'; - } - } - - private static class MyIntegerCoder extends AtomicCoder<MyInteger> { - private static final MyIntegerCoder INSTANCE = new MyIntegerCoder(); - - private final VarIntCoder delegate = VarIntCoder.of(); - - public static MyIntegerCoder of() { - return INSTANCE; - } - - @Override - public void encode(MyInteger value, OutputStream outStream) - throws CoderException, IOException { - delegate.encode(value.getValue(), outStream); - } - - @Override - 
public MyInteger decode(InputStream inStream) throws CoderException, - IOException { - return new MyInteger(delegate.decode(inStream)); - } - } - - /** PAssert "matcher" for expected output. */ - static class HasExpectedOutput - implements SerializableFunction<Iterable<String>, Void>, Serializable { - private final List<Integer> inputs; - private final List<Integer> sideInputs; - private final String additionalOutput; - private final boolean ordered; - - public static HasExpectedOutput forInput(List<Integer> inputs) { - return new HasExpectedOutput( - new ArrayList<Integer>(inputs), - new ArrayList<Integer>(), - "", - false); - } - - private HasExpectedOutput(List<Integer> inputs, - List<Integer> sideInputs, - String additionalOutput, - boolean ordered) { - this.inputs = inputs; - this.sideInputs = sideInputs; - this.additionalOutput = additionalOutput; - this.ordered = ordered; - } - - public HasExpectedOutput andSideInputs(Integer... sideInputValues) { - List<Integer> sideInputs = new ArrayList<>(); - for (Integer sideInputValue : sideInputValues) { - sideInputs.add(sideInputValue); - } - return new HasExpectedOutput(inputs, sideInputs, additionalOutput, ordered); - } - - public HasExpectedOutput fromOutput(TupleTag<String> outputTag) { - return fromOutput(outputTag.getId()); - } - public HasExpectedOutput fromOutput(String outputId) { - return new HasExpectedOutput(inputs, sideInputs, outputId, ordered); - } - - public HasExpectedOutput inOrder() { - return new HasExpectedOutput(inputs, sideInputs, additionalOutput, true); - } - - @Override - public Void apply(Iterable<String> outputs) { - List<String> processeds = new ArrayList<>(); - List<String> finisheds = new ArrayList<>(); - for (String output : outputs) { - if (output.contains("finished")) { - finisheds.add(output); - } else { - processeds.add(output); - } - } - - String sideInputsSuffix; - if (sideInputs.isEmpty()) { - sideInputsSuffix = ""; - } else { - sideInputsSuffix = ": " + sideInputs; - } - - 
String additionalOutputPrefix; - if (additionalOutput.isEmpty()) { - additionalOutputPrefix = ""; - } else { - additionalOutputPrefix = additionalOutput + ": "; - } - - List<String> expectedProcesseds = new ArrayList<>(); - for (Integer input : inputs) { - expectedProcesseds.add( - additionalOutputPrefix + "processing: " + input + sideInputsSuffix); - } - String[] expectedProcessedsArray = - expectedProcesseds.toArray(new String[expectedProcesseds.size()]); - if (!ordered || expectedProcesseds.isEmpty()) { - assertThat(processeds, containsInAnyOrder(expectedProcessedsArray)); - } else { - assertThat(processeds, contains(expectedProcessedsArray)); - } - - for (String finished : finisheds) { - assertEquals(additionalOutputPrefix + "finished", finished); - } - - return null; - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormStateInternalsTest.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormStateInternalsTest.java b/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormStateInternalsTest.java new file mode 100644 index 0000000..11c7c94 --- /dev/null +++ b/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormStateInternalsTest.java @@ -0,0 +1,219 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.jstorm.translation.runtime.state; + +import avro.shaded.com.google.common.collect.Maps; +import org.apache.beam.runners.jstorm.translation.runtime.TimerServiceImpl; +import com.alibaba.jstorm.cache.IKvStoreManager; +import com.alibaba.jstorm.cache.rocksdb.RocksDbKvStoreManagerFactory; +import com.alibaba.jstorm.utils.KryoSerializer; + +import org.apache.beam.runners.core.StateNamespaces; +import org.apache.beam.runners.core.StateTags; +import org.apache.beam.sdk.coders.BigEndianIntegerCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderRegistry; +import org.apache.beam.sdk.state.*; +import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.Max; +import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; +import org.joda.time.Instant; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +import java.util.Iterator; +import java.util.Map; + +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.hasEntry; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; + +/** + * Tests for {@link JStormStateInternals}. 
+ */ +@RunWith(JUnit4.class) +public class JStormStateInternalsTest { + + @Rule + public final TemporaryFolder tmp = new TemporaryFolder(); + + private JStormStateInternals<String> jstormStateInternals; + + @Before + public void setup() throws Exception { + IKvStoreManager kvStoreManager = RocksDbKvStoreManagerFactory.getManager( + Maps.newHashMap(), + "test", + tmp.toString(), + new KryoSerializer(Maps.newHashMap())); + jstormStateInternals = new JStormStateInternals("key-1", kvStoreManager, new TimerServiceImpl(), 0); + } + + @Test + public void testValueState() throws Exception { + ValueState<Integer> valueState = jstormStateInternals.state( + StateNamespaces.global(), StateTags.value("state-id-a", BigEndianIntegerCoder.of())); + valueState.write(Integer.MIN_VALUE); + assertEquals(Integer.MIN_VALUE, valueState.read().longValue()); + valueState.write(Integer.MAX_VALUE); + assertEquals(Integer.MAX_VALUE, valueState.read().longValue()); + } + + @Test + public void testValueStateIdenticalId() throws Exception { + ValueState<Integer> valueState = jstormStateInternals.state( + StateNamespaces.global(), StateTags.value("state-id-a", BigEndianIntegerCoder.of())); + ValueState<Integer> valueStateIdentical = jstormStateInternals.state( + StateNamespaces.global(), StateTags.value("state-id-a", BigEndianIntegerCoder.of())); + + valueState.write(Integer.MIN_VALUE); + assertEquals(Integer.MIN_VALUE, valueState.read().longValue()); + assertEquals(Integer.MIN_VALUE, valueStateIdentical.read().longValue()); + valueState.write(Integer.MAX_VALUE); + assertEquals(Integer.MAX_VALUE, valueState.read().longValue()); + assertEquals(Integer.MAX_VALUE, valueStateIdentical.read().longValue()); + } + + @Test + public void testBagState() throws Exception { + BagState<Integer> bagStateA = jstormStateInternals.state( + StateNamespaces.global(), StateTags.bag("state-id-a", BigEndianIntegerCoder.of())); + BagState<Integer> bagStateB = jstormStateInternals.state( + StateNamespaces.global(), 
StateTags.bag("state-id-b", BigEndianIntegerCoder.of())); + + bagStateA.add(1); + bagStateA.add(0); + bagStateA.add(Integer.MAX_VALUE); + + bagStateB.add(0); + bagStateB.add(Integer.MIN_VALUE); + + Iterable<Integer> bagA = bagStateA.read(); + Iterable<Integer> bagB = bagStateB.read(); + assertThat(bagA, containsInAnyOrder(1, 0, Integer.MAX_VALUE)); + assertThat(bagB, containsInAnyOrder(0, Integer.MIN_VALUE)); + + bagStateA.clear(); + bagStateA.add(1); + bagStateB.add(0); + assertThat(bagStateA.read(), containsInAnyOrder(1)); + assertThat(bagStateB.read(), containsInAnyOrder(0, 0, Integer.MIN_VALUE)); + } + + @Test + public void testCombiningState() throws Exception { + Combine.CombineFn<Integer, int[], Integer> combineFn = Max.ofIntegers(); + Coder<int[]> accumCoder = combineFn.getAccumulatorCoder( + CoderRegistry.createDefault(), BigEndianIntegerCoder.of()); + + CombiningState<Integer, int[], Integer> combiningState = jstormStateInternals.state( + StateNamespaces.global(), + StateTags.combiningValue( + "state-id-a", + accumCoder, + combineFn)); + assertEquals(Integer.MIN_VALUE, combiningState.read().longValue()); + combiningState.add(10); + assertEquals(10, combiningState.read().longValue()); + combiningState.add(1); + assertEquals(10, combiningState.read().longValue()); + combiningState.add(Integer.MAX_VALUE); + assertEquals(Integer.MAX_VALUE, combiningState.read().longValue()); + } + + @Test + public void testWatermarkHoldState() throws Exception { + WatermarkHoldState watermarkHoldState = jstormStateInternals.state( + StateNamespaces.global(), + StateTags.watermarkStateInternal( + "state-id-a", + TimestampCombiner.EARLIEST)); + watermarkHoldState.add(new Instant(1)); + assertEquals(1, watermarkHoldState.read().getMillis()); + watermarkHoldState.add(new Instant(Integer.MIN_VALUE)); + assertEquals(Integer.MIN_VALUE, watermarkHoldState.read().getMillis()); + watermarkHoldState.add(new Instant(Integer.MAX_VALUE)); + assertEquals(Integer.MIN_VALUE, 
watermarkHoldState.read().getMillis()); + } + + @Test + public void testMapState() throws Exception { + MapState<Integer, Integer> mapStateA = jstormStateInternals.state( + StateNamespaces.global(), StateTags.map("state-id-a", BigEndianIntegerCoder.of(), BigEndianIntegerCoder.of())); + mapStateA.put(1, 1); + mapStateA.put(2, 22); + mapStateA.put(1, 12); + + Iterable<Integer> keys = mapStateA.keys().read(); + Iterable<Integer> values = mapStateA.values().read(); + assertThat(keys, containsInAnyOrder(1, 2)); + assertThat(values, containsInAnyOrder(12, 22)); + + Iterable<Map.Entry<Integer, Integer>> entries = mapStateA.entries().read(); + Iterator<Map.Entry<Integer, Integer>> itr = entries.iterator(); + Map.Entry<Integer, Integer> entry = itr.next(); + assertEquals((long) entry.getKey(), 1l); + assertEquals((long) entry.getValue(), 12l); + entry = itr.next(); + assertEquals((long) entry.getKey(), 2l); + assertEquals((long) entry.getValue(), 22l); + assertEquals(false, itr.hasNext()); + + mapStateA.remove(1); + keys = mapStateA.keys().read(); + values = mapStateA.values().read(); + assertThat(keys, containsInAnyOrder(2)); + assertThat(values, containsInAnyOrder(22)); + + entries = mapStateA.entries().read(); + itr = entries.iterator(); + entry = itr.next(); + assertEquals((long) entry.getKey(), 2l); + assertEquals((long) entry.getValue(), 22l); + assertEquals(false, itr.hasNext()); + } + + @Test + public void testMassiveDataOfBagState() { + BagState<Integer> bagStateA = jstormStateInternals.state( + StateNamespaces.global(), StateTags.bag("state-id-a", BigEndianIntegerCoder.of())); + + int count = 10000; + int n = 1; + while(n <= count) { + bagStateA.add(n); + n++; + } + + int readCount = 0; + int readN = 0; + Iterator<Integer> itr = bagStateA.read().iterator(); + while(itr.hasNext()) { + readN += itr.next(); + readCount++; + } + + assertEquals((long) readN, ((1 + count) * count) / 2); + assertEquals((long) readCount, count); + } +}
