http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundMultiTranslator.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundMultiTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundMultiTranslator.java deleted file mode 100644 index 6e3392c..0000000 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundMultiTranslator.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.jstorm.translation.translator; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Maps; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import org.apache.beam.runners.jstorm.translation.TranslationContext; -import org.apache.beam.runners.jstorm.translation.runtime.DoFnExecutor; -import org.apache.beam.runners.jstorm.translation.runtime.MultiOutputDoFnExecutor; -import org.apache.beam.runners.jstorm.translation.runtime.MultiStatefulDoFnExecutor; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.reflect.DoFnSignature; -import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.PValue; -import org.apache.beam.sdk.values.PValueBase; -import org.apache.beam.sdk.values.TupleTag; - -/** - * Translates a ParDo.BoundMulti to a Storm {@link DoFnExecutor}. - */ -public class ParDoBoundMultiTranslator<InputT, OutputT> - extends TransformTranslator.Default<ParDo.MultiOutput<InputT, OutputT>> { - - @Override - public void translateNode( - ParDo.MultiOutput<InputT, OutputT> transform, TranslationContext context) { - final TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext(); - final TupleTag<InputT> inputTag = (TupleTag<InputT>) userGraphContext.getInputTag(); - PCollection<InputT> input = (PCollection<InputT>) userGraphContext.getInput(); - - Map<TupleTag<?>, PValue> allOutputs = Maps.newHashMap(userGraphContext.getOutputs()); - Map<TupleTag<?>, TupleTag<?>> localToExternalTupleTagMap = Maps.newHashMap(); - for (Map.Entry<TupleTag<?>, PValue> entry : allOutputs.entrySet()) { - Iterator<TupleTag<?>> itr = ((PValueBase) entry.getValue()).expand().keySet().iterator(); - localToExternalTupleTagMap.put(entry.getKey(), itr.next()); - } - - TupleTag<OutputT> mainOutputTag = (TupleTag<OutputT>) userGraphContext.getOutputTag(); - List<TupleTag<?>> sideOutputTags = userGraphContext.getOutputTags(); - sideOutputTags.remove(mainOutputTag); - - Map<TupleTag<?>, PValue> allInputs = Maps.newHashMap(userGraphContext.getInputs()); - for (PCollectionView pCollectionView : transform.getSideInputs()) { - allInputs.put(userGraphContext.findTupleTag(pCollectionView), pCollectionView); - } - String description = describeTransform( - transform, - allInputs, - allOutputs); - - ImmutableMap.Builder<TupleTag, PCollectionView<?>> sideInputTagToView = ImmutableMap.builder(); - for (PCollectionView pCollectionView : transform.getSideInputs()) { - sideInputTagToView.put(userGraphContext.findTupleTag(pCollectionView), pCollectionView); - } - - DoFnExecutor executor; - DoFnSignature signature = DoFnSignatures.getSignature(transform.getFn().getClass()); - if (signature.stateDeclarations().size() > 0 - || signature.timerDeclarations().size() > 0) { - executor = new MultiStatefulDoFnExecutor<>( - userGraphContext.getStepName(), - description, - userGraphContext.getOptions(), - (DoFn<KV, OutputT>) transform.getFn(), - (Coder) WindowedValue.getFullCoder( - input.getCoder(), input.getWindowingStrategy().getWindowFn().windowCoder()), - input.getWindowingStrategy(), - (TupleTag<KV>) inputTag, - transform.getSideInputs(), - sideInputTagToView.build(), - mainOutputTag, - sideOutputTags, - localToExternalTupleTagMap); - } else { - executor = new MultiOutputDoFnExecutor<>( - userGraphContext.getStepName(), - description, - userGraphContext.getOptions(), - transform.getFn(), - WindowedValue.getFullCoder( - input.getCoder(), input.getWindowingStrategy().getWindowFn().windowCoder()), - input.getWindowingStrategy(), - inputTag, - transform.getSideInputs(), - sideInputTagToView.build(), - mainOutputTag, - sideOutputTags, - localToExternalTupleTagMap); - } - - context.addTransformExecutor(executor, ImmutableList.<PValue>copyOf(transform.getSideInputs())); - } -}
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundTranslator.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundTranslator.java deleted file mode 100644 index ad8f85f..0000000 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundTranslator.java +++ /dev/null @@ -1,110 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.jstorm.translation.translator; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Lists; -import java.util.List; -import java.util.Map; -import org.apache.beam.runners.jstorm.translation.TranslationContext; -import org.apache.beam.runners.jstorm.translation.runtime.DoFnExecutor; -import org.apache.beam.runners.jstorm.translation.runtime.StatefulDoFnExecutor; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.reflect.DoFnSignature; -import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.PValue; -import org.apache.beam.sdk.values.TupleTag; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Translates a ParDo.Bound to a JStorm {@link DoFnExecutor}. - */ -public class ParDoBoundTranslator<InputT, OutputT> - extends TransformTranslator.Default<ParDo.SingleOutput<InputT, OutputT>> { - - private static final Logger LOG = LoggerFactory.getLogger(ParDoBoundTranslator.class); - - @Override - public void translateNode( - ParDo.SingleOutput<InputT, OutputT> transform, TranslationContext context) { - final TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext(); - final TupleTag<?> inputTag = userGraphContext.getInputTag(); - PCollection<InputT> input = (PCollection<InputT>) userGraphContext.getInput(); - - TupleTag<OutputT> mainOutputTag = (TupleTag<OutputT>) userGraphContext.getOutputTag(); - List<TupleTag<?>> sideOutputTags = Lists.newArrayList(); - - Map<TupleTag<?>, PValue> allInputs = - avro.shaded.com.google.common.collect.Maps.newHashMap(userGraphContext.getInputs()); - for (PCollectionView pCollectionView : transform.getSideInputs()) { - allInputs.put(userGraphContext.findTupleTag(pCollectionView), pCollectionView); - } - String description = describeTransform( - transform, - allInputs, - userGraphContext.getOutputs()); - - ImmutableMap.Builder<TupleTag, PCollectionView<?>> sideInputTagToView = ImmutableMap.builder(); - for (PCollectionView pCollectionView : transform.getSideInputs()) { - sideInputTagToView.put(userGraphContext.findTupleTag(pCollectionView), pCollectionView); - } - - DoFnExecutor executor; - DoFnSignature signature = DoFnSignatures.getSignature(transform.getFn().getClass()); - if (signature.stateDeclarations().size() > 0 - || signature.timerDeclarations().size() > 0) { - executor = new StatefulDoFnExecutor<>( - userGraphContext.getStepName(), - description, - userGraphContext.getOptions(), - (DoFn<KV, OutputT>) transform.getFn(), - (Coder) WindowedValue.getFullCoder( - input.getCoder(), input.getWindowingStrategy().getWindowFn().windowCoder()), - input.getWindowingStrategy(), - (TupleTag<KV>) inputTag, - transform.getSideInputs(), - sideInputTagToView.build(), - mainOutputTag, - sideOutputTags); - } else { - executor = new DoFnExecutor<>( - userGraphContext.getStepName(), - description, - userGraphContext.getOptions(), - transform.getFn(), - WindowedValue.getFullCoder( - input.getCoder(), input.getWindowingStrategy().getWindowFn().windowCoder()), - input.getWindowingStrategy(), - (TupleTag<InputT>) inputTag, - transform.getSideInputs(), - sideInputTagToView.build(), - mainOutputTag, - sideOutputTags); - } - - context.addTransformExecutor(executor, ImmutableList.<PValue>copyOf(transform.getSideInputs())); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/Stream.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/Stream.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/Stream.java deleted file mode 100644 index 71243b9..0000000 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/Stream.java +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.jstorm.translation.translator; - -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; - -import com.google.auto.value.AutoValue; -import java.util.List; -import javax.annotation.Nullable; - -/** - * Class that defines the stream connection between upstream and downstream components. - */ -@AutoValue -public abstract class Stream { - - public abstract Producer getProducer(); - - public abstract Consumer getConsumer(); - - public static Stream of(Producer producer, Consumer consumer) { - return new org.apache.beam.runners.jstorm.translation.translator.AutoValue_Stream( - producer, consumer); - } - - /** - * JStorm producer. - */ - @AutoValue - public abstract static class Producer { - public abstract String getComponentId(); - - public abstract String getStreamId(); - - public abstract String getStreamName(); - - public static Producer of(String componentId, String streamId, String streamName) { - return new org.apache.beam.runners.jstorm.translation.translator.AutoValue_Stream_Producer( - componentId, streamId, streamName); - } - } - - /** - * JStorm consumer. - */ - @AutoValue - public abstract static class Consumer { - public abstract String getComponentId(); - - public abstract Grouping getGrouping(); - - public static Consumer of(String componentId, Grouping grouping) { - return new org.apache.beam.runners.jstorm.translation.translator.AutoValue_Stream_Consumer( - componentId, grouping); - } - } - - /** - * JStorm grouping, which define how to transfer message between two nodes. - */ - @AutoValue - public abstract static class Grouping { - public abstract Type getType(); - - @Nullable - public abstract List<String> getFields(); - - public static Grouping of(Type type) { - checkArgument(!Type.FIELDS.equals(type), "Fields grouping should provide key fields."); - return new org.apache.beam.runners.jstorm.translation.translator.AutoValue_Stream_Grouping( - type, null /* fields */); - } - - public static Grouping byFields(List<String> fields) { - checkNotNull(fields, "fields"); - checkArgument(!fields.isEmpty(), "No key fields were provided for field grouping!"); - return new org.apache.beam.runners.jstorm.translation.translator.AutoValue_Stream_Grouping( - Type.FIELDS, fields); - } - - /** - * Types of stream groupings Storm allows. - */ - public enum Type { - ALL, CUSTOM, DIRECT, SHUFFLE, LOCAL_OR_SHUFFLE, FIELDS, GLOBAL, NONE - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/TransformTranslator.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/TransformTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/TransformTranslator.java deleted file mode 100644 index bfa94a0..0000000 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/TransformTranslator.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.jstorm.translation.translator; - -import com.google.common.base.Function; -import com.google.common.base.Joiner; -import com.google.common.collect.FluentIterable; -import java.util.Map; -import org.apache.beam.runners.jstorm.translation.TranslationContext; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.values.PValue; -import org.apache.beam.sdk.values.TupleTag; - -/** - * Interface for classes capable of tranforming Beam PTransforms into Storm primitives. - */ -public interface TransformTranslator<T extends PTransform<?, ?>> { - - void translateNode(T transform, TranslationContext context); - - /** - * Returns true if this translator can translate the given transform. - */ - boolean canTranslate(T transform, TranslationContext context); - - /** - * Default translator. - * @param <T1> - */ - class Default<T1 extends PTransform<?, ?>> implements TransformTranslator<T1> { - @Override - public void translateNode(T1 transform, TranslationContext context) { - - } - - @Override - public boolean canTranslate(T1 transform, TranslationContext context) { - return true; - } - - static String describeTransform( - PTransform<?, ?> transform, - Map<TupleTag<?>, PValue> inputs, - Map<TupleTag<?>, PValue> outputs) { - return String.format("%s --> %s --> %s", - Joiner.on('+').join(FluentIterable.from(inputs.entrySet()) - .transform(new Function<Map.Entry<TupleTag<?>, PValue>, String>() { - @Override - public String apply(Map.Entry<TupleTag<?>, PValue> taggedPValue) { - return taggedPValue.getKey().getId(); - // return taggedPValue.getValue().getName(); - } - })), - transform.getName(), - Joiner.on('+').join(FluentIterable.from(outputs.entrySet()) - .transform(new Function<Map.Entry<TupleTag<?>, PValue>, String>() { - @Override - public String apply(Map.Entry<TupleTag<?>, PValue> taggedPvalue) { - return taggedPvalue.getKey().getId(); - //return taggedPValue.getValue().getName(); - } - }))); - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/UnboundedSourceTranslator.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/UnboundedSourceTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/UnboundedSourceTranslator.java deleted file mode 100644 index 33ac024..0000000 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/UnboundedSourceTranslator.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.jstorm.translation.translator; - -import org.apache.beam.runners.jstorm.translation.TranslationContext; -import org.apache.beam.runners.jstorm.translation.runtime.UnboundedSourceSpout; -import org.apache.beam.sdk.io.Read; -import org.apache.beam.sdk.values.PValue; -import org.apache.beam.sdk.values.TaggedPValue; -import org.apache.beam.sdk.values.TupleTag; - -/** - * Translates a Read.Unbounded into a Storm spout. - * - * @param <T> - */ -public class UnboundedSourceTranslator<T> extends TransformTranslator.Default<Read.Unbounded<T>> { - public void translateNode(Read.Unbounded<T> transform, TranslationContext context) { - TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext(); - String description = - describeTransform(transform, userGraphContext.getInputs(), userGraphContext.getOutputs()); - - TupleTag<?> tag = userGraphContext.getOutputTag(); - PValue output = userGraphContext.getOutput(); - - UnboundedSourceSpout spout = new UnboundedSourceSpout( - description, - transform.getSource(), userGraphContext.getOptions(), tag); - context.getExecutionGraphContext().registerSpout(spout, TaggedPValue.of(tag, output)); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ViewTranslator.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ViewTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ViewTranslator.java deleted file mode 100644 index f71ee9c..0000000 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ViewTranslator.java +++ /dev/null @@ -1,380 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.jstorm.translation.translator; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import org.apache.beam.runners.jstorm.translation.TranslationContext; -import org.apache.beam.runners.jstorm.translation.runtime.ViewExecutor; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CoderRegistry; -import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.coders.ListCoder; -import org.apache.beam.sdk.transforms.Combine; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.View; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.PCollectionViews; - -/** - * A {@link TransformTranslator} for executing {@link View Views} in JStorm runner. - */ -public class ViewTranslator - extends TransformTranslator.Default<ViewTranslator.CreateJStormPCollectionView<?, ?>> { - @Override - public void translateNode( - CreateJStormPCollectionView<?, ?> transform, TranslationContext context) { - TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext(); - String description = describeTransform( - transform, userGraphContext.getInputs(), userGraphContext.getOutputs()); - ViewExecutor viewExecutor = new ViewExecutor(description, userGraphContext.getOutputTag()); - context.addTransformExecutor(viewExecutor); - } - - /** - * Specialized implementation for - * {@link org.apache.beam.sdk.transforms.View.AsMap View.AsMap}. - */ - public static class ViewAsMap<K, V> - extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, V>>> { - - @SuppressWarnings("unused") // used via reflection in JstormRunner#apply() - public ViewAsMap(View.AsMap<K, V> transform) { - } - - @Override - public PCollectionView<Map<K, V>> expand(PCollection<KV<K, V>> input) { - PCollectionView<Map<K, V>> view = - PCollectionViews.mapView( - input, - input.getWindowingStrategy(), - input.getCoder()); - - @SuppressWarnings({"rawtypes", "unchecked"}) - KvCoder<K, V> inputCoder = (KvCoder) input.getCoder(); - try { - inputCoder.getKeyCoder().verifyDeterministic(); - } catch (Coder.NonDeterministicException e) { - // TODO: log warning as other runners. - } - - return input - .apply(Combine.globally(new Concatenate<KV<K, V>>()).withoutDefaults()) - .apply(CreateJStormPCollectionView.<KV<K, V>, Map<K, V>>of(view)); - } - - @Override - protected String getKindString() { - return "StreamingViewAsMap"; - } - } - - /** - * Specialized expansion for {@link - * View.AsMultimap View.AsMultimap}. - */ - public static class ViewAsMultimap<K, V> - extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, Iterable<V>>>> { - - /** - * Builds an instance of this class from the overridden transform. - */ - @SuppressWarnings("unused") // used via reflection in JStormRunner#apply() - public ViewAsMultimap(View.AsMultimap<K, V> transform) { - } - - @Override - public PCollectionView<Map<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) { - PCollectionView<Map<K, Iterable<V>>> view = - PCollectionViews.multimapView( - input, - input.getWindowingStrategy(), - input.getCoder()); - - @SuppressWarnings({"rawtypes", "unchecked"}) - KvCoder<K, V> inputCoder = (KvCoder) input.getCoder(); - try { - inputCoder.getKeyCoder().verifyDeterministic(); - } catch (Coder.NonDeterministicException e) { - // TODO: log warning as other runners. - } - - return input - .apply(Combine.globally(new Concatenate<KV<K, V>>()).withoutDefaults()) - .apply(CreateJStormPCollectionView.<KV<K, V>, Map<K, Iterable<V>>>of(view)); - } - - @Override - protected String getKindString() { - return "StreamingViewAsMultimap"; - } - } - - /** - * Specialized implementation for - * {@link View.AsList View.AsList}. - */ - public static class ViewAsList<T> - extends PTransform<PCollection<T>, PCollectionView<List<T>>> { - /** - * Builds an instance of this class from the overridden transform. - */ - @SuppressWarnings("unused") // used via reflection in JStormRunner#apply() - public ViewAsList(View.AsList<T> transform) { - } - - @Override - public PCollectionView<List<T>> expand(PCollection<T> input) { - PCollectionView<List<T>> view = - PCollectionViews.listView( - input, - input.getWindowingStrategy(), - input.getCoder()); - - return input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults()) - .apply(CreateJStormPCollectionView.<T, List<T>>of(view)); - } - - @Override - protected String getKindString() { - return "StreamingViewAsList"; - } - } - - /** - * Specialized implementation for - * {@link View.AsIterable View.AsIterable} for the - * JStorm runner in streaming mode. - */ - public static class ViewAsIterable<T> - extends PTransform<PCollection<T>, PCollectionView<Iterable<T>>> { - /** - * Builds an instance of this class from the overridden transform. - */ - @SuppressWarnings("unused") // used via reflection in JStormRunner#apply() - public ViewAsIterable(View.AsIterable<T> transform) { - } - - @Override - public PCollectionView<Iterable<T>> expand(PCollection<T> input) { - PCollectionView<Iterable<T>> view = - PCollectionViews.iterableView( - input, - input.getWindowingStrategy(), - input.getCoder()); - - return input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults()) - .apply(CreateJStormPCollectionView.<T, Iterable<T>>of(view)); - } - - @Override - protected String getKindString() { - return "StreamingViewAsIterable"; - } - } - - /** - * Specialized expansion for - * {@link View.AsSingleton View.AsSingleton} for the - * JStorm runner in streaming mode. - */ - public static class ViewAsSingleton<T> - extends PTransform<PCollection<T>, PCollectionView<T>> { - private View.AsSingleton<T> transform; - - /** - * Builds an instance of this class from the overridden transform. - */ - @SuppressWarnings("unused") // used via reflection in JStormRunner#apply() - public ViewAsSingleton(View.AsSingleton<T> transform) { - this.transform = transform; - } - - @Override - public PCollectionView<T> expand(PCollection<T> input) { - Combine.Globally<T, T> combine = Combine.globally( - new SingletonCombine<>(transform.hasDefaultValue(), transform.defaultValue())); - if (!transform.hasDefaultValue()) { - combine = combine.withoutDefaults(); - } - return input.apply(combine.asSingletonView()); - } - - @Override - protected String getKindString() { - return "StreamingViewAsSingleton"; - } - - private static class SingletonCombine<T> extends Combine.BinaryCombineFn<T> { - private boolean hasDefaultValue; - private T defaultValue; - - SingletonCombine(boolean hasDefaultValue, T defaultValue) { - this.hasDefaultValue = hasDefaultValue; - this.defaultValue = defaultValue; - } - - @Override - public T apply(T left, T right) { - throw new IllegalArgumentException("PCollection with more than one element " - + "accessed as a singleton view. Consider using Combine.globally().asSingleton() to " - + "combine the PCollection into a single value"); - } - - @Override - public T identity() { - if (hasDefaultValue) { - return defaultValue; - } else { - throw new IllegalArgumentException( - "Empty PCollection accessed as a singleton view. " - + "Consider setting withDefault to provide a default value"); - } - } - } - } - - /** - * Specialized expansion for - * {@link org.apache.beam.sdk.transforms.Combine.GloballyAsSingletonView}. - * @param <InputT> - * @param <OutputT> - */ - public static class CombineGloballyAsSingletonView<InputT, OutputT> - extends PTransform<PCollection<InputT>, PCollectionView<OutputT>> { - Combine.GloballyAsSingletonView<InputT, OutputT> transform; - - /** - * Builds an instance of this class from the overridden transform. - */ - @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply() - public CombineGloballyAsSingletonView( - Combine.GloballyAsSingletonView<InputT, OutputT> transform) { - this.transform = transform; - } - - @Override - public PCollectionView<OutputT> expand(PCollection<InputT> input) { - PCollection<OutputT> combined = - input.apply(Combine.globally(transform.getCombineFn()) - .withoutDefaults() - .withFanout(transform.getFanout())); - - PCollectionView<OutputT> view = PCollectionViews.singletonView( - combined, - combined.getWindowingStrategy(), - transform.getInsertDefault(), - transform.getInsertDefault() - ? transform.getCombineFn().defaultValue() : null, - combined.getCoder()); - return combined - .apply(ParDo.of(new WrapAsList<OutputT>())) - .apply(CreateJStormPCollectionView.<OutputT, OutputT>of(view)); - } - - @Override - protected String getKindString() { - return "StreamingCombineGloballyAsSingletonView"; - } - } - - private static class WrapAsList<T> extends DoFn<T, List<T>> { - @ProcessElement - public void processElement(ProcessContext c) { - c.output(Collections.singletonList(c.element())); - } - } - - /** - * Combiner that combines {@code T}s into a single {@code List<T>} containing all inputs. - * They require the input {@link PCollection} fits in memory. - * For a large {@link PCollection} this is expected to crash! - * - * @param <T> the type of elements to concatenate. - */ - private static class Concatenate<T> extends Combine.CombineFn<T, List<T>, List<T>> { - private static final long serialVersionUID = 1L; - - @Override - public List<T> createAccumulator() { - return new ArrayList<>(); - } - - @Override - public List<T> addInput(List<T> accumulator, T input) { - accumulator.add(input); - return accumulator; - } - - @Override - public List<T> mergeAccumulators(Iterable<List<T>> accumulators) { - List<T> result = createAccumulator(); - for (List<T> accumulator : accumulators) { - result.addAll(accumulator); - } - return result; - } - - @Override - public List<T> extractOutput(List<T> accumulator) { - return accumulator; - } - - @Override - public Coder<List<T>> getAccumulatorCoder(CoderRegistry registry, Coder<T> inputCoder) { - return ListCoder.of(inputCoder); - } - - @Override - public Coder<List<T>> getDefaultOutputCoder(CoderRegistry registry, Coder<T> inputCoder) { - return ListCoder.of(inputCoder); - } - } - - /** - * Creates a primitive {@link PCollectionView}. - * For internal use only by runner implementors. - * - * @param <ElemT> The type of the elements of the input PCollection - * @param <ViewT> The type associated with the {@link PCollectionView} used as a side input - */ - public static class CreateJStormPCollectionView<ElemT, ViewT> - extends PTransform<PCollection<List<ElemT>>, PCollectionView<ViewT>> { - private PCollectionView<ViewT> view; - - private CreateJStormPCollectionView(PCollectionView<ViewT> view) { - this.view = view; - } - - public static <ElemT, ViewT> CreateJStormPCollectionView<ElemT, ViewT> of( - PCollectionView<ViewT> view) { - return new CreateJStormPCollectionView<>(view); - } - - @Override - public PCollectionView<ViewT> expand(PCollection<List<ElemT>> input) { - return view; - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/WindowAssignTranslator.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/WindowAssignTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/WindowAssignTranslator.java deleted file mode 100644 index 2ccb8d7..0000000 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/WindowAssignTranslator.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.jstorm.translation.translator; - -import org.apache.beam.runners.jstorm.translation.TranslationContext; -import org.apache.beam.runners.jstorm.translation.runtime.WindowAssignExecutor; -import org.apache.beam.sdk.transforms.windowing.Window; - -/** - * Translates a {@link org.apache.beam.sdk.transforms.windowing.Window.Assign} to a - * JStorm {@link WindowAssignExecutor}. - * @param <T> - */ -public class WindowAssignTranslator<T> extends TransformTranslator.Default<Window.Assign<T>> { - - @Override - public void translateNode(Window.Assign<T> transform, TranslationContext context) { - TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext(); - String description = - describeTransform(transform, userGraphContext.getInputs(), userGraphContext.getOutputs()); - context.getUserGraphContext().setWindowed(); - WindowAssignExecutor executor = new WindowAssignExecutor( - description, - transform.getWindowFn(), - userGraphContext.getOutputTag()); - context.addTransformExecutor(executor); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/CommonInstance.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/CommonInstance.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/CommonInstance.java deleted file mode 100644 index 4b92a4c..0000000 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/CommonInstance.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.jstorm.translation.util; - -/** - * Common definition of JStorm runner. - */ -public class CommonInstance { - public static final String KEY = "Key"; - public static final String VALUE = "Value"; - - public static final String BEAM_WATERMARK_STREAM_ID = "BEAM_WATERMARK"; -} http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/DefaultStepContext.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/DefaultStepContext.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/DefaultStepContext.java deleted file mode 100644 index 4eb1d8f..0000000 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/DefaultStepContext.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.jstorm.translation.util; - -import static com.google.common.base.Preconditions.checkNotNull; - -import java.io.IOException; -import org.apache.beam.runners.core.ExecutionContext; -import org.apache.beam.runners.core.StateInternals; -import org.apache.beam.runners.core.TimerInternals; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.TupleTag; - -/** - * Default StepContext for running DoFn This does not allow accessing state or timer internals. - */ -public class DefaultStepContext implements ExecutionContext.StepContext { - - private TimerInternals timerInternals; - - private StateInternals stateInternals; - - public DefaultStepContext(TimerInternals timerInternals, StateInternals stateInternals) { - this.timerInternals = checkNotNull(timerInternals, "timerInternals"); - this.stateInternals = checkNotNull(stateInternals, "stateInternals"); - } - - @Override - public String getStepName() { - return null; - } - - @Override - public String getTransformName() { - return null; - } - - @Override - public void noteOutput(WindowedValue<?> windowedValue) { - - } - - @Override - public void noteOutput(TupleTag<?> tupleTag, WindowedValue<?> windowedValue) { - - } - - @Override - public <T, W extends BoundedWindow> void writePCollectionViewData( - TupleTag<?> tag, Iterable<WindowedValue<T>> data, - Coder<Iterable<WindowedValue<T>>> dataCoder, W window, Coder<W> windowCoder) - throws IOException { - throw new UnsupportedOperationException("Writing side-input data is not supported."); - } - - @Override - public StateInternals stateInternals() { - return stateInternals; - } - - @Override - public TimerInternals timerInternals() { - return timerInternals; - } - - public void setStateInternals(StateInternals stateInternals) { - this.stateInternals = stateInternals; - } - - public void setTimerInternals(TimerInternals timerInternals) { - this.timerInternals = timerInternals; - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/RunnerUtils.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/RunnerUtils.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/RunnerUtils.java deleted file mode 100644 index ad83c2b..0000000 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/RunnerUtils.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.jstorm.util; - -import org.apache.beam.runners.core.KeyedWorkItem; -import org.apache.beam.runners.jstorm.translation.runtime.Executor; -import org.apache.beam.runners.jstorm.translation.runtime.GroupByWindowExecutor; -import org.apache.beam.runners.jstorm.translation.runtime.MultiStatefulDoFnExecutor; -import org.apache.beam.runners.jstorm.translation.runtime.StatefulDoFnExecutor; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.KV; - -/** - * Utils for JStorm runner. - */ -public class RunnerUtils { - /** - * Convert {@link WindowedValue} into {@link KeyedWorkItem}. - * @param elem - * @return - */ - public static <K, V> KeyedWorkItem<K, V> toKeyedWorkItem(WindowedValue<KV<K, V>> elem) { - WindowedValue<KV<K, V>> kvElem = (WindowedValue<KV<K, V>>) elem; - SingletonKeyedWorkItem<K, V> workItem = SingletonKeyedWorkItem.of( - kvElem.getValue().getKey(), - kvElem.withValue(kvElem.getValue().getValue())); - return workItem; - } - - public static boolean isGroupByKeyExecutor(Executor executor) { - if (executor instanceof GroupByWindowExecutor) { - return true; - } else if (executor instanceof StatefulDoFnExecutor - || executor instanceof MultiStatefulDoFnExecutor) { - return true; - } else { - return false; - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/SerializedPipelineOptions.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/SerializedPipelineOptions.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/SerializedPipelineOptions.java deleted file mode 100644 index 479afdc..0000000 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/SerializedPipelineOptions.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.runners.jstorm.util; - -import static com.google.common.base.Preconditions.checkNotNull; - -import com.fasterxml.jackson.databind.ObjectMapper; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.Serializable; -import org.apache.beam.sdk.options.PipelineOptions; - -/** - * Encapsulates the PipelineOptions in serialized form to ship them to the cluster. - */ -public class SerializedPipelineOptions implements Serializable { - - private final byte[] serializedOptions; - - /** - * Lazily initialized copy of deserialized options. - */ - private transient PipelineOptions pipelineOptions; - - public SerializedPipelineOptions(PipelineOptions options) { - checkNotNull(options, "PipelineOptions must not be null."); - - try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) { - new ObjectMapper().writeValue(baos, options); - this.serializedOptions = baos.toByteArray(); - } catch (Exception e) { - throw new RuntimeException("Couldn't serialize PipelineOptions.", e); - } - - } - - public PipelineOptions getPipelineOptions() { - if (pipelineOptions == null) { - try { - pipelineOptions = new ObjectMapper().readValue(serializedOptions, PipelineOptions.class); - } catch (IOException e) { - throw new RuntimeException("Couldn't deserialize the PipelineOptions.", e); - } - } - - return pipelineOptions; - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/SingletonKeyedWorkItem.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/SingletonKeyedWorkItem.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/SingletonKeyedWorkItem.java deleted file mode 100644 index 46a12b9..0000000 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/SingletonKeyedWorkItem.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.jstorm.util; - -import java.util.Collections; -import org.apache.beam.runners.core.KeyedWorkItem; -import org.apache.beam.runners.core.TimerInternals; -import org.apache.beam.sdk.util.WindowedValue; - -/** - * Singleton keyed word item. - * @param <K> - * @param <ElemT> - */ -public class SingletonKeyedWorkItem<K, ElemT> implements KeyedWorkItem<K, ElemT> { - - final K key; - final WindowedValue<ElemT> value; - - private SingletonKeyedWorkItem(K key, WindowedValue<ElemT> value) { - this.key = key; - this.value = value; - } - - public static <K, ElemT> SingletonKeyedWorkItem<K, ElemT> of(K key, WindowedValue<ElemT> value) { - return new SingletonKeyedWorkItem<K, ElemT>(key, value); - } - - @Override - public K key() { - return key; - } - - public WindowedValue<ElemT> value() { - return value; - } - - @Override - public Iterable<TimerInternals.TimerData> timersIterable() { - return Collections.EMPTY_LIST; - } - - @Override - public Iterable<WindowedValue<ElemT>> elementsIterable() { - return Collections.singletonList(value); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/JStormStateInternalsTest.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/JStormStateInternalsTest.java b/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/JStormStateInternalsTest.java new file mode 100644 index 0000000..b2ca267 --- /dev/null +++ b/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/JStormStateInternalsTest.java @@ -0,0 +1,221 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.jstorm.translation; + +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; + +import com.alibaba.jstorm.cache.IKvStoreManager; +import com.alibaba.jstorm.cache.rocksdb.RocksDbKvStoreManagerFactory; +import com.alibaba.jstorm.utils.KryoSerializer; +import com.google.common.collect.Maps; +import java.util.Iterator; +import java.util.Map; +import org.apache.beam.runners.core.StateNamespaces; +import org.apache.beam.runners.core.StateTags; +import org.apache.beam.sdk.coders.BigEndianIntegerCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderRegistry; +import org.apache.beam.sdk.state.BagState; +import org.apache.beam.sdk.state.CombiningState; +import org.apache.beam.sdk.state.MapState; +import org.apache.beam.sdk.state.ValueState; +import org.apache.beam.sdk.state.WatermarkHoldState; +import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.Max; +import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; +import org.joda.time.Instant; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link JStormStateInternals}. + */ +@RunWith(JUnit4.class) +public class JStormStateInternalsTest { + + @Rule + public final TemporaryFolder tmp = new TemporaryFolder(); + + private JStormStateInternals<String> jstormStateInternals; + + @Before + public void setup() throws Exception { + IKvStoreManager kvStoreManager = RocksDbKvStoreManagerFactory.getManager( + Maps.newHashMap(), + "test", + tmp.toString(), + new KryoSerializer(Maps.newHashMap())); + jstormStateInternals = new JStormStateInternals( + "key-1", kvStoreManager, new TimerServiceImpl(), 0); + } + + @Test + public void testValueState() throws Exception { + ValueState<Integer> valueState = jstormStateInternals.state( + StateNamespaces.global(), StateTags.value("state-id-a", BigEndianIntegerCoder.of())); + valueState.write(Integer.MIN_VALUE); + assertEquals(Integer.MIN_VALUE, valueState.read().longValue()); + valueState.write(Integer.MAX_VALUE); + assertEquals(Integer.MAX_VALUE, valueState.read().longValue()); + } + + @Test + public void testValueStateIdenticalId() throws Exception { + ValueState<Integer> valueState = jstormStateInternals.state( + StateNamespaces.global(), StateTags.value("state-id-a", BigEndianIntegerCoder.of())); + ValueState<Integer> valueStateIdentical = jstormStateInternals.state( + StateNamespaces.global(), StateTags.value("state-id-a", BigEndianIntegerCoder.of())); + + valueState.write(Integer.MIN_VALUE); + assertEquals(Integer.MIN_VALUE, valueState.read().longValue()); + assertEquals(Integer.MIN_VALUE, valueStateIdentical.read().longValue()); + valueState.write(Integer.MAX_VALUE); + assertEquals(Integer.MAX_VALUE, valueState.read().longValue()); + assertEquals(Integer.MAX_VALUE, valueStateIdentical.read().longValue()); + } + + @Test + public void testBagState() throws Exception { + BagState<Integer> bagStateA = jstormStateInternals.state( + StateNamespaces.global(), StateTags.bag("state-id-a", BigEndianIntegerCoder.of())); + BagState<Integer> bagStateB = jstormStateInternals.state( + StateNamespaces.global(), StateTags.bag("state-id-b", BigEndianIntegerCoder.of())); + + bagStateA.add(1); + bagStateA.add(0); + bagStateA.add(Integer.MAX_VALUE); + + bagStateB.add(0); + bagStateB.add(Integer.MIN_VALUE); + + Iterable<Integer> bagA = bagStateA.read(); + Iterable<Integer> bagB = bagStateB.read(); + assertThat(bagA, containsInAnyOrder(1, 0, Integer.MAX_VALUE)); + assertThat(bagB, containsInAnyOrder(0, Integer.MIN_VALUE)); + + bagStateA.clear(); + bagStateA.add(1); + bagStateB.add(0); + assertThat(bagStateA.read(), containsInAnyOrder(1)); + assertThat(bagStateB.read(), containsInAnyOrder(0, 0, Integer.MIN_VALUE)); + } + + @Test + public void testCombiningState() throws Exception { + Combine.CombineFn<Integer, int[], Integer> combineFn = Max.ofIntegers(); + Coder<int[]> accumCoder = combineFn.getAccumulatorCoder( + CoderRegistry.createDefault(), BigEndianIntegerCoder.of()); + + CombiningState<Integer, int[], Integer> combiningState = jstormStateInternals.state( + StateNamespaces.global(), + StateTags.combiningValue( + "state-id-a", + accumCoder, + combineFn)); + assertEquals(Integer.MIN_VALUE, combiningState.read().longValue()); + combiningState.add(10); + assertEquals(10, combiningState.read().longValue()); + combiningState.add(1); + assertEquals(10, combiningState.read().longValue()); + combiningState.add(Integer.MAX_VALUE); + assertEquals(Integer.MAX_VALUE, combiningState.read().longValue()); + } + + @Test + public void testWatermarkHoldState() throws Exception { + WatermarkHoldState watermarkHoldState = jstormStateInternals.state( + StateNamespaces.global(), + StateTags.watermarkStateInternal( + "state-id-a", + TimestampCombiner.EARLIEST)); + watermarkHoldState.add(new Instant(1)); + assertEquals(1, watermarkHoldState.read().getMillis()); + watermarkHoldState.add(new Instant(Integer.MIN_VALUE)); + assertEquals(Integer.MIN_VALUE, watermarkHoldState.read().getMillis()); + watermarkHoldState.add(new Instant(Integer.MAX_VALUE)); + assertEquals(Integer.MIN_VALUE, watermarkHoldState.read().getMillis()); + } + + @Test + public void testMapState() throws Exception { + MapState<Integer, Integer> mapStateA = jstormStateInternals.state( + StateNamespaces.global(), + StateTags.map("state-id-a", BigEndianIntegerCoder.of(), BigEndianIntegerCoder.of())); + mapStateA.put(1, 1); + mapStateA.put(2, 22); + mapStateA.put(1, 12); + + Iterable<Integer> keys = mapStateA.keys().read(); + Iterable<Integer> values = mapStateA.values().read(); + assertThat(keys, containsInAnyOrder(1, 2)); + assertThat(values, containsInAnyOrder(12, 22)); + + Iterable<Map.Entry<Integer, Integer>> entries = mapStateA.entries().read(); + Iterator<Map.Entry<Integer, Integer>> itr = entries.iterator(); + Map.Entry<Integer, Integer> entry = itr.next(); + assertEquals((long) entry.getKey(), 1L); + assertEquals((long) entry.getValue(), 12L); + entry = itr.next(); + assertEquals((long) entry.getKey(), 2L); + assertEquals((long) entry.getValue(), 22L); + assertEquals(false, itr.hasNext()); + + mapStateA.remove(1); + keys = mapStateA.keys().read(); + values = mapStateA.values().read(); + assertThat(keys, containsInAnyOrder(2)); + assertThat(values, containsInAnyOrder(22)); + + entries = mapStateA.entries().read(); + itr = entries.iterator(); + entry = itr.next(); + assertEquals((long) entry.getKey(), 2L); + assertEquals((long) entry.getValue(), 22L); + assertEquals(false, itr.hasNext()); + } + + @Test + public void testMassiveDataOfBagState() { + BagState<Integer> bagStateA = jstormStateInternals.state( + StateNamespaces.global(), StateTags.bag("state-id-a", BigEndianIntegerCoder.of())); + + int count = 10000; + int n = 1; + while (n <= count) { + bagStateA.add(n); + n++; + } + + int readCount = 0; + int readN = 0; + Iterator<Integer> itr = bagStateA.read().iterator(); + while (itr.hasNext()) { + readN += itr.next(); + readCount++; + } + + assertEquals((long) readN, ((1 + count) * count) / 2); + assertEquals((long) readCount, count); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormStateInternalsTest.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormStateInternalsTest.java b/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormStateInternalsTest.java deleted file mode 100644 index 66f33a7..0000000 --- a/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormStateInternalsTest.java +++ /dev/null @@ -1,222 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.jstorm.translation.runtime.state; - -import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertThat; - -import com.alibaba.jstorm.cache.IKvStoreManager; -import com.alibaba.jstorm.cache.rocksdb.RocksDbKvStoreManagerFactory; -import com.alibaba.jstorm.utils.KryoSerializer; -import com.google.common.collect.Maps; -import java.util.Iterator; -import java.util.Map; -import org.apache.beam.runners.core.StateNamespaces; -import org.apache.beam.runners.core.StateTags; -import org.apache.beam.runners.jstorm.translation.runtime.TimerServiceImpl; -import org.apache.beam.sdk.coders.BigEndianIntegerCoder; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CoderRegistry; -import org.apache.beam.sdk.state.BagState; -import org.apache.beam.sdk.state.CombiningState; -import org.apache.beam.sdk.state.MapState; -import org.apache.beam.sdk.state.ValueState; -import org.apache.beam.sdk.state.WatermarkHoldState; -import org.apache.beam.sdk.transforms.Combine; -import org.apache.beam.sdk.transforms.Max; -import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; -import org.joda.time.Instant; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** - * Tests for {@link JStormStateInternals}. - */ -@RunWith(JUnit4.class) -public class JStormStateInternalsTest { - - @Rule - public final TemporaryFolder tmp = new TemporaryFolder(); - - private JStormStateInternals<String> jstormStateInternals; - - @Before - public void setup() throws Exception { - IKvStoreManager kvStoreManager = RocksDbKvStoreManagerFactory.getManager( - Maps.newHashMap(), - "test", - tmp.toString(), - new KryoSerializer(Maps.newHashMap())); - jstormStateInternals = new JStormStateInternals( - "key-1", kvStoreManager, new TimerServiceImpl(), 0); - } - - @Test - public void testValueState() throws Exception { - ValueState<Integer> valueState = jstormStateInternals.state( - StateNamespaces.global(), StateTags.value("state-id-a", BigEndianIntegerCoder.of())); - valueState.write(Integer.MIN_VALUE); - assertEquals(Integer.MIN_VALUE, valueState.read().longValue()); - valueState.write(Integer.MAX_VALUE); - assertEquals(Integer.MAX_VALUE, valueState.read().longValue()); - } - - @Test - public void testValueStateIdenticalId() throws Exception { - ValueState<Integer> valueState = jstormStateInternals.state( - StateNamespaces.global(), StateTags.value("state-id-a", BigEndianIntegerCoder.of())); - ValueState<Integer> valueStateIdentical = jstormStateInternals.state( - StateNamespaces.global(), StateTags.value("state-id-a", BigEndianIntegerCoder.of())); - - valueState.write(Integer.MIN_VALUE); - assertEquals(Integer.MIN_VALUE, valueState.read().longValue()); - assertEquals(Integer.MIN_VALUE, valueStateIdentical.read().longValue()); - valueState.write(Integer.MAX_VALUE); - assertEquals(Integer.MAX_VALUE, valueState.read().longValue()); - assertEquals(Integer.MAX_VALUE, valueStateIdentical.read().longValue()); - } - - @Test - public void testBagState() throws Exception { - BagState<Integer> bagStateA = jstormStateInternals.state( - StateNamespaces.global(), StateTags.bag("state-id-a", BigEndianIntegerCoder.of())); - BagState<Integer> bagStateB = jstormStateInternals.state( - StateNamespaces.global(), StateTags.bag("state-id-b", BigEndianIntegerCoder.of())); - - bagStateA.add(1); - bagStateA.add(0); - bagStateA.add(Integer.MAX_VALUE); - - bagStateB.add(0); - bagStateB.add(Integer.MIN_VALUE); - - Iterable<Integer> bagA = bagStateA.read(); - Iterable<Integer> bagB = bagStateB.read(); - assertThat(bagA, containsInAnyOrder(1, 0, Integer.MAX_VALUE)); - assertThat(bagB, containsInAnyOrder(0, Integer.MIN_VALUE)); - - bagStateA.clear(); - bagStateA.add(1); - bagStateB.add(0); - assertThat(bagStateA.read(), containsInAnyOrder(1)); - assertThat(bagStateB.read(), containsInAnyOrder(0, 0, Integer.MIN_VALUE)); - } - - @Test - public void testCombiningState() throws Exception { - Combine.CombineFn<Integer, int[], Integer> combineFn = Max.ofIntegers(); - Coder<int[]> accumCoder = combineFn.getAccumulatorCoder( - CoderRegistry.createDefault(), BigEndianIntegerCoder.of()); - - CombiningState<Integer, int[], Integer> combiningState = jstormStateInternals.state( - StateNamespaces.global(), - StateTags.combiningValue( - "state-id-a", - accumCoder, - combineFn)); - assertEquals(Integer.MIN_VALUE, combiningState.read().longValue()); - combiningState.add(10); - assertEquals(10, combiningState.read().longValue()); - combiningState.add(1); - assertEquals(10, combiningState.read().longValue()); - combiningState.add(Integer.MAX_VALUE); - assertEquals(Integer.MAX_VALUE, combiningState.read().longValue()); - } - - @Test - public void testWatermarkHoldState() throws Exception { - WatermarkHoldState watermarkHoldState = jstormStateInternals.state( - StateNamespaces.global(), - StateTags.watermarkStateInternal( - "state-id-a", - TimestampCombiner.EARLIEST)); - watermarkHoldState.add(new Instant(1)); - assertEquals(1, watermarkHoldState.read().getMillis()); - watermarkHoldState.add(new Instant(Integer.MIN_VALUE)); - assertEquals(Integer.MIN_VALUE, watermarkHoldState.read().getMillis()); - watermarkHoldState.add(new Instant(Integer.MAX_VALUE)); - assertEquals(Integer.MIN_VALUE, watermarkHoldState.read().getMillis()); - } - - @Test - public void testMapState() throws Exception { - MapState<Integer, Integer> mapStateA = jstormStateInternals.state( - StateNamespaces.global(), - StateTags.map("state-id-a", BigEndianIntegerCoder.of(), BigEndianIntegerCoder.of())); - mapStateA.put(1, 1); - mapStateA.put(2, 22); - mapStateA.put(1, 12); - - Iterable<Integer> keys = mapStateA.keys().read(); - Iterable<Integer> values = mapStateA.values().read(); - assertThat(keys, containsInAnyOrder(1, 2)); - assertThat(values, containsInAnyOrder(12, 22)); - - Iterable<Map.Entry<Integer, Integer>> entries = mapStateA.entries().read(); - Iterator<Map.Entry<Integer, Integer>> itr = entries.iterator(); - Map.Entry<Integer, Integer> entry = itr.next(); - assertEquals((long) entry.getKey(), 1L); - assertEquals((long) entry.getValue(), 12L); - entry = itr.next(); - assertEquals((long) entry.getKey(), 2L); - assertEquals((long) entry.getValue(), 22L); - assertEquals(false, itr.hasNext()); - - mapStateA.remove(1); - keys = mapStateA.keys().read(); - values = mapStateA.values().read(); - assertThat(keys, containsInAnyOrder(2)); - assertThat(values, containsInAnyOrder(22)); - - entries = mapStateA.entries().read(); - itr = entries.iterator(); - entry = itr.next(); - assertEquals((long) entry.getKey(), 2L); - assertEquals((long) entry.getValue(), 22L); - assertEquals(false, itr.hasNext()); - } - - @Test - public void testMassiveDataOfBagState() { - BagState<Integer> bagStateA = jstormStateInternals.state( - StateNamespaces.global(), StateTags.bag("state-id-a", BigEndianIntegerCoder.of())); - - int count = 10000; - int n = 1; - while (n <= count) { - bagStateA.add(n); - n++; - } - - int readCount = 0; - int readN = 0; - Iterator<Integer> itr = bagStateA.read().iterator(); - while (itr.hasNext()) { - readN += itr.next(); - readCount++; - } - - assertEquals((long) readN, ((1 + count) * count) / 2); - assertEquals((long) readCount, count); - } -}
