http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/ParDoBoundMultiTranslator.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/ParDoBoundMultiTranslator.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/ParDoBoundMultiTranslator.java deleted file mode 100644 index 1870681..0000000 --- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/ParDoBoundMultiTranslator.java +++ /dev/null @@ -1,114 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.jstorm.beam.translation.translator; - -import avro.shaded.com.google.common.collect.Lists; -import avro.shaded.com.google.common.collect.Maps; -import com.alibaba.jstorm.beam.translation.TranslationContext; -import com.alibaba.jstorm.beam.translation.runtime.DoFnExecutor; -import com.alibaba.jstorm.beam.translation.runtime.MultiOutputDoFnExecutor; -import com.alibaba.jstorm.beam.translation.runtime.MultiStatefulDoFnExecutor; -import com.google.common.collect.ImmutableList; -import com.google.common.base.Function; -import com.google.common.collect.FluentIterable; -import com.google.common.collect.ImmutableMap; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.reflect.DoFnSignature; -import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.*; - -import java.util.Iterator; -import java.util.List; -import java.util.Map; - -/** - * Translates a ParDo.BoundMulti to a Storm {@link com.alibaba.jstorm.beam.translation.runtime.DoFnExecutor}. - */ -public class ParDoBoundMultiTranslator<InputT, OutputT> - extends TransformTranslator.Default<ParDo.MultiOutput<InputT, OutputT>> { - - @Override - public void translateNode(ParDo.MultiOutput<InputT, OutputT> transform, TranslationContext context) { - final TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext(); - final TupleTag<InputT> inputTag = (TupleTag<InputT>) userGraphContext.getInputTag(); - PCollection<InputT> input = (PCollection<InputT>) userGraphContext.getInput(); - - Map<TupleTag<?>, PValue> allOutputs = Maps.newHashMap(userGraphContext.getOutputs()); - Map<TupleTag<?>, TupleTag<?>> localToExternalTupleTagMap = Maps.newHashMap(); - for (Map.Entry<TupleTag<?>, PValue> entry : allOutputs.entrySet()) { - Iterator<TupleTag<?>> itr = ((PValueBase) entry.getValue()).expand().keySet().iterator(); - localToExternalTupleTagMap.put(entry.getKey(), itr.next()); - } - - TupleTag<OutputT> mainOutputTag = (TupleTag<OutputT>) userGraphContext.getOutputTag(); - List<TupleTag<?>> sideOutputTags = userGraphContext.getOutputTags(); - sideOutputTags.remove(mainOutputTag); - - Map<TupleTag<?>, PValue> allInputs = Maps.newHashMap(userGraphContext.getInputs()); - for (PCollectionView pCollectionView : transform.getSideInputs()) { - allInputs.put(userGraphContext.findTupleTag(pCollectionView), pCollectionView); - } - String description = describeTransform( - transform, - allInputs, - allOutputs); - - ImmutableMap.Builder<TupleTag, PCollectionView<?>> sideInputTagToView = ImmutableMap.builder(); - for (PCollectionView pCollectionView : transform.getSideInputs()) { - sideInputTagToView.put(userGraphContext.findTupleTag(pCollectionView), pCollectionView); - } - - DoFnExecutor executor; - DoFnSignature signature = DoFnSignatures.getSignature(transform.getFn().getClass()); - if (signature.stateDeclarations().size() > 0 - || signature.timerDeclarations().size() > 0) { - executor = new MultiStatefulDoFnExecutor<>( - userGraphContext.getStepName(), - description, - userGraphContext.getOptions(), - (DoFn<KV, OutputT>) transform.getFn(), - (Coder) WindowedValue.getFullCoder(input.getCoder(), input.getWindowingStrategy().getWindowFn().windowCoder()), - input.getWindowingStrategy(), - (TupleTag<KV>) inputTag, - transform.getSideInputs(), - sideInputTagToView.build(), - mainOutputTag, - sideOutputTags, - localToExternalTupleTagMap); - } else { - executor = new MultiOutputDoFnExecutor<>( - userGraphContext.getStepName(), - description, - userGraphContext.getOptions(), - transform.getFn(), - WindowedValue.getFullCoder(input.getCoder(), input.getWindowingStrategy().getWindowFn().windowCoder()), - input.getWindowingStrategy(), - inputTag, - transform.getSideInputs(), - sideInputTagToView.build(), - mainOutputTag, - sideOutputTags, - localToExternalTupleTagMap); - } - - context.addTransformExecutor(executor, ImmutableList.<PValue>copyOf(transform.getSideInputs())); - } -}
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/ParDoBoundTranslator.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/ParDoBoundTranslator.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/ParDoBoundTranslator.java deleted file mode 100644 index a8d8186..0000000 --- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/ParDoBoundTranslator.java +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.jstorm.beam.translation.translator; - -import java.util.List; -import java.util.Map; - -import avro.shaded.com.google.common.collect.Lists; -import com.alibaba.jstorm.beam.translation.runtime.StatefulDoFnExecutor; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.reflect.DoFnSignature; -import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.*; - -import com.alibaba.jstorm.beam.translation.TranslationContext; -import com.alibaba.jstorm.beam.translation.runtime.DoFnExecutor; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Translates a ParDo.Bound to a Storm {@link DoFnExecutor}. - */ -public class ParDoBoundTranslator<InputT, OutputT> - extends TransformTranslator.Default<ParDo.SingleOutput<InputT, OutputT>> { - - private static final Logger LOG = LoggerFactory.getLogger(ParDoBoundTranslator.class); - - @Override - public void translateNode(ParDo.SingleOutput<InputT, OutputT> transform, TranslationContext context) { - final TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext(); - final TupleTag<?> inputTag = userGraphContext.getInputTag(); - PCollection<InputT> input = (PCollection<InputT>) userGraphContext.getInput(); - - TupleTag<OutputT> mainOutputTag = (TupleTag<OutputT>) userGraphContext.getOutputTag(); - List<TupleTag<?>> sideOutputTags = Lists.newArrayList(); - - Map<TupleTag<?>, PValue> allInputs = avro.shaded.com.google.common.collect.Maps.newHashMap(userGraphContext.getInputs()); - for (PCollectionView pCollectionView : transform.getSideInputs()) { - allInputs.put(userGraphContext.findTupleTag(pCollectionView), pCollectionView); - } - String description = describeTransform( - transform, - allInputs, - userGraphContext.getOutputs()); - - ImmutableMap.Builder<TupleTag, PCollectionView<?>> sideInputTagToView = ImmutableMap.builder(); - for (PCollectionView pCollectionView : transform.getSideInputs()) { - sideInputTagToView.put(userGraphContext.findTupleTag(pCollectionView), pCollectionView); - } - - DoFnExecutor executor; - DoFnSignature signature = DoFnSignatures.getSignature(transform.getFn().getClass()); - if (signature.stateDeclarations().size() > 0 - || signature.timerDeclarations().size() > 0) { - executor = new StatefulDoFnExecutor<>( - userGraphContext.getStepName(), - description, - userGraphContext.getOptions(), - (DoFn<KV, OutputT>) transform.getFn(), - (Coder) WindowedValue.getFullCoder( - input.getCoder(), input.getWindowingStrategy().getWindowFn().windowCoder()), - input.getWindowingStrategy(), - (TupleTag<KV>) inputTag, - transform.getSideInputs(), - sideInputTagToView.build(), - mainOutputTag, - sideOutputTags); - } else { - executor = new DoFnExecutor<>( - userGraphContext.getStepName(), - description, - userGraphContext.getOptions(), - transform.getFn(), - WindowedValue.getFullCoder(input.getCoder(), input.getWindowingStrategy().getWindowFn().windowCoder()), - input.getWindowingStrategy(), - (TupleTag<InputT>) inputTag, - transform.getSideInputs(), - sideInputTagToView.build(), - mainOutputTag, - sideOutputTags); - } - - context.addTransformExecutor(executor, ImmutableList.<PValue>copyOf(transform.getSideInputs())); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/ReshuffleTranslator.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/ReshuffleTranslator.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/ReshuffleTranslator.java deleted file mode 100644 index 26a9b22..0000000 --- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/ReshuffleTranslator.java +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.jstorm.beam.translation.translator; - -import org.apache.beam.sdk.transforms.Reshuffle; - -public class ReshuffleTranslator<K, V> extends TransformTranslator.Default<Reshuffle<K,V>> { - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/Stream.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/Stream.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/Stream.java deleted file mode 100644 index f80a39d..0000000 --- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/Stream.java +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.jstorm.beam.translation.translator; - -import com.google.auto.value.AutoValue; - -import javax.annotation.Nullable; -import java.util.List; - -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; - -/** - * Class that defines the stream connection between upstream and downstream components. - */ -@AutoValue -public abstract class Stream { - - public abstract Producer getProducer(); - public abstract Consumer getConsumer(); - - public static Stream of(Producer producer, Consumer consumer) { - return new com.alibaba.jstorm.beam.translation.translator.AutoValue_Stream(producer, consumer); - } - - @AutoValue - public abstract static class Producer { - public abstract String getComponentId(); - public abstract String getStreamId(); - public abstract String getStreamName(); - - public static Producer of(String componentId, String streamId, String streamName) { - return new com.alibaba.jstorm.beam.translation.translator.AutoValue_Stream_Producer( - componentId, streamId, streamName); - } - } - - @AutoValue - public abstract static class Consumer { - public abstract String getComponentId(); - public abstract Grouping getGrouping(); - - public static Consumer of(String componentId, Grouping grouping) { - return new com.alibaba.jstorm.beam.translation.translator.AutoValue_Stream_Consumer( - componentId, grouping); - } - } - - @AutoValue - public abstract static class Grouping { - public abstract Type getType(); - - @Nullable - public abstract List<String> getFields(); - - public static Grouping of(Type type) { - checkArgument(!Type.FIELDS.equals(type), "Fields grouping should provide key fields."); - return new com.alibaba.jstorm.beam.translation.translator.AutoValue_Stream_Grouping( - type, null /* fields */); - } - - public static Grouping byFields(List<String> fields) { - checkNotNull(fields, "fields"); - checkArgument(!fields.isEmpty(), "No key fields were provided for field grouping!"); - return new com.alibaba.jstorm.beam.translation.translator.AutoValue_Stream_Grouping( - Type.FIELDS, fields); - } - - /** - * Types of stream groupings Storm allows - */ - public enum Type { - ALL, CUSTOM, DIRECT, SHUFFLE, LOCAL_OR_SHUFFLE, FIELDS, GLOBAL, NONE - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/TransformTranslator.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/TransformTranslator.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/TransformTranslator.java deleted file mode 100644 index e1c35f6..0000000 --- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/TransformTranslator.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.jstorm.beam.translation.translator; - -import com.google.common.base.Function; -import com.google.common.base.Joiner; -import com.google.common.collect.FluentIterable; -import org.apache.beam.sdk.transforms.PTransform; - -import com.alibaba.jstorm.beam.translation.TranslationContext; -import org.apache.beam.sdk.values.PValue; -import org.apache.beam.sdk.values.TaggedPValue; -import org.apache.beam.sdk.values.TupleTag; - -import java.util.Map; - -/** - * Interface for classes capable of tranforming Beam PTransforms into Storm primitives. - */ -public interface TransformTranslator<T extends PTransform<?, ?>> { - - void translateNode(T transform, TranslationContext context); - - /** - * Returns true if this translator can translate the given transform. - */ - boolean canTranslate(T transform, TranslationContext context); - - class Default<T1 extends PTransform<?, ?>> implements TransformTranslator<T1> { - @Override - public void translateNode(T1 transform, TranslationContext context) { - - } - - @Override - public boolean canTranslate(T1 transform, TranslationContext context) { - return true; - } - - static String describeTransform( - PTransform<?, ?> transform, - Map<TupleTag<?>, PValue> inputs, - Map<TupleTag<?>, PValue> outputs) { - return String.format("%s --> %s --> %s", - Joiner.on('+').join(FluentIterable.from(inputs.entrySet()) - .transform(new Function<Map.Entry<TupleTag<?>, PValue>, String>() { - @Override - public String apply(Map.Entry<TupleTag<?>, PValue> taggedPValue) { - return taggedPValue.getKey().getId(); - // return taggedPValue.getValue().getName(); - }})), - transform.getName(), - Joiner.on('+').join(FluentIterable.from(outputs.entrySet()) - .transform(new Function<Map.Entry<TupleTag<?>, PValue>, String>() { - @Override - public String apply(Map.Entry<TupleTag<?>, PValue> taggedPvalue) { - return taggedPvalue.getKey().getId(); - //return taggedPValue.getValue().getName(); - }}))); - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/UnboundedSourceTranslator.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/UnboundedSourceTranslator.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/UnboundedSourceTranslator.java deleted file mode 100644 index 0677e92..0000000 --- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/UnboundedSourceTranslator.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.jstorm.beam.translation.translator; - -import org.apache.beam.sdk.io.Read; -import org.apache.beam.sdk.values.PValue; -import org.apache.beam.sdk.values.TaggedPValue; -import org.apache.beam.sdk.values.TupleTag; - -import com.alibaba.jstorm.beam.translation.TranslationContext; -import com.alibaba.jstorm.beam.translation.runtime.UnboundedSourceSpout; - -/** - * Translates a Read.Unbounded into a Storm spout. - * - * @param <T> - */ -public class UnboundedSourceTranslator<T> extends TransformTranslator.Default<Read.Unbounded<T>> { - public void translateNode(Read.Unbounded<T> transform, TranslationContext context) { - TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext(); - String description = describeTransform(transform, userGraphContext.getInputs(), userGraphContext.getOutputs()); - - TupleTag<?> tag = userGraphContext.getOutputTag(); - PValue output = userGraphContext.getOutput(); - - UnboundedSourceSpout spout = new UnboundedSourceSpout( - description, - transform.getSource(), userGraphContext.getOptions(), tag); - context.getExecutionGraphContext().registerSpout(spout, TaggedPValue.of(tag, output)); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/ViewTranslator.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/ViewTranslator.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/ViewTranslator.java deleted file mode 100644 index 3069955..0000000 --- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/ViewTranslator.java +++ /dev/null @@ -1,374 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.jstorm.beam.translation.translator; - -import com.alibaba.jstorm.beam.translation.TranslationContext; -import com.alibaba.jstorm.beam.translation.runtime.ViewExecutor; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CoderRegistry; -import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.coders.ListCoder; -import org.apache.beam.sdk.transforms.Combine; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.View; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.PCollectionViews; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Map; - -/** - * A {@link TransformTranslator} for executing {@link View Views} in JStorm runner. - */ -public class ViewTranslator extends TransformTranslator.Default<ViewTranslator.CreateJStormPCollectionView<?, ?>> { - @Override - public void translateNode(CreateJStormPCollectionView<?, ?> transform, TranslationContext context) { - TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext(); - String description = describeTransform(transform, userGraphContext.getInputs(), userGraphContext.getOutputs()); - ViewExecutor viewExecutor = new ViewExecutor(description, userGraphContext.getOutputTag()); - context.addTransformExecutor(viewExecutor); - } - - /** - * Specialized implementation for - * {@link org.apache.beam.sdk.transforms.View.AsMap View.AsMap} - * for the Flink runner in streaming mode. - */ - public static class ViewAsMap<K, V> - extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, V>>> { - - @SuppressWarnings("unused") // used via reflection in JstormRunner#apply() - public ViewAsMap(View.AsMap<K, V> transform) { - } - - @Override - public PCollectionView<Map<K, V>> expand(PCollection<KV<K, V>> input) { - PCollectionView<Map<K, V>> view = - PCollectionViews.mapView( - input, - input.getWindowingStrategy(), - input.getCoder()); - - @SuppressWarnings({"rawtypes", "unchecked"}) - KvCoder<K, V> inputCoder = (KvCoder) input.getCoder(); - try { - inputCoder.getKeyCoder().verifyDeterministic(); - } catch (Coder.NonDeterministicException e) { - // TODO: log warning as other runners. - } - - return input - .apply(Combine.globally(new Concatenate<KV<K, V>>()).withoutDefaults()) - .apply(CreateJStormPCollectionView.<KV<K, V>, Map<K, V>>of(view)); - } - - @Override - protected String getKindString() { - return "StreamingViewAsMap"; - } - } - - /** - * Specialized expansion for {@link - * View.AsMultimap View.AsMultimap} for the - * Flink runner in streaming mode. - */ - public static class ViewAsMultimap<K, V> - extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, Iterable<V>>>> { - - /** - * Builds an instance of this class from the overridden transform. - */ - @SuppressWarnings("unused") // used via reflection in JStormRunner#apply() - public ViewAsMultimap(View.AsMultimap<K, V> transform) { - } - - @Override - public PCollectionView<Map<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) { - PCollectionView<Map<K, Iterable<V>>> view = - PCollectionViews.multimapView( - input, - input.getWindowingStrategy(), - input.getCoder()); - - @SuppressWarnings({"rawtypes", "unchecked"}) - KvCoder<K, V> inputCoder = (KvCoder) input.getCoder(); - try { - inputCoder.getKeyCoder().verifyDeterministic(); - } catch (Coder.NonDeterministicException e) { - // TODO: log warning as other runners. - } - - return input - .apply(Combine.globally(new Concatenate<KV<K, V>>()).withoutDefaults()) - .apply(CreateJStormPCollectionView.<KV<K, V>, Map<K, Iterable<V>>>of(view)); - } - - @Override - protected String getKindString() { - return "StreamingViewAsMultimap"; - } - } - - /** - * Specialized implementation for - * {@link View.AsList View.AsList} for the - * JStorm runner in streaming mode. - */ - public static class ViewAsList<T> - extends PTransform<PCollection<T>, PCollectionView<List<T>>> { - /** - * Builds an instance of this class from the overridden transform. - */ - @SuppressWarnings("unused") // used via reflection in JStormRunner#apply() - public ViewAsList(View.AsList<T> transform) {} - - @Override - public PCollectionView<List<T>> expand(PCollection<T> input) { - PCollectionView<List<T>> view = - PCollectionViews.listView( - input, - input.getWindowingStrategy(), - input.getCoder()); - - return input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults()) - .apply(CreateJStormPCollectionView.<T, List<T>>of(view)); - } - - @Override - protected String getKindString() { - return "StreamingViewAsList"; - } - } - - /** - * Specialized implementation for - * {@link View.AsIterable View.AsIterable} for the - * JStorm runner in streaming mode. - */ - public static class ViewAsIterable<T> - extends PTransform<PCollection<T>, PCollectionView<Iterable<T>>> { - /** - * Builds an instance of this class from the overridden transform. - */ - @SuppressWarnings("unused") // used via reflection in JStormRunner#apply() - public ViewAsIterable(View.AsIterable<T> transform) { } - - @Override - public PCollectionView<Iterable<T>> expand(PCollection<T> input) { - PCollectionView<Iterable<T>> view = - PCollectionViews.iterableView( - input, - input.getWindowingStrategy(), - input.getCoder()); - - return input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults()) - .apply(CreateJStormPCollectionView.<T, Iterable<T>>of(view)); - } - - @Override - protected String getKindString() { - return "StreamingViewAsIterable"; - } - } - - /** - * Specialized expansion for - * {@link View.AsSingleton View.AsSingleton} for the - * JStorm runner in streaming mode. - */ - public static class ViewAsSingleton<T> - extends PTransform<PCollection<T>, PCollectionView<T>> { - private View.AsSingleton<T> transform; - - /** - * Builds an instance of this class from the overridden transform. - */ - @SuppressWarnings("unused") // used via reflection in JStormRunner#apply() - public ViewAsSingleton(View.AsSingleton<T> transform) { - this.transform = transform; - } - - @Override - public PCollectionView<T> expand(PCollection<T> input) { - Combine.Globally<T, T> combine = Combine.globally( - new SingletonCombine<>(transform.hasDefaultValue(), transform.defaultValue())); - if (!transform.hasDefaultValue()) { - combine = combine.withoutDefaults(); - } - return input.apply(combine.asSingletonView()); - } - - @Override - protected String getKindString() { - return "StreamingViewAsSingleton"; - } - - private static class SingletonCombine<T> extends Combine.BinaryCombineFn<T> { - private boolean hasDefaultValue; - private T defaultValue; - - SingletonCombine(boolean hasDefaultValue, T defaultValue) { - this.hasDefaultValue = hasDefaultValue; - this.defaultValue = defaultValue; - } - - @Override - public T apply(T left, T right) { - throw new IllegalArgumentException("PCollection with more than one element " - + "accessed as a singleton view. Consider using Combine.globally().asSingleton() to " - + "combine the PCollection into a single value"); - } - - @Override - public T identity() { - if (hasDefaultValue) { - return defaultValue; - } else { - throw new IllegalArgumentException( - "Empty PCollection accessed as a singleton view. " - + "Consider setting withDefault to provide a default value"); - } - } - } - } - - public static class CombineGloballyAsSingletonView<InputT, OutputT> - extends PTransform<PCollection<InputT>, PCollectionView<OutputT>> { - Combine.GloballyAsSingletonView<InputT, OutputT> transform; - - /** - * Builds an instance of this class from the overridden transform. - */ - @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply() - public CombineGloballyAsSingletonView( - Combine.GloballyAsSingletonView<InputT, OutputT> transform) { - this.transform = transform; - } - - @Override - public PCollectionView<OutputT> expand(PCollection<InputT> input) { - PCollection<OutputT> combined = - input.apply(Combine.globally(transform.getCombineFn()) - .withoutDefaults() - .withFanout(transform.getFanout())); - - PCollectionView<OutputT> view = PCollectionViews.singletonView( - combined, - combined.getWindowingStrategy(), - transform.getInsertDefault(), - transform.getInsertDefault() - ? transform.getCombineFn().defaultValue() : null, - combined.getCoder()); - return combined - .apply(ParDo.of(new WrapAsList<OutputT>())) - .apply(CreateJStormPCollectionView.<OutputT, OutputT>of(view)); - } - - @Override - protected String getKindString() { - return "StreamingCombineGloballyAsSingletonView"; - } - } - - private static class WrapAsList<T> extends DoFn<T, List<T>> { - @ProcessElement - public void processElement(ProcessContext c) { - c.output(Collections.singletonList(c.element())); - } - } - - /** - * Combiner that combines {@code T}s into a single {@code List<T>} containing all inputs. - * They require the input {@link PCollection} fits in memory. - * For a large {@link PCollection} this is expected to crash! - * - * @param <T> the type of elements to concatenate. - */ - private static class Concatenate<T> extends Combine.CombineFn<T, List<T>, List<T>> { - private static final long serialVersionUID = 1L; - - @Override - public List<T> createAccumulator() { - return new ArrayList<>(); - } - - @Override - public List<T> addInput(List<T> accumulator, T input) { - accumulator.add(input); - return accumulator; - } - - @Override - public List<T> mergeAccumulators(Iterable<List<T>> accumulators) { - List<T> result = createAccumulator(); - for (List<T> accumulator : accumulators) { - result.addAll(accumulator); - } - return result; - } - - @Override - public List<T> extractOutput(List<T> accumulator) { - return accumulator; - } - - @Override - public Coder<List<T>> getAccumulatorCoder(CoderRegistry registry, Coder<T> inputCoder) { - return ListCoder.of(inputCoder); - } - - @Override - public Coder<List<T>> getDefaultOutputCoder(CoderRegistry registry, Coder<T> inputCoder) { - return ListCoder.of(inputCoder); - } - } - - /** - * Creates a primitive {@link PCollectionView}. - * - * <p>For internal use only by runner implementors. - * - * @param <ElemT> The type of the elements of the input PCollection - * @param <ViewT> The type associated with the {@link PCollectionView} used as a side input - */ - public static class CreateJStormPCollectionView<ElemT, ViewT> - extends PTransform<PCollection<List<ElemT>>, PCollectionView<ViewT>> { - private PCollectionView<ViewT> view; - - private CreateJStormPCollectionView(PCollectionView<ViewT> view) { - this.view = view; - } - - public static <ElemT, ViewT> CreateJStormPCollectionView<ElemT, ViewT> of( - PCollectionView<ViewT> view) { - return new CreateJStormPCollectionView<>(view); - } - - @Override - public PCollectionView<ViewT> expand(PCollection<List<ElemT>> input) { - return view; - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/WindowAssignTranslator.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/WindowAssignTranslator.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/WindowAssignTranslator.java deleted file mode 100644 index 7fe8ddd..0000000 --- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/WindowAssignTranslator.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.jstorm.beam.translation.translator; - -import com.alibaba.jstorm.beam.translation.runtime.WindowAssignExecutor; -import org.apache.beam.sdk.transforms.windowing.Window; - -import com.alibaba.jstorm.beam.translation.TranslationContext; - -public class WindowAssignTranslator<T> extends TransformTranslator.Default<Window.Assign<T>> { - - @Override - public void translateNode(Window.Assign<T> transform, TranslationContext context) { - TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext(); - String description = describeTransform(transform, userGraphContext.getInputs(), userGraphContext.getOutputs()); - context.getUserGraphContext().setWindowed(); - WindowAssignExecutor executor = new WindowAssignExecutor( - description, - transform.getWindowFn(), - userGraphContext.getOutputTag()); - context.addTransformExecutor(executor); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/WindowBoundTranslator.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/WindowBoundTranslator.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/WindowBoundTranslator.java deleted file mode 100644 index 0b35052..0000000 --- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/WindowBoundTranslator.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.jstorm.beam.translation.translator; - -import org.apache.beam.sdk.transforms.windowing.FixedWindows; -import org.apache.beam.sdk.transforms.windowing.SlidingWindows; -import org.apache.beam.sdk.transforms.windowing.Window; - -import com.alibaba.jstorm.beam.translation.TranslationContext; -import org.apache.beam.sdk.values.PValue; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Translates a Window.Bound node into a Storm WindowedBolt - * - * @param <T> - */ -public class WindowBoundTranslator<T> extends TransformTranslator.Default<Window.Assign<T>> { - private static final Logger LOG = LoggerFactory.getLogger(WindowBoundTranslator.class); - - // Do nothing here currently. The assign of window strategy is included in AssignTranslator. - @Override - public void translateNode(Window.Assign<T> transform, TranslationContext context) { - if (transform.getWindowFn() instanceof FixedWindows) { - context.getUserGraphContext().setWindowed(); - } else if (transform.getWindowFn() instanceof SlidingWindows) { - context.getUserGraphContext().setWindowed(); - } else { - throw new UnsupportedOperationException("Not supported window type currently: " + transform.getWindowFn()); - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/util/CommonInstance.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/util/CommonInstance.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/util/CommonInstance.java deleted file mode 100644 index a75efa9..0000000 --- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/util/CommonInstance.java +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.jstorm.beam.translation.util; - -public class CommonInstance { - public static final String KEY = "Key"; - public static final String VALUE = "Value"; - - public static final String BEAM_WATERMARK_STREAM_ID = "BEAM_WATERMARK"; -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/util/DefaultSideInputReader.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/util/DefaultSideInputReader.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/util/DefaultSideInputReader.java deleted file mode 100644 index 8bf49d8..0000000 --- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/util/DefaultSideInputReader.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.jstorm.beam.translation.util; - -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.runners.core.SideInputReader; -import org.apache.beam.sdk.values.PCollectionView; - -import javax.annotation.Nullable; -import java.io.Serializable; - -/** - * No-op SideInputReader implementation. - */ -public class DefaultSideInputReader implements SideInputReader, Serializable { - @Nullable - @Override - public <T> T get(PCollectionView<T> pCollectionView, BoundedWindow boundedWindow) { - return null; - } - - @Override - public <T> boolean contains(PCollectionView<T> pCollectionView) { - return false; - } - - @Override - public boolean isEmpty() { - return true; - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/util/DefaultStepContext.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/util/DefaultStepContext.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/util/DefaultStepContext.java deleted file mode 100644 index 08d1f2d..0000000 --- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/util/DefaultStepContext.java +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.jstorm.beam.translation.util; - -import org.apache.beam.runners.core.ExecutionContext; -import org.apache.beam.runners.core.TimerInternals; -import org.apache.beam.runners.core.StateInternals; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.TupleTag; - -import java.io.IOException; - -import static com.google.common.base.Preconditions.checkNotNull; - -/** - * Default StepContext for running DoFn This does not allow accessing state or timer internals. - */ -public class DefaultStepContext implements ExecutionContext.StepContext { - - private TimerInternals timerInternals; - - private StateInternals stateInternals; - - public DefaultStepContext(TimerInternals timerInternals, StateInternals stateInternals) { - this.timerInternals = checkNotNull(timerInternals, "timerInternals"); - this.stateInternals = checkNotNull(stateInternals, "stateInternals"); - } - - @Override - public String getStepName() { - return null; - } - - @Override - public String getTransformName() { - return null; - } - - @Override - public void noteOutput(WindowedValue<?> windowedValue) { - - } - - @Override - public void noteOutput(TupleTag<?> tupleTag, WindowedValue<?> windowedValue) { - - } - - @Override - public <T, W extends BoundedWindow> void writePCollectionViewData(TupleTag<?> tag, Iterable<WindowedValue<T>> data, - Coder<Iterable<WindowedValue<T>>> dataCoder, W window, Coder<W> windowCoder) throws IOException { - throw new UnsupportedOperationException("Writing side-input data is not supported."); - } - - @Override - public StateInternals stateInternals() { - return stateInternals; - } - - @Override - public TimerInternals timerInternals() { - return timerInternals; - } - - public void setStateInternals(StateInternals stateInternals) { - this.stateInternals = stateInternals; - } - - public void setTimerInternals(TimerInternals timerInternals) { - this.timerInternals = timerInternals; - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/util/RunnerUtils.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/util/RunnerUtils.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/util/RunnerUtils.java deleted file mode 100644 index 6cf3ae5..0000000 --- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/util/RunnerUtils.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.jstorm.beam.util; - -import com.alibaba.jstorm.beam.translation.runtime.Executor; - -import com.alibaba.jstorm.beam.translation.runtime.GroupByWindowExecutor; -import com.alibaba.jstorm.beam.translation.runtime.MultiStatefulDoFnExecutor; -import com.alibaba.jstorm.beam.translation.runtime.StatefulDoFnExecutor; -import org.apache.beam.runners.core.KeyedWorkItem; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.KV; - -public class RunnerUtils { - /** - * Convert WindowedValue<KV<>> into KeyedWorkItem<K, WindowedValue<V>> - * @param elem - * @return - */ - public static <K, V> KeyedWorkItem<K, V> toKeyedWorkItem(WindowedValue<KV<K, V>> elem) { - WindowedValue<KV<K, V>> kvElem = (WindowedValue<KV<K, V>>) elem; - SingletonKeyedWorkItem<K, V> workItem = SingletonKeyedWorkItem.of( - kvElem.getValue().getKey(), - kvElem.withValue(kvElem.getValue().getValue())); - return workItem; - } - - public static boolean isGroupByKeyExecutor (Executor executor) { - if (executor instanceof GroupByWindowExecutor) { - return true; - } else if (executor instanceof StatefulDoFnExecutor || - executor instanceof MultiStatefulDoFnExecutor) { - return true; - } else { - return false; - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/util/SerializedPipelineOptions.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/util/SerializedPipelineOptions.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/util/SerializedPipelineOptions.java deleted file mode 100644 index 543db1c..0000000 --- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/util/SerializedPipelineOptions.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.alibaba.jstorm.beam.util; - -import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.beam.sdk.options.PipelineOptions; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.Serializable; - -import static com.google.common.base.Preconditions.checkNotNull; - -/** - * Encapsulates the PipelineOptions in serialized form to ship them to the cluster. - */ -public class SerializedPipelineOptions implements Serializable { - - private final byte[] serializedOptions; - - /** Lazily initialized copy of deserialized options */ - private transient PipelineOptions pipelineOptions; - - public SerializedPipelineOptions(PipelineOptions options) { - checkNotNull(options, "PipelineOptions must not be null."); - - try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) { - new ObjectMapper().writeValue(baos, options); - this.serializedOptions = baos.toByteArray(); - } catch (Exception e) { - throw new RuntimeException("Couldn't serialize PipelineOptions.", e); - } - - } - - public PipelineOptions getPipelineOptions() { - if (pipelineOptions == null) { - try { - pipelineOptions = new ObjectMapper().readValue(serializedOptions, PipelineOptions.class); - } catch (IOException e) { - throw new RuntimeException("Couldn't deserialize the PipelineOptions.", e); - } - } - - return pipelineOptions; - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/util/SingletonKeyedWorkItem.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/util/SingletonKeyedWorkItem.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/util/SingletonKeyedWorkItem.java deleted file mode 100644 index 2f9b224..0000000 --- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/util/SingletonKeyedWorkItem.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.jstorm.beam.util; - -import java.util.Collections; -import org.apache.beam.runners.core.KeyedWorkItem; -import org.apache.beam.runners.core.TimerInternals; -import org.apache.beam.sdk.util.WindowedValue; - -/** - * Singleton keyed word item. - * @param <K> - * @param <ElemT> - */ -public class SingletonKeyedWorkItem<K, ElemT> implements KeyedWorkItem<K, ElemT> { - - final K key; - final WindowedValue<ElemT> value; - - private SingletonKeyedWorkItem(K key, WindowedValue<ElemT> value) { - this.key = key; - this.value = value; - } - - public static <K, ElemT> SingletonKeyedWorkItem<K, ElemT> of(K key, WindowedValue<ElemT> value) { - return new SingletonKeyedWorkItem<K, ElemT>(key, value); - } - - @Override - public K key() { - return key; - } - - public WindowedValue<ElemT> value() { - return value; - } - - @Override - public Iterable<TimerInternals.TimerData> timersIterable() { - return Collections.EMPTY_LIST; - } - - @Override - public Iterable<WindowedValue<ElemT>> elementsIterable() { - return Collections.singletonList(value); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/StormPipelineOptions.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/StormPipelineOptions.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/StormPipelineOptions.java new file mode 100644 index 0000000..457beb6 --- /dev/null +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/StormPipelineOptions.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.jstorm; + +import avro.shaded.com.google.common.collect.Maps; +import org.apache.beam.sdk.options.*; + +import java.util.HashMap; +import java.util.Map; + +/** + * Options which can be used to configure a JStorm PipelineRunner. + */ +public interface StormPipelineOptions extends PipelineOptions, ApplicationNameOptions, StreamingOptions { + + @Description("Indicate if the topology is running on local machine or distributed cluster") + @Default.Boolean(false) + Boolean getLocalMode(); + void setLocalMode(Boolean isLocal); + + @Description("Executing time(sec) of topology on local mode. Default is 1min.") + @Default.Long(60) + Long getLocalModeExecuteTime(); + void setLocalModeExecuteTime(Long time); + + @Description("Worker number of topology") + @Default.Integer(1) + Integer getWorkerNumber(); + void setWorkerNumber(Integer number); + + @Description("Global parallelism number of a component") + @Default.Integer(1) + Integer getParallelismNumber(); + void setParallelismNumber(Integer number); + + @Description("System topology config of JStorm") + @Default.InstanceFactory(DefaultMapValueFactory.class) + Map getTopologyConfig(); + void setTopologyConfig(Map conf); + + @Description("Indicate if it is an exactly once topology") + @Default.Boolean(false) + Boolean getExactlyOnceTopology(); + void setExactlyOnceTopology(Boolean isExactlyOnce); + + @Description("Parallelism number of a specified composite PTransform") + @Default.InstanceFactory(DefaultMapValueFactory.class) + Map getParallelismNumMap(); + void setParallelismNumMap(Map parallelismNumMap); + + class DefaultMapValueFactory implements DefaultValueFactory<Map> { + @Override + public Map create(PipelineOptions pipelineOptions) { + return Maps.newHashMap(); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/StormRegistrar.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/StormRegistrar.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/StormRegistrar.java new file mode 100644 index 0000000..12b3c18 --- /dev/null +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/StormRegistrar.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.jstorm; + +import com.google.auto.service.AutoService; +import com.google.common.collect.ImmutableList; + +import org.apache.beam.sdk.PipelineRunner; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsRegistrar; +import org.apache.beam.sdk.runners.PipelineRunnerRegistrar; + +public class StormRegistrar { + private StormRegistrar() { + } + + @AutoService(PipelineRunnerRegistrar.class) + public static class Runner implements PipelineRunnerRegistrar { + @Override + public Iterable<Class<? extends PipelineRunner<?>>> getPipelineRunners() { + return ImmutableList.<Class<? extends PipelineRunner<?>>> of(StormRunner.class); + } + } + + @AutoService(PipelineOptionsRegistrar.class) + public static class Options implements PipelineOptionsRegistrar { + @Override + public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() { + return ImmutableList.<Class<? extends PipelineOptions>> of(StormPipelineOptions.class); + } + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/StormRunner.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/StormRunner.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/StormRunner.java new file mode 100644 index 0000000..8bee49f --- /dev/null +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/StormRunner.java @@ -0,0 +1,345 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.jstorm; + +import static com.google.common.base.Preconditions.checkNotNull; + +import backtype.storm.Config; +import backtype.storm.LocalCluster; +import backtype.storm.StormSubmitter; +import backtype.storm.generated.StormTopology; +import backtype.storm.topology.BoltDeclarer; +import backtype.storm.topology.IRichBolt; +import backtype.storm.topology.IRichSpout; +import backtype.storm.topology.TopologyBuilder; +import backtype.storm.tuple.Fields; + +import com.alibaba.jstorm.beam.serialization.*; +import org.apache.beam.runners.jstorm.serialization.ImmutableListSerializer; +import org.apache.beam.runners.jstorm.serialization.ImmutableMapSerializer; +import org.apache.beam.runners.jstorm.serialization.ImmutableSetSerializer; +import org.apache.beam.runners.jstorm.serialization.KvStoreIterableSerializer; +import org.apache.beam.runners.jstorm.serialization.SdkRepackImmuListSerializer; +import org.apache.beam.runners.jstorm.serialization.SdkRepackImmuSetSerializer; +import org.apache.beam.runners.jstorm.serialization.UnmodifiableCollectionsSerializer; +import org.apache.beam.runners.jstorm.translation.StormPipelineTranslator; +import org.apache.beam.runners.jstorm.translation.TranslationContext; +import org.apache.beam.runners.jstorm.translation.runtime.AbstractComponent; +import org.apache.beam.runners.jstorm.translation.runtime.AdaptorBasicBolt; +import org.apache.beam.runners.jstorm.translation.runtime.AdaptorBasicSpout; +import org.apache.beam.runners.jstorm.translation.runtime.ExecutorsBolt; +import org.apache.beam.runners.jstorm.translation.runtime.TxExecutorsBolt; +import org.apache.beam.runners.jstorm.translation.runtime.TxUnboundedSourceSpout; +import org.apache.beam.runners.jstorm.translation.runtime.UnboundedSourceSpout; +import org.apache.beam.runners.jstorm.translation.translator.Stream; +import org.apache.beam.runners.jstorm.translation.util.CommonInstance; +import com.alibaba.jstorm.cache.KvStoreIterable; +import com.alibaba.jstorm.cluster.StormConfig; +import com.alibaba.jstorm.transactional.TransactionTopologyBuilder; +import com.alibaba.jstorm.utils.JStormUtils; + +import java.io.IOException; +import java.util.HashMap; + +import java.util.Map; + +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.PipelineRunner; +import org.apache.beam.sdk.metrics.MetricResults; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsValidator; +import org.joda.time.Duration; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Main entry point into the Storm Runner. + * + * After reading the user defined pipeline, Beam will invoke the run() method with a representation of the pipeline. + */ +public class StormRunner extends PipelineRunner<StormRunner.StormPipelineResult> { + private static final Logger LOG = LoggerFactory.getLogger(StormRunner.class); + + private StormPipelineOptions options; + + public StormRunner(StormPipelineOptions options) { + this.options = options; + } + + public static StormRunner fromOptions(PipelineOptions options) { + StormPipelineOptions pipelineOptions = PipelineOptionsValidator.validate(StormPipelineOptions.class, options); + return new StormRunner(pipelineOptions); + } + + /** + * convert pipeline options to storm configuration format + * @param options + * @return + */ + private Config convertPipelineOptionsToConfig(StormPipelineOptions options) { + Config config = new Config(); + if (options.getLocalMode()) + config.put(Config.STORM_CLUSTER_MODE, "local"); + else + config.put(Config.STORM_CLUSTER_MODE, "distributed"); + + Config.setNumWorkers(config, options.getWorkerNumber()); + + config.putAll(options.getTopologyConfig()); + + // Setup config for runtime env + config.put("worker.external", "beam"); + config.put("topology.acker.executors", 0); + + UnmodifiableCollectionsSerializer.registerSerializers(config); + // register classes of guava utils, ImmutableList, ImmutableSet, ImmutableMap + ImmutableListSerializer.registerSerializers(config); + SdkRepackImmuListSerializer.registerSerializers(config); + ImmutableSetSerializer.registerSerializers(config); + SdkRepackImmuSetSerializer.registerSerializers(config); + ImmutableMapSerializer.registerSerializers(config); + + config.registerDefaultSerailizer(KvStoreIterable.class, KvStoreIterableSerializer.class); + return config; + } + + @Override + public StormPipelineResult run(Pipeline pipeline) { + LOG.info("Running pipeline..."); + TranslationContext context = new TranslationContext(this.options); + StormPipelineTranslator transformer = new StormPipelineTranslator(context); + transformer.translate(pipeline); + LOG.info("UserGraphContext=\n{}", context.getUserGraphContext()); + LOG.info("ExecutionGraphContext=\n{}", context.getExecutionGraphContext()); + + for (Stream stream : context.getExecutionGraphContext().getStreams()) { + LOG.info(stream.getProducer().getComponentId() + " --> " + stream.getConsumer().getComponentId()); + } + + String topologyName = options.getJobName(); + Config config = convertPipelineOptionsToConfig(options); + + return runTopology( + topologyName, + getTopology(options, context.getExecutionGraphContext()), + config); + } + + private StormPipelineResult runTopology(String topologyName, StormTopology topology, Config config) { + try { + if (StormConfig.local_mode(config)) { + LocalCluster localCluster = LocalCluster.getInstance(); + localCluster.submitTopology(topologyName, config, topology); + return new LocalStormPipelineResult( + topologyName, config, localCluster, options.getLocalModeExecuteTime()); + } else { + StormSubmitter.submitTopology(topologyName, config, topology); + return null; + } + } catch (Exception e) { + LOG.warn("Fail to submit topology", e); + throw new RuntimeException("Fail to submit topology", e); + } + } + + public static abstract class StormPipelineResult implements PipelineResult { + + private final String topologyName; + private final Config config; + + StormPipelineResult(String topologyName, Config config) { + this.config = checkNotNull(config, "config"); + this.topologyName = checkNotNull(topologyName, "topologyName"); + } + + public State getState() { + return null; + } + + public Config getConfig() { + return config; + } + + public String getTopologyName() { + return topologyName; + } + } + + public static class LocalStormPipelineResult extends StormPipelineResult { + + private LocalCluster localCluster; + private long localModeExecuteTimeSecs; + + LocalStormPipelineResult( + String topologyName, + Config config, + LocalCluster localCluster, + long localModeExecuteTimeSecs) { + super(topologyName, config); + this.localCluster = checkNotNull(localCluster, "localCluster"); + } + + @Override + public State cancel() throws IOException { + //localCluster.deactivate(getTopologyName()); + localCluster.killTopology(getTopologyName()); + localCluster.shutdown(); + JStormUtils.sleepMs(1000); + return State.CANCELLED; + } + + @Override + public State waitUntilFinish(Duration duration) { + return waitUntilFinish(); + } + + @Override + public State waitUntilFinish() { + JStormUtils.sleepMs(localModeExecuteTimeSecs * 1000); + try { + return cancel(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public MetricResults metrics() { + return null; + } + } + + private AbstractComponent getComponent(String id, TranslationContext.ExecutionGraphContext context) { + AbstractComponent component = null; + AdaptorBasicSpout spout = context.getSpout(id); + if (spout != null) { + component = spout; + } else { + AdaptorBasicBolt bolt = context.getBolt(id); + if (bolt != null) + component = bolt; + } + + return component; + } + + private StormTopology getTopology(StormPipelineOptions options, TranslationContext.ExecutionGraphContext context) { + boolean isExactlyOnce = options.getExactlyOnceTopology(); + TopologyBuilder builder = isExactlyOnce ? new TransactionTopologyBuilder() : new TopologyBuilder(); + + int parallelismNumber = options.getParallelismNumber(); + Map<String, AdaptorBasicSpout> spouts = context.getSpouts(); + for (String id : spouts.keySet()) { + IRichSpout spout = getSpout(isExactlyOnce, spouts.get(id)); + builder.setSpout(id, spout, getParallelismNum(spouts.get(id), parallelismNumber)); + } + + HashMap<String, BoltDeclarer> declarers = new HashMap<>(); + Iterable<Stream> streams = context.getStreams(); + LOG.info("streams=" + streams); + for (Stream stream : streams) { + String destBoltId = stream.getConsumer().getComponentId(); + IRichBolt bolt = getBolt(isExactlyOnce, context.getBolt(destBoltId)); + BoltDeclarer declarer = declarers.get(destBoltId); + if (declarer == null) { + declarer = builder.setBolt(destBoltId, bolt, + getParallelismNum(context.getBolt(destBoltId), parallelismNumber)); + declarers.put(destBoltId, declarer); + } + + Stream.Grouping grouping = stream.getConsumer().getGrouping(); + String streamId = stream.getProducer().getStreamId(); + String srcBoltId = stream.getProducer().getComponentId(); + + // add stream output declare for "from" component + AbstractComponent component = getComponent(srcBoltId, context); + if (grouping.getType().equals(Stream.Grouping.Type.FIELDS)) + component.addKVOutputField(streamId); + else + component.addOutputField(streamId); + + // "to" component declares grouping to "from" component + switch (grouping.getType()) { + case SHUFFLE: + declarer.shuffleGrouping(srcBoltId, streamId); + break; + case FIELDS: + declarer.fieldsGrouping(srcBoltId, streamId, new Fields(grouping.getFields())); + break; + case ALL: + declarer.allGrouping(srcBoltId, streamId); + break; + case DIRECT: + declarer.directGrouping(srcBoltId, streamId); + break; + case GLOBAL: + declarer.globalGrouping(srcBoltId, streamId); + break; + case LOCAL_OR_SHUFFLE: + declarer.localOrShuffleGrouping(srcBoltId, streamId); + break; + case NONE: + declarer.noneGrouping(srcBoltId, streamId); + break; + default: + throw new UnsupportedOperationException("unsupported grouping type: " + grouping); + } + + // Subscribe grouping of water mark stream + component.addOutputField(CommonInstance.BEAM_WATERMARK_STREAM_ID); + declarer.allGrouping(srcBoltId, CommonInstance.BEAM_WATERMARK_STREAM_ID); + } + + if (isExactlyOnce) { + ((TransactionTopologyBuilder) builder).enableHdfs(); + } + return builder.createTopology(); + } + + private IRichSpout getSpout(boolean isExactlyOnce, IRichSpout spout) { + IRichSpout ret = null; + if (isExactlyOnce) { + if (spout instanceof UnboundedSourceSpout) { + ret = new TxUnboundedSourceSpout((UnboundedSourceSpout) spout); + } else { + String error = String.format("The specified type(%s) is not supported in exactly once mode yet!", spout.getClass().toString()); + throw new RuntimeException(error); + } + } else { + ret = spout; + } + return ret; + } + + private IRichBolt getBolt(boolean isExactlyOnce, ExecutorsBolt bolt) { + return isExactlyOnce ? new TxExecutorsBolt(bolt) : bolt; + } + + /** + * Calculate the final parallelism number according to the configured number and global number. + * @param component + * @param globalParallelismNum + * @return final parallelism number for the specified component + */ + private int getParallelismNum(AbstractComponent component, int globalParallelismNum) { + int configParallelismNum = component.getParallelismNum(); + return configParallelismNum > 0 ? configParallelismNum : globalParallelismNum; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java new file mode 100644 index 0000000..fa7bdf3 --- /dev/null +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java @@ -0,0 +1,120 @@ +package org.apache.beam.runners.jstorm; + +import avro.shaded.com.google.common.collect.Maps; +import com.alibaba.jstorm.common.metric.AsmMetric; +import com.alibaba.jstorm.metric.*; +import com.alibaba.jstorm.utils.JStormUtils; +import com.google.common.base.Optional; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.PipelineRunner; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.testing.PAssert; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Iterator; +import java.util.Map; + +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * Test JStorm runner. + */ +public class TestJStormRunner extends PipelineRunner<StormRunner.StormPipelineResult> { + + private static final Logger LOG = LoggerFactory.getLogger(TestJStormRunner.class); + + public static TestJStormRunner fromOptions(PipelineOptions options) { + return new TestJStormRunner(options.as(StormPipelineOptions.class)); + } + + private final StormRunner stormRunner; + private final StormPipelineOptions options; + + private TestJStormRunner(StormPipelineOptions options) { + this.options = options; + Map conf = Maps.newHashMap(); + //conf.put(ConfigExtension.KV_STORE_TYPE, KvStoreManagerFactory.KvStoreType.memory.toString()); + options.setTopologyConfig(conf); + options.setLocalMode(true); + stormRunner = StormRunner.fromOptions(checkNotNull(options, "options")); + } + + @Override + public StormRunner.StormPipelineResult run(Pipeline pipeline) { + StormRunner.StormPipelineResult result = stormRunner.run(pipeline); + + try { + int numberOfAssertions = PAssert.countAsserts(pipeline); + + LOG.info("Running JStorm job {} with {} expected assertions.", result.getTopologyName(), numberOfAssertions); + if(numberOfAssertions == 0) { + // If assert number is zero, wait 5 sec + JStormUtils.sleepMs(5000); + return result; + } else { + for (int i = 0; i < 40; ++i) { + Optional<Boolean> success = checkForPAssertSuccess(numberOfAssertions); + if (success.isPresent() && success.get()) { + return result; + } else if (success.isPresent() && !success.get()) { + throw new AssertionError("Failed assertion checks."); + } else { + JStormUtils.sleepMs(500); + } + } + LOG.info("Assertion checks timed out."); + throw new AssertionError("Assertion checks timed out."); + } + } finally { + clearPAssertCount(); + cancel(result); + } + } + + private Optional<Boolean> checkForPAssertSuccess(int expectedNumberOfAssertions) { + int successes = 0; + for (AsmMetric metric : JStormMetrics.search(PAssert.SUCCESS_COUNTER, MetaType.TASK, MetricType.COUNTER)) { + successes += ((Long) metric.getValue(AsmWindow.M1_WINDOW)).intValue(); + } + int failures = 0; + for (AsmMetric metric : JStormMetrics.search(PAssert.FAILURE_COUNTER, MetaType.TASK, MetricType.COUNTER)) { + failures += ((Long) metric.getValue(AsmWindow.M1_WINDOW)).intValue(); + } + + if (failures > 0) { + LOG.info("Found {} success, {} failures out of {} expected assertions.", + successes, failures, expectedNumberOfAssertions); + return Optional.of(false); + } else if (successes >= expectedNumberOfAssertions) { + LOG.info("Found {} success, {} failures out of {} expected assertions.", + successes, failures, expectedNumberOfAssertions); + return Optional.of(true); + } + + LOG.info("Found {} success, {} failures out of {} expected assertions.", + successes, failures, expectedNumberOfAssertions); + return Optional.absent(); + } + + private void clearPAssertCount() { + String topologyName = options.getJobName(); + AsmMetricRegistry taskMetrics = JStormMetrics.getTaskMetrics(); + Iterator<Map.Entry<String, AsmMetric>> itr = taskMetrics.getMetrics().entrySet().iterator(); + while (itr.hasNext()) { + Map.Entry<String, AsmMetric> metric = itr.next(); + if (metric.getKey().contains(topologyName)) { + itr.remove(); + } + } + } + + private void cancel(StormRunner.StormPipelineResult result) { + try { + result.cancel(); + } catch (IOException e) { + throw new RuntimeException("Failed to cancel.", e); +} + } +}
