BEAM-261 Make translators package private.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/5553c603 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/5553c603 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/5553c603 Branch: refs/heads/apex-runner Commit: 5553c603a0c48855d38d4702f19e905eac2034f2 Parents: 9197d1e Author: Thomas Weise <t...@apache.org> Authored: Thu Oct 27 16:19:15 2016 -0700 Committer: Thomas Weise <t...@apache.org> Committed: Mon Nov 7 22:33:46 2016 +0100 ---------------------------------------------------------------------- .../runners/apex/ApexPipelineTranslator.java | 185 ------- .../apache/beam/runners/apex/ApexRunner.java | 13 +- .../beam/runners/apex/ApexRunnerResult.java | 2 +- .../translation/ApexPipelineTranslator.java | 179 +++++++ .../translation/CreateValuesTranslator.java | 48 ++ .../FlattenPCollectionTranslator.java | 129 +++++ .../apex/translation/GroupByKeyTranslator.java | 42 ++ .../translation/ParDoBoundMultiTranslator.java | 142 ++++++ .../apex/translation/ParDoBoundTranslator.java | 64 +++ .../translation/ReadUnboundedTranslator.java | 42 ++ .../apex/translation/TransformTranslator.java | 31 ++ .../apex/translation/TranslationContext.java | 178 +++++++ .../operators/ApexFlattenOperator.java | 125 +++++ .../operators/ApexGroupByKeyOperator.java | 478 +++++++++++++++++++ .../operators/ApexParDoOperator.java | 375 +++++++++++++++ .../ApexReadUnboundedInputOperator.java | 155 ++++++ .../translation/operators/package-info.java | 22 + .../runners/apex/translation/package-info.java | 22 + .../translation/utils/ApexStateInternals.java | 438 +++++++++++++++++ .../apex/translation/utils/ApexStreamTuple.java | 222 +++++++++ .../utils/CoderAdapterStreamCodec.java | 69 +++ .../apex/translation/utils/NoOpStepContext.java | 72 +++ .../utils/SerializablePipelineOptions.java | 60 +++ .../utils/ValueAndCoderKryoSerializable.java | 77 +++ 
.../apex/translation/utils/ValuesSource.java | 149 ++++++ .../apex/translation/utils/package-info.java | 22 + .../translators/CreateValuesTranslator.java | 48 -- .../FlattenPCollectionTranslator.java | 129 ----- .../apex/translators/GroupByKeyTranslator.java | 42 -- .../translators/ParDoBoundMultiTranslator.java | 142 ------ .../apex/translators/ParDoBoundTranslator.java | 64 --- .../translators/ReadUnboundedTranslator.java | 42 -- .../apex/translators/TransformTranslator.java | 31 -- .../apex/translators/TranslationContext.java | 178 ------- .../functions/ApexFlattenOperator.java | 125 ----- .../functions/ApexGroupByKeyOperator.java | 478 ------------------- .../functions/ApexParDoOperator.java | 375 --------------- .../translators/functions/package-info.java | 22 - .../io/ApexReadUnboundedInputOperator.java | 154 ------ .../apex/translators/io/ValuesSource.java | 149 ------ .../apex/translators/io/package-info.java | 22 - .../runners/apex/translators/package-info.java | 22 - .../translators/utils/ApexStateInternals.java | 438 ----------------- .../apex/translators/utils/ApexStreamTuple.java | 222 --------- .../utils/CoderAdapterStreamCodec.java | 69 --- .../apex/translators/utils/NoOpStepContext.java | 72 --- .../utils/SerializablePipelineOptions.java | 60 --- .../utils/ValueAndCoderKryoSerializable.java | 77 --- .../apex/translators/utils/package-info.java | 22 - .../translation/ApexGroupByKeyOperatorTest.java | 112 +++++ .../FlattenPCollectionTranslatorTest.java | 99 ++++ .../translation/GroupByKeyTranslatorTest.java | 246 ++++++++++ .../translation/ParDoBoundTranslatorTest.java | 340 +++++++++++++ .../translation/ReadUnboundTranslatorTest.java | 129 +++++ .../utils/ApexStateInternalsTest.java | 361 ++++++++++++++ .../translation/utils/CollectionSource.java | 136 ++++++ .../translation/utils/PipelineOptionsTest.java | 84 ++++ .../translators/ApexGroupByKeyOperatorTest.java | 112 ----- .../FlattenPCollectionTranslatorTest.java | 99 ---- 
.../translators/GroupByKeyTranslatorTest.java | 246 ---------- .../translators/ParDoBoundTranslatorTest.java | 340 ------------- .../translators/ReadUnboundTranslatorTest.java | 129 ----- .../utils/ApexStateInternalsTest.java | 361 -------------- .../translators/utils/CollectionSource.java | 136 ------ .../translators/utils/PipelineOptionsTest.java | 84 ---- 65 files changed, 4653 insertions(+), 4685 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java deleted file mode 100644 index 8a87ce0..0000000 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java +++ /dev/null @@ -1,185 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.beam.runners.apex; - -import com.datatorrent.api.DAG; - -import java.util.HashMap; -import java.util.Map; - -import org.apache.beam.runners.apex.ApexRunner.CreateApexPCollectionView; -import org.apache.beam.runners.apex.translators.CreateValuesTranslator; -import org.apache.beam.runners.apex.translators.FlattenPCollectionTranslator; -import org.apache.beam.runners.apex.translators.GroupByKeyTranslator; -import org.apache.beam.runners.apex.translators.ParDoBoundMultiTranslator; -import org.apache.beam.runners.apex.translators.ParDoBoundTranslator; -import org.apache.beam.runners.apex.translators.ReadUnboundedTranslator; -import org.apache.beam.runners.apex.translators.TransformTranslator; -import org.apache.beam.runners.apex.translators.TranslationContext; -import org.apache.beam.runners.apex.translators.io.ApexReadUnboundedInputOperator; -import org.apache.beam.runners.core.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.io.Read; -import org.apache.beam.sdk.runners.TransformTreeNode; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.Flatten; -import org.apache.beam.sdk.transforms.GroupByKey; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.View.CreatePCollectionView; -import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.PValue; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * {@link ApexPipelineTranslator} translates {@link Pipeline} objects - * into Apex logical plan {@link DAG}. 
- */ -@SuppressWarnings({"rawtypes", "unchecked"}) -public class ApexPipelineTranslator implements Pipeline.PipelineVisitor { - private static final Logger LOG = LoggerFactory.getLogger(ApexPipelineTranslator.class); - - /** - * A map from {@link PTransform} subclass to the corresponding - * {@link TransformTranslator} to use to translate that transform. - */ - private static final Map<Class<? extends PTransform>, TransformTranslator> - transformTranslators = new HashMap<>(); - - private final TranslationContext translationContext; - - static { - // register TransformTranslators - registerTransformTranslator(ParDo.Bound.class, new ParDoBoundTranslator()); - registerTransformTranslator(ParDo.BoundMulti.class, new ParDoBoundMultiTranslator<>()); - registerTransformTranslator(Read.Unbounded.class, new ReadUnboundedTranslator()); - registerTransformTranslator(Read.Bounded.class, new ReadBoundedTranslator()); - registerTransformTranslator(GroupByKey.class, new GroupByKeyTranslator()); - registerTransformTranslator(Flatten.FlattenPCollectionList.class, - new FlattenPCollectionTranslator()); - registerTransformTranslator(Create.Values.class, new CreateValuesTranslator()); - registerTransformTranslator(CreateApexPCollectionView.class, - new CreateApexPCollectionViewTranslator()); - registerTransformTranslator(CreatePCollectionView.class, - new CreatePCollectionViewTranslator()); - } - - public ApexPipelineTranslator(TranslationContext translationContext) { - this.translationContext = translationContext; - } - - public void translate(Pipeline pipeline) { - pipeline.traverseTopologically(this); - } - - @Override - public CompositeBehavior enterCompositeTransform(TransformTreeNode node) { - LOG.debug("entering composite transform {}", node.getTransform()); - return CompositeBehavior.ENTER_TRANSFORM; - } - - @Override - public void leaveCompositeTransform(TransformTreeNode node) { - LOG.debug("leaving composite transform {}", node.getTransform()); - } - - @Override - public 
void visitPrimitiveTransform(TransformTreeNode node) { - LOG.debug("visiting transform {}", node.getTransform()); - PTransform transform = node.getTransform(); - TransformTranslator translator = getTransformTranslator(transform.getClass()); - if (null == translator) { - throw new UnsupportedOperationException( - "no translator registered for " + transform); - } - translationContext.setCurrentTransform(node); - translator.translate(transform, translationContext); - } - - @Override - public void visitValue(PValue value, TransformTreeNode producer) { - LOG.debug("visiting value {}", value); - } - - /** - * Records that instances of the specified PTransform class - * should be translated by default by the corresponding - * {@link TransformTranslator}. - */ - private static <TransformT extends PTransform> void registerTransformTranslator( - Class<TransformT> transformClass, - TransformTranslator<? extends TransformT> transformTranslator) { - if (transformTranslators.put(transformClass, transformTranslator) != null) { - throw new IllegalArgumentException( - "defining multiple translators for " + transformClass); - } - } - - /** - * Returns the {@link TransformTranslator} to use for instances of the - * specified PTransform class, or null if none registered. 
- */ - private <TransformT extends PTransform<?, ?>> - TransformTranslator<TransformT> getTransformTranslator(Class<TransformT> transformClass) { - return transformTranslators.get(transformClass); - } - - private static class ReadBoundedTranslator<T> implements TransformTranslator<Read.Bounded<T>> { - private static final long serialVersionUID = 1L; - - @Override - public void translate(Read.Bounded<T> transform, TranslationContext context) { - // TODO: adapter is visibleForTesting - BoundedToUnboundedSourceAdapter unboundedSource = new BoundedToUnboundedSourceAdapter<>( - transform.getSource()); - ApexReadUnboundedInputOperator<T, ?> operator = new ApexReadUnboundedInputOperator<>( - unboundedSource, true, context.getPipelineOptions()); - context.addOperator(operator, operator.output); - } - - } - - private static class CreateApexPCollectionViewTranslator<ElemT, ViewT> - implements TransformTranslator<CreateApexPCollectionView<ElemT, ViewT>> { - private static final long serialVersionUID = 1L; - - @Override - public void translate(CreateApexPCollectionView<ElemT, ViewT> transform, - TranslationContext context) { - PCollectionView<ViewT> view = transform.getView(); - context.addView(view); - LOG.debug("view {}", view.getName()); - } - } - - private static class CreatePCollectionViewTranslator<ElemT, ViewT> - implements TransformTranslator<CreatePCollectionView<ElemT, ViewT>> { - private static final long serialVersionUID = 1L; - - @Override - public void translate(CreatePCollectionView<ElemT, ViewT> transform, - TranslationContext context) { - PCollectionView<ViewT> view = transform.getView(); - context.addView(view); - LOG.debug("view {}", view.getName()); - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java index 661308d..b42dddf 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java @@ -29,7 +29,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import org.apache.beam.runners.apex.translators.TranslationContext; +import org.apache.beam.runners.apex.translation.ApexPipelineTranslator; import org.apache.beam.runners.core.AssignWindows; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.Coder; @@ -118,17 +118,15 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> { } @Override - public ApexRunnerResult run(Pipeline pipeline) { + public ApexRunnerResult run(final Pipeline pipeline) { - final TranslationContext translationContext = new TranslationContext(options); - ApexPipelineTranslator translator = new ApexPipelineTranslator(translationContext); - translator.translate(pipeline); + final ApexPipelineTranslator translator = new ApexPipelineTranslator(options); StreamingApplication apexApp = new StreamingApplication() { @Override public void populateDAG(DAG dag, Configuration conf) { dag.setAttribute(DAGContext.APPLICATION_NAME, options.getApplicationName()); - translationContext.populateDAG(dag); + translator.translate(pipeline, dag); } }; @@ -352,9 +350,6 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> { /** * Combiner that combines {@code T}s into a single {@code List<T>} containing all inputs. - * - * <p>For internal use by {@link StreamingViewAsMap}, {@link StreamingViewAsMultimap}, - * {@link StreamingViewAsList}, {@link StreamingViewAsIterable}. * They require the input {@link PCollection} fits in memory. * For a large {@link PCollection} this is expected to crash! 
* http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java index 03428a6..3ae69f2 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java @@ -79,7 +79,7 @@ public class ApexRunnerResult implements PipelineResult { /** * Return the DAG executed by the pipeline. - * @return + * @return DAG from translation. */ public DAG getApexDAG() { return apexDAG; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java new file mode 100644 index 0000000..d38faf7 --- /dev/null +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.apex.translation; + +import com.datatorrent.api.DAG; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.beam.runners.apex.ApexPipelineOptions; +import org.apache.beam.runners.apex.ApexRunner.CreateApexPCollectionView; +import org.apache.beam.runners.apex.translation.operators.ApexReadUnboundedInputOperator; +import org.apache.beam.runners.core.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.io.Read; +import org.apache.beam.sdk.runners.TransformTreeNode; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.Flatten; +import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.View.CreatePCollectionView; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.PValue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * {@link ApexPipelineTranslator} translates {@link Pipeline} objects + * into Apex logical plan {@link DAG}. + */ +@SuppressWarnings({"rawtypes", "unchecked"}) +public class ApexPipelineTranslator implements Pipeline.PipelineVisitor { + private static final Logger LOG = LoggerFactory.getLogger(ApexPipelineTranslator.class); + + /** + * A map from {@link PTransform} subclass to the corresponding + * {@link TransformTranslator} to use to translate that transform. 
+ */ + private static final Map<Class<? extends PTransform>, TransformTranslator> + transformTranslators = new HashMap<>(); + + private final TranslationContext translationContext; + + static { + // register TransformTranslators + registerTransformTranslator(ParDo.Bound.class, new ParDoBoundTranslator()); + registerTransformTranslator(ParDo.BoundMulti.class, new ParDoBoundMultiTranslator<>()); + registerTransformTranslator(Read.Unbounded.class, new ReadUnboundedTranslator()); + registerTransformTranslator(Read.Bounded.class, new ReadBoundedTranslator()); + registerTransformTranslator(GroupByKey.class, new GroupByKeyTranslator()); + registerTransformTranslator(Flatten.FlattenPCollectionList.class, + new FlattenPCollectionTranslator()); + registerTransformTranslator(Create.Values.class, new CreateValuesTranslator()); + registerTransformTranslator(CreateApexPCollectionView.class, + new CreateApexPCollectionViewTranslator()); + registerTransformTranslator(CreatePCollectionView.class, + new CreatePCollectionViewTranslator()); + } + + public ApexPipelineTranslator(ApexPipelineOptions options) { + this.translationContext = new TranslationContext(options); + } + + public void translate(Pipeline pipeline, DAG dag) { + pipeline.traverseTopologically(this); + translationContext.populateDAG(dag); + } + + @Override + public CompositeBehavior enterCompositeTransform(TransformTreeNode node) { + LOG.debug("entering composite transform {}", node.getTransform()); + return CompositeBehavior.ENTER_TRANSFORM; + } + + @Override + public void leaveCompositeTransform(TransformTreeNode node) { + LOG.debug("leaving composite transform {}", node.getTransform()); + } + + @Override + public void visitPrimitiveTransform(TransformTreeNode node) { + LOG.debug("visiting transform {}", node.getTransform()); + PTransform transform = node.getTransform(); + TransformTranslator translator = getTransformTranslator(transform.getClass()); + if (null == translator) { + throw new 
UnsupportedOperationException( + "no translator registered for " + transform); + } + translationContext.setCurrentTransform(node); + translator.translate(transform, translationContext); + } + + @Override + public void visitValue(PValue value, TransformTreeNode producer) { + LOG.debug("visiting value {}", value); + } + + /** + * Records that instances of the specified PTransform class + * should be translated by default by the corresponding + * {@link TransformTranslator}. + */ + private static <TransformT extends PTransform> void registerTransformTranslator( + Class<TransformT> transformClass, + TransformTranslator<? extends TransformT> transformTranslator) { + if (transformTranslators.put(transformClass, transformTranslator) != null) { + throw new IllegalArgumentException( + "defining multiple translators for " + transformClass); + } + } + + /** + * Returns the {@link TransformTranslator} to use for instances of the + * specified PTransform class, or null if none registered. + */ + private <TransformT extends PTransform<?, ?>> + TransformTranslator<TransformT> getTransformTranslator(Class<TransformT> transformClass) { + return transformTranslators.get(transformClass); + } + + private static class ReadBoundedTranslator<T> implements TransformTranslator<Read.Bounded<T>> { + private static final long serialVersionUID = 1L; + + @Override + public void translate(Read.Bounded<T> transform, TranslationContext context) { + // TODO: adapter is visibleForTesting + BoundedToUnboundedSourceAdapter unboundedSource = new BoundedToUnboundedSourceAdapter<>( + transform.getSource()); + ApexReadUnboundedInputOperator<T, ?> operator = new ApexReadUnboundedInputOperator<>( + unboundedSource, true, context.getPipelineOptions()); + context.addOperator(operator, operator.output); + } + + } + + private static class CreateApexPCollectionViewTranslator<ElemT, ViewT> + implements TransformTranslator<CreateApexPCollectionView<ElemT, ViewT>> { + private static final long serialVersionUID = 
1L; + + @Override + public void translate(CreateApexPCollectionView<ElemT, ViewT> transform, + TranslationContext context) { + PCollectionView<ViewT> view = transform.getView(); + context.addView(view); + LOG.debug("view {}", view.getName()); + } + } + + private static class CreatePCollectionViewTranslator<ElemT, ViewT> + implements TransformTranslator<CreatePCollectionView<ElemT, ViewT>> { + private static final long serialVersionUID = 1L; + + @Override + public void translate(CreatePCollectionView<ElemT, ViewT> transform, + TranslationContext context) { + PCollectionView<ViewT> view = transform.getView(); + context.addView(view); + LOG.debug("view {}", view.getName()); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/CreateValuesTranslator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/CreateValuesTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/CreateValuesTranslator.java new file mode 100644 index 0000000..ceae2b5 --- /dev/null +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/CreateValuesTranslator.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.apex.translation; + +import org.apache.beam.runners.apex.translation.operators.ApexReadUnboundedInputOperator; +import org.apache.beam.runners.apex.translation.utils.ValuesSource; +import org.apache.beam.sdk.coders.CannotProvideCoderException; +import org.apache.beam.sdk.io.UnboundedSource; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.values.PBegin; + + +/** + * Wraps elements from Create.Values into an {@link UnboundedSource}. + * mainly used for testing + */ +class CreateValuesTranslator<T> implements TransformTranslator<Create.Values<T>> { + private static final long serialVersionUID = 1451000241832745629L; + + @Override + public void translate(Create.Values<T> transform, TranslationContext context) { + try { + UnboundedSource<T, ?> unboundedSource = new ValuesSource<>(transform.getElements(), + transform.getDefaultOutputCoder((PBegin) context.getInput())); + ApexReadUnboundedInputOperator<T, ?> operator = new ApexReadUnboundedInputOperator<>( + unboundedSource, context.getPipelineOptions()); + context.addOperator(operator, operator.output); + } catch (CannotProvideCoderException e) { + throw new RuntimeException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslator.java 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslator.java new file mode 100644 index 0000000..eb24af9 --- /dev/null +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslator.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.apex.translation; + +import com.google.common.collect.Lists; + +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import org.apache.beam.runners.apex.translation.operators.ApexFlattenOperator; +import org.apache.beam.runners.apex.translation.operators.ApexReadUnboundedInputOperator; +import org.apache.beam.runners.apex.translation.utils.ValuesSource; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.VoidCoder; +import org.apache.beam.sdk.io.UnboundedSource; +import org.apache.beam.sdk.transforms.Flatten; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionList; + +/** + * {@link Flatten.FlattenPCollectionList} translation to Apex operator. 
+ */ +class FlattenPCollectionTranslator<T> implements + TransformTranslator<Flatten.FlattenPCollectionList<T>> { + private static final long serialVersionUID = 1L; + + @Override + public void translate(Flatten.FlattenPCollectionList<T> transform, TranslationContext context) { + PCollectionList<T> input = context.getInput(); + List<PCollection<T>> collections = input.getAll(); + + if (collections.isEmpty()) { + // create a dummy source that never emits anything + @SuppressWarnings("unchecked") + UnboundedSource<T, ?> unboundedSource = new ValuesSource<>(Collections.EMPTY_LIST, + (Coder<T>) VoidCoder.of()); + ApexReadUnboundedInputOperator<T, ?> operator = new ApexReadUnboundedInputOperator<>( + unboundedSource, context.getPipelineOptions()); + context.addOperator(operator, operator.output); + } else { + PCollection<T> output = context.getOutput(); + Map<PCollection<?>, Integer> unionTags = Collections.emptyMap(); + flattenCollections(collections, unionTags, output, context); + } + } + + /** + * Flatten the given collections into the given result collection. Translates + * into a cascading merge with 2 input ports per operator. The optional union + * tags can be used to identify the source in the result stream, used to + * channel multiple side inputs to a single Apex operator port. 
+ * + * @param collections + * @param unionTags + * @param finalCollection + * @param context + */ + static <T> void flattenCollections(List<PCollection<T>> collections, Map<PCollection<?>, + Integer> unionTags, PCollection<T> finalCollection, TranslationContext context) { + List<PCollection<T>> remainingCollections = Lists.newArrayList(); + PCollection<T> firstCollection = null; + while (!collections.isEmpty()) { + for (PCollection<T> collection : collections) { + if (null == firstCollection) { + firstCollection = collection; + } else { + ApexFlattenOperator<T> operator = new ApexFlattenOperator<>(); + context.addStream(firstCollection, operator.data1); + Integer unionTag = unionTags.get(firstCollection); + operator.data1Tag = (unionTag != null) ? unionTag : 0; + context.addStream(collection, operator.data2); + unionTag = unionTags.get(collection); + operator.data2Tag = (unionTag != null) ? unionTag : 0; + + if (!collection.getCoder().equals(firstCollection.getCoder())) { + throw new UnsupportedOperationException("coders don't match"); + } + + if (collections.size() > 2) { + PCollection<T> intermediateCollection = intermediateCollection(collection, + collection.getCoder()); + context.addOperator(operator, operator.out, intermediateCollection); + remainingCollections.add(intermediateCollection); + } else { + // final stream merge + context.addOperator(operator, operator.out, finalCollection); + } + firstCollection = null; + } + } + if (firstCollection != null) { + // push to next merge level + remainingCollections.add(firstCollection); + firstCollection = null; + } + if (remainingCollections.size() > 1) { + collections = remainingCollections; + remainingCollections = Lists.newArrayList(); + } else { + collections = Lists.newArrayList(); + } + } + } + + static <T> PCollection<T> intermediateCollection(PCollection<T> input, Coder<T> outputCoder) { + PCollection<T> output = PCollection.createPrimitiveOutputInternal(input.getPipeline(), + input.getWindowingStrategy(), 
input.isBounded()); + output.setCoder(outputCoder); + return output; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslator.java new file mode 100644 index 0000000..47d447a --- /dev/null +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslator.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.apex.translation; + +import org.apache.beam.runners.apex.translation.operators.ApexGroupByKeyOperator; +import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; + +/** + * {@link GroupByKey} translation to Apex operator. 
+ */ +class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKey<K, V>> { + private static final long serialVersionUID = 1L; + + @Override + public void translate(GroupByKey<K, V> transform, TranslationContext context) { + PCollection<KV<K, V>> input = context.getInput(); + ApexGroupByKeyOperator<K, V> group = new ApexGroupByKeyOperator<>(context.getPipelineOptions(), + input, context.<K>stateInternalsFactory() + ); + context.addOperator(group, group.output); + context.addStream(input, group.input); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundMultiTranslator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundMultiTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundMultiTranslator.java new file mode 100644 index 0000000..7c91b91 --- /dev/null +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundMultiTranslator.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.runners.apex.translation; + +import static com.google.common.base.Preconditions.checkArgument; + +import com.datatorrent.api.Operator; +import com.datatorrent.api.Operator.OutputPort; +import com.google.common.collect.Maps; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.beam.runners.apex.translation.operators.ApexParDoOperator; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder; +import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.TupleTag; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * {@link ParDo.BoundMulti} is translated to {@link ApexParDoOperator} that wraps the {@link DoFn}. 
+ */ +class ParDoBoundMultiTranslator<InputT, OutputT> + implements TransformTranslator<ParDo.BoundMulti<InputT, OutputT>> { + private static final long serialVersionUID = 1L; + private static final Logger LOG = LoggerFactory.getLogger(ParDoBoundMultiTranslator.class); + + @Override + public void translate(ParDo.BoundMulti<InputT, OutputT> transform, TranslationContext context) { + OldDoFn<InputT, OutputT> doFn = transform.getFn(); + PCollectionTuple output = context.getOutput(); + PCollection<InputT> input = context.getInput(); + List<PCollectionView<?>> sideInputs = transform.getSideInputs(); + Coder<InputT> inputCoder = input.getCoder(); + WindowedValueCoder<InputT> wvInputCoder = FullWindowedValueCoder.of(inputCoder, + input.getWindowingStrategy().getWindowFn().windowCoder()); + + ApexParDoOperator<InputT, OutputT> operator = new ApexParDoOperator<>( + context.getPipelineOptions(), + doFn, transform.getMainOutputTag(), transform.getSideOutputTags().getAll(), + context.<PCollection<?>>getInput().getWindowingStrategy(), sideInputs, wvInputCoder, + context.<Void>stateInternalsFactory() + ); + + Map<TupleTag<?>, PCollection<?>> outputs = output.getAll(); + Map<PCollection<?>, OutputPort<?>> ports = Maps.newHashMapWithExpectedSize(outputs.size()); + for (Map.Entry<TupleTag<?>, PCollection<?>> outputEntry : outputs.entrySet()) { + if (outputEntry.getKey() == transform.getMainOutputTag()) { + ports.put(outputEntry.getValue(), operator.output); + } else { + int portIndex = 0; + for (TupleTag<?> tag : transform.getSideOutputTags().getAll()) { + if (tag == outputEntry.getKey()) { + ports.put(outputEntry.getValue(), operator.sideOutputPorts[portIndex]); + break; + } + portIndex++; + } + } + } + context.addOperator(operator, ports); + context.addStream(context.getInput(), operator.input); + if (!sideInputs.isEmpty()) { + addSideInputs(operator, sideInputs, context); + } + } + + static void addSideInputs(ApexParDoOperator<?, ?> operator, List<PCollectionView<?>> sideInputs, 
+ TranslationContext context) { + Operator.InputPort<?>[] sideInputPorts = {operator.sideInput1}; + if (sideInputs.size() > sideInputPorts.length) { + PCollection<?> unionCollection = unionSideInputs(sideInputs, context); + context.addStream(unionCollection, sideInputPorts[0]); + } else { + // the number of ports for side inputs is fixed and each port can only take one input. + for (int i = 0; i < sideInputs.size(); i++) { + context.addStream(context.getViewInput(sideInputs.get(i)), sideInputPorts[i]); + } + } + } + + private static PCollection<?> unionSideInputs(List<PCollectionView<?>> sideInputs, + TranslationContext context) { + checkArgument(sideInputs.size() > 1, "requires multiple side inputs"); + // flatten and assign union tag + List<PCollection<Object>> sourceCollections = new ArrayList<>(); + Map<PCollection<?>, Integer> unionTags = new HashMap<>(); + PCollection<Object> firstSideInput = context.getViewInput(sideInputs.get(0)); + for (int i = 0; i < sideInputs.size(); i++) { + PCollectionView<?> sideInput = sideInputs.get(i); + PCollection<?> sideInputCollection = context.getViewInput(sideInput); + if (!sideInputCollection.getWindowingStrategy().equals( + firstSideInput.getWindowingStrategy())) { + // TODO: check how to handle this in stream codec + //String msg = "Multiple side inputs with different window strategies."; + //throw new UnsupportedOperationException(msg); + LOG.warn("Side inputs union with different windowing strategies {} {}", + firstSideInput.getWindowingStrategy(), sideInputCollection.getWindowingStrategy()); + } + if (!sideInputCollection.getCoder().equals(firstSideInput.getCoder())) { + String msg = "Multiple side inputs with different coders."; + throw new UnsupportedOperationException(msg); + } + sourceCollections.add(context.<PCollection<Object>>getViewInput(sideInput)); + unionTags.put(sideInputCollection, i); + } + + PCollection<Object> resultCollection = FlattenPCollectionTranslator.intermediateCollection( + firstSideInput, 
firstSideInput.getCoder()); + FlattenPCollectionTranslator.flattenCollections(sourceCollections, unionTags, resultCollection, + context); + return resultCollection; + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslator.java new file mode 100644 index 0000000..c1ebbd5 --- /dev/null +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslator.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.runners.apex.translation; + +import java.util.List; + +import org.apache.beam.runners.apex.translation.operators.ApexParDoOperator; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder; +import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; + +/** + * {@link ParDo.Bound} is translated to {link ApexParDoOperator} that wraps the {@link DoFn}. + */ +class ParDoBoundTranslator<InputT, OutputT> implements + TransformTranslator<ParDo.Bound<InputT, OutputT>> { + private static final long serialVersionUID = 1L; + + @Override + public void translate(ParDo.Bound<InputT, OutputT> transform, TranslationContext context) { + OldDoFn<InputT, OutputT> doFn = transform.getFn(); + PCollection<OutputT> output = context.getOutput(); + PCollection<InputT> input = context.getInput(); + List<PCollectionView<?>> sideInputs = transform.getSideInputs(); + Coder<InputT> inputCoder = input.getCoder(); + WindowedValueCoder<InputT> wvInputCoder = FullWindowedValueCoder.of(inputCoder, + input.getWindowingStrategy().getWindowFn().windowCoder()); + + ApexParDoOperator<InputT, OutputT> operator = new ApexParDoOperator<>( + context.getPipelineOptions(), + doFn, new TupleTag<OutputT>(), TupleTagList.empty().getAll() /*sideOutputTags*/, + output.getWindowingStrategy(), sideInputs, wvInputCoder, + context.<Void>stateInternalsFactory() + ); + context.addOperator(operator, operator.output); + context.addStream(context.getInput(), operator.input); + if (!sideInputs.isEmpty()) { + ParDoBoundMultiTranslator.addSideInputs(operator, sideInputs, context); + } + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ReadUnboundedTranslator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ReadUnboundedTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ReadUnboundedTranslator.java new file mode 100644 index 0000000..b3034ac --- /dev/null +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ReadUnboundedTranslator.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.apex.translation; + +import com.datatorrent.api.InputOperator; + +import org.apache.beam.runners.apex.translation.operators.ApexReadUnboundedInputOperator; +import org.apache.beam.sdk.io.Read; +import org.apache.beam.sdk.io.UnboundedSource; + +/** + * {@link Read.Unbounded} is translated to Apex {@link InputOperator} + * that wraps {@link UnboundedSource}. 
+ */ +class ReadUnboundedTranslator<T> implements TransformTranslator<Read.Unbounded<T>> { + private static final long serialVersionUID = 1L; + + @Override + public void translate(Read.Unbounded<T> transform, TranslationContext context) { + UnboundedSource<T, ?> unboundedSource = transform.getSource(); + ApexReadUnboundedInputOperator<T, ?> operator = new ApexReadUnboundedInputOperator<>( + unboundedSource, context.getPipelineOptions()); + context.addOperator(operator, operator.output); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TransformTranslator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TransformTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TransformTranslator.java new file mode 100644 index 0000000..eb81052 --- /dev/null +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TransformTranslator.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
 */

package org.apache.beam.runners.apex.translation;


import java.io.Serializable;

import org.apache.beam.sdk.transforms.PTransform;

/**
 * Translates {@link PTransform} to Apex functions.
 *
 * @param <T> the transform type handled by the translator
 */
interface TransformTranslator<T extends PTransform<?, ?>> extends Serializable {
  /**
   * Translates the given transform.
   *
   * @param transform the transform to translate
   * @param context the translation context; the translators in this package use it to look up
   *     the current input/output and to register operators and streams
   */
  void translate(T transform, TranslationContext context);
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java
new file mode 100644
index 0000000..e016730
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java
@@ -0,0 +1,178 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
+ */ +package org.apache.beam.runners.apex.translation; + +import static com.google.common.base.Preconditions.checkArgument; + +import com.datatorrent.api.Context.PortContext; +import com.datatorrent.api.DAG; +import com.datatorrent.api.Operator; +import com.datatorrent.api.Operator.InputPort; +import com.datatorrent.api.Operator.OutputPort; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.beam.runners.apex.ApexPipelineOptions; +import org.apache.beam.runners.apex.translation.utils.ApexStateInternals; +import org.apache.beam.runners.apex.translation.utils.ApexStreamTuple; +import org.apache.beam.runners.apex.translation.utils.CoderAdapterStreamCodec; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.runners.TransformTreeNode; +import org.apache.beam.sdk.transforms.AppliedPTransform; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder; +import org.apache.beam.sdk.util.state.StateInternalsFactory; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.PInput; +import org.apache.beam.sdk.values.POutput; +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.commons.lang3.tuple.Pair; + +/** + * Maintains context data for {@link TransformTranslator}s. 
+ */ +@SuppressWarnings({"rawtypes", "unchecked"}) +class TranslationContext { + + private final ApexPipelineOptions pipelineOptions; + private AppliedPTransform<?, ?, ?> currentTransform; + private final Map<PCollection, Pair<OutputPort<?>, List<InputPort<?>>>> streams = new HashMap<>(); + private final Map<String, Operator> operators = new HashMap<>(); + private final Map<PCollectionView<?>, PInput> viewInputs = new HashMap<>(); + + public void addView(PCollectionView<?> view) { + this.viewInputs.put(view, this.getInput()); + } + + public <InputT extends PInput> InputT getViewInput(PCollectionView<?> view) { + PInput input = this.viewInputs.get(view); + checkArgument(input != null, "unknown view " + view.getName()); + return (InputT) input; + } + + TranslationContext(ApexPipelineOptions pipelineOptions) { + this.pipelineOptions = pipelineOptions; + } + + public void setCurrentTransform(TransformTreeNode treeNode) { + this.currentTransform = AppliedPTransform.of(treeNode.getFullName(), + treeNode.getInput(), treeNode.getOutput(), (PTransform) treeNode.getTransform()); + } + + public ApexPipelineOptions getPipelineOptions() { + return pipelineOptions; + } + + public <InputT extends PInput> InputT getInput() { + return (InputT) getCurrentTransform().getInput(); + } + + public <OutputT extends POutput> OutputT getOutput() { + return (OutputT) getCurrentTransform().getOutput(); + } + + private AppliedPTransform<?, ?, ?> getCurrentTransform() { + checkArgument(currentTransform != null, "current transform not set"); + return currentTransform; + } + + public void addOperator(Operator operator, OutputPort port) { + addOperator(operator, port, this.<PCollection<?>>getOutput()); + } + + /** + * Register operator and output ports for the given collections. 
+ * @param operator + * @param ports + */ + public void addOperator(Operator operator, Map<PCollection<?>, OutputPort<?>> ports) { + boolean first = true; + for (Map.Entry<PCollection<?>, OutputPort<?>> portEntry : ports.entrySet()) { + if (first) { + addOperator(operator, portEntry.getValue(), portEntry.getKey()); + first = false; + } else { + this.streams.put(portEntry.getKey(), (Pair) new ImmutablePair<>(portEntry.getValue(), + new ArrayList<>())); + } + } + } + + /** + * Add the operator with its output port for the given result {link PCollection}. + * @param operator + * @param port + * @param output + */ + public void addOperator(Operator operator, OutputPort port, PCollection output) { + // Apex DAG requires a unique operator name + // use the transform's name and make it unique + String name = getCurrentTransform().getFullName(); + for (int i = 1; this.operators.containsKey(name); i++) { + name = getCurrentTransform().getFullName() + i; + } + this.operators.put(name, operator); + this.streams.put(output, (Pair) new ImmutablePair<>(port, new ArrayList<>())); + } + + public void addStream(PInput input, InputPort inputPort) { + Pair<OutputPort<?>, List<InputPort<?>>> stream = this.streams.get(input); + checkArgument(stream != null, "no upstream operator defined for %s", input); + stream.getRight().add(inputPort); + } + + public void populateDAG(DAG dag) { + for (Map.Entry<String, Operator> nameAndOperator : this.operators.entrySet()) { + dag.addOperator(nameAndOperator.getKey(), nameAndOperator.getValue()); + } + int streamIndex = 0; + for (Map.Entry<PCollection, Pair<OutputPort<?>, List<InputPort<?>>>> streamEntry : this. 
+ streams.entrySet()) { + List<InputPort<?>> sinksList = streamEntry.getValue().getRight(); + InputPort[] sinks = sinksList.toArray(new InputPort[sinksList.size()]); + if (sinks.length > 0) { + dag.addStream("stream" + streamIndex++, streamEntry.getValue().getLeft(), sinks); + for (InputPort port : sinks) { + PCollection pc = streamEntry.getKey(); + Coder coder = pc.getCoder(); + if (pc.getWindowingStrategy() != null) { + coder = FullWindowedValueCoder.of(pc.getCoder(), + pc.getWindowingStrategy().getWindowFn().windowCoder() + ); + } + Coder<Object> wrapperCoder = ApexStreamTuple.ApexStreamTupleCoder.of(coder); + CoderAdapterStreamCodec streamCodec = new CoderAdapterStreamCodec(wrapperCoder); + dag.setInputPortAttribute(port, PortContext.STREAM_CODEC, streamCodec); + } + } + } + } + + /** + * Return the {@link StateInternalsFactory} for the pipeline translation. + * @return + */ + public <K> StateInternalsFactory<K> stateInternalsFactory() { + return new ApexStateInternals.ApexStateInternalsFactory(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexFlattenOperator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexFlattenOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexFlattenOperator.java new file mode 100644 index 0000000..3d9db51 --- /dev/null +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexFlattenOperator.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.apex.translation.operators; + +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.annotation.OutputPortFieldAnnotation; +import com.datatorrent.common.util.BaseOperator; + +import org.apache.beam.runners.apex.translation.utils.ApexStreamTuple; +import org.apache.beam.runners.apex.translation.utils.ApexStreamTuple.WatermarkTuple; +import org.apache.beam.sdk.util.WindowedValue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Apex operator for Beam {@link org.apache.beam.sdk.transforms.Flatten.FlattenPCollectionList}. + */ +public class ApexFlattenOperator<InputT> extends BaseOperator { + + private static final Logger LOG = LoggerFactory.getLogger(ApexFlattenOperator.class); + private boolean traceTuples = false; + + private long inputWM1; + private long inputWM2; + private long outputWM; + + public int data1Tag; + public int data2Tag; + + /** + * Data input port 1. 
+ */ + public final transient DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>> data1 = + new DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>>() { + /** + * Emits to port "out" + */ + @Override + public void process(ApexStreamTuple<WindowedValue<InputT>> tuple) { + if (tuple instanceof WatermarkTuple) { + WatermarkTuple<?> wmTuple = (WatermarkTuple<?>) tuple; + if (wmTuple.getTimestamp() > inputWM1) { + inputWM1 = wmTuple.getTimestamp(); + if (inputWM1 <= inputWM2) { + // move output watermark and emit it + outputWM = inputWM1; + if (traceTuples) { + LOG.debug("\nemitting watermark {}\n", outputWM); + } + out.emit(tuple); + } + } + return; + } + if (traceTuples) { + LOG.debug("\nemitting {}\n", tuple); + } + + if (data1Tag > 0 && tuple instanceof ApexStreamTuple.DataTuple) { + ((ApexStreamTuple.DataTuple<?>) tuple).setUnionTag(data1Tag); + } + out.emit(tuple); + } + }; + + /** + * Data input port 2. + */ + public final transient DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>> data2 = + new DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>>() { + /** + * Emits to port "out" + */ + @Override + public void process(ApexStreamTuple<WindowedValue<InputT>> tuple) { + if (tuple instanceof WatermarkTuple) { + WatermarkTuple<?> wmTuple = (WatermarkTuple<?>) tuple; + if (wmTuple.getTimestamp() > inputWM2) { + inputWM2 = wmTuple.getTimestamp(); + if (inputWM2 <= inputWM1) { + // move output watermark and emit it + outputWM = inputWM2; + if (traceTuples) { + LOG.debug("\nemitting watermark {}\n", outputWM); + } + out.emit(tuple); + } + } + return; + } + if (traceTuples) { + LOG.debug("\nemitting {}\n", tuple); + } + + if (data2Tag > 0 && tuple instanceof ApexStreamTuple.DataTuple) { + ((ApexStreamTuple.DataTuple<?>) tuple).setUnionTag(data2Tag); + } + out.emit(tuple); + } + }; + + /** + * Output port. 
+ */ + @OutputPortFieldAnnotation(optional = true) + public final transient DefaultOutputPort<ApexStreamTuple<WindowedValue<InputT>>> out = + new DefaultOutputPort<>(); + +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java new file mode 100644 index 0000000..1b5e693 --- /dev/null +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java @@ -0,0 +1,478 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.apex.translation.operators; + +import static com.google.common.base.Preconditions.checkNotNull; + +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.Operator; +import com.datatorrent.api.StreamCodec; +import com.datatorrent.api.annotation.OutputPortFieldAnnotation; +import com.esotericsoftware.kryo.serializers.FieldSerializer.Bind; +import com.esotericsoftware.kryo.serializers.JavaSerializer; +import com.google.common.base.Throwables; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Multimap; + +import java.io.IOException; +import java.io.Serializable; +import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; + +import org.apache.beam.runners.apex.ApexPipelineOptions; +import org.apache.beam.runners.apex.translation.utils.ApexStreamTuple; +import org.apache.beam.runners.apex.translation.utils.SerializablePipelineOptions; +import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetDoFn; +import org.apache.beam.runners.core.SystemReduceFn; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.Aggregator; +import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.CoderUtils; +import org.apache.beam.sdk.util.KeyedWorkItem; +import org.apache.beam.sdk.util.KeyedWorkItems; +import 
org.apache.beam.sdk.util.TimeDomain;
import org.apache.beam.sdk.util.TimerInternals;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingInternals;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.util.state.StateInternals;
import org.apache.beam.sdk.util.state.StateInternalsFactory;
import org.apache.beam.sdk.util.state.StateNamespace;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Apex operator for Beam {@link GroupByKey}.
 * This operator expects the input stream already partitioned by K,
 * which is determined by the {@link StreamCodec} on the input port.
 *
 * <p>Data tuples are wrapped into single-element {@link KeyedWorkItem}s and handed to a
 * {@link GroupAlsoByWindowViaWindowSetDoFn}. Watermark tuples update the input watermark,
 * fire every registered timer whose timestamp lies strictly before the watermark, and are
 * then forwarded on the output port.
 *
 * <p>Per-key state and pending timers are held in maps keyed by the key's encoded bytes
 * (as produced by the key coder).
 *
 * @param <K> key type
 * @param <V> value type
 */
public class ApexGroupByKeyOperator<K, V> implements Operator {
  private static final Logger LOG = LoggerFactory.getLogger(ApexGroupByKeyOperator.class);
  // Re-evaluated in setup(); controls whether tuples are logged at debug level.
  private boolean traceTuples = true;

  @Bind(JavaSerializer.class)
  private WindowingStrategy<V, BoundedWindow> windowingStrategy;
  @Bind(JavaSerializer.class)
  private Coder<K> keyCoder;
  @Bind(JavaSerializer.class)
  private Coder<V> valueCoder;

  @Bind(JavaSerializer.class)
  private final SerializablePipelineOptions serializedOptions;
  @Bind(JavaSerializer.class)
  private final StateInternalsFactory<K> stateInternalsFactory;
  // Both maps are keyed by the encoded form of the key, see encodeKey(Object).
  private Map<ByteBuffer, StateInternals<K>> perKeyStateInternals = new HashMap<>();
  private Map<ByteBuffer, Set<TimerInternals.TimerData>> activeTimers = new HashMap<>();

  private transient ProcessContext context;
  private transient OldDoFn<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> fn;
  private transient ApexTimerInternals timerInternals = new ApexTimerInternals();
  private Instant inputWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;

  /**
   * Input port. Watermark tuples trigger timer processing and are re-emitted downstream;
   * all other tuples are processed as elements. Any checked exception is rethrown wrapped
   * in a {@link RuntimeException}.
   */
  public final transient DefaultInputPort<ApexStreamTuple<WindowedValue<KV<K, V>>>> input =
      new DefaultInputPort<ApexStreamTuple<WindowedValue<KV<K, V>>>>() {
        @Override
        public void process(ApexStreamTuple<WindowedValue<KV<K, V>>> t) {
          try {
            if (t instanceof ApexStreamTuple.WatermarkTuple) {
              ApexStreamTuple.WatermarkTuple<?> mark = (ApexStreamTuple.WatermarkTuple<?>) t;
              // Fire due timers first so their output precedes the forwarded watermark.
              processWatermark(mark);
              if (traceTuples) {
                LOG.debug("\nemitting watermark {}\n", mark.getTimestamp());
              }
              output.emit(ApexStreamTuple.WatermarkTuple.<WindowedValue<KV<K, Iterable<V>>>>of(
                  mark.getTimestamp()));
              return;
            }
            if (traceTuples) {
              LOG.debug("\ninput {}\n", t.getValue());
            }
            processElement(t.getValue());
          } catch (Exception e) {
            Throwables.propagateIfPossible(e);
            throw new RuntimeException(e);
          }
        }
      };

  /** Output port carrying grouped results and forwarded watermarks. */
  @OutputPortFieldAnnotation(optional = true)
  public final transient DefaultOutputPort<ApexStreamTuple<WindowedValue<KV<K, Iterable<V>>>>>
      output = new DefaultOutputPort<>();

  /**
   * Creates the operator for the given input collection.
   *
   * @param pipelineOptions pipeline options, must not be null
   * @param input the keyed input collection; its windowing strategy and {@link KvCoder}
   *     supply the windowing strategy, key coder and value coder used by this operator
   * @param stateInternalsFactory factory used to create the state internals of a key the
   *     first time that key is seen
   */
  @SuppressWarnings("unchecked")
  public ApexGroupByKeyOperator(ApexPipelineOptions pipelineOptions, PCollection<KV<K, V>> input,
      StateInternalsFactory<K> stateInternalsFactory) {
    checkNotNull(pipelineOptions);
    this.serializedOptions = new SerializablePipelineOptions(pipelineOptions);
    this.windowingStrategy = (WindowingStrategy<V, BoundedWindow>) input.getWindowingStrategy();
    this.keyCoder = ((KvCoder<K, V>) input.getCoder()).getKeyCoder();
    this.valueCoder = ((KvCoder<K, V>) input.getCoder()).getValueCoder();
    this.stateInternalsFactory = stateInternalsFactory;
  }

  @SuppressWarnings("unused") // for Kryo
  private ApexGroupByKeyOperator() {
    this.serializedOptions = null;
    this.stateInternalsFactory = null;
  }

  @Override
  public void beginWindow(long l) {
  }

  @Override
  public void endWindow() {
  }

  /**
   * Initializes the transient members: the tracing flag, the group-also-by-window function
   * and the process context bound to it.
   */
  @Override
  public void setup(OperatorContext context) {
    this.traceTuples = ApexStreamTuple.Logging.isDebugEnabled(serializedOptions.get(), this);
    // Named differently from the field on purpose: this factory routes lookups through
    // getStateInternalsForKey(), which caches per key and delegates to the field's factory.
    StateInternalsFactory<K> cachingFactory = new GroupByKeyStateInternalsFactory();
    this.fn = GroupAlsoByWindowViaWindowSetDoFn.create(this.windowingStrategy,
        cachingFactory, SystemReduceFn.<K, V, BoundedWindow>buffering(this.valueCoder));
    this.context = new ProcessContext(fn, this.timerInternals);
  }

  @Override
  public void teardown() {
  }

  /**
   * Returns the timers that are ready to fire and removes them from the set of active timers.
   * These are the timers registered with a timestamp strictly before the current watermark.
   * Timers are kept in a Set per key, so that they are deduplicated, as the same
   * timer can be registered multiple times.
   *
   * @param currentWatermark the watermark in milliseconds
   * @return the due timers, grouped by encoded key
   */
  private Multimap<ByteBuffer, TimerInternals.TimerData> getTimersReadyToProcess(
      long currentWatermark) {

    // we keep the timers to return in a different list and launch them later
    // because we cannot prevent a trigger from registering another trigger,
    // which would lead to concurrent modification exception.
    Multimap<ByteBuffer, TimerInternals.TimerData> toFire = HashMultimap.create();

    Iterator<Map.Entry<ByteBuffer, Set<TimerInternals.TimerData>>> it =
        activeTimers.entrySet().iterator();
    while (it.hasNext()) {
      Map.Entry<ByteBuffer, Set<TimerInternals.TimerData>> keyWithTimers = it.next();

      Iterator<TimerInternals.TimerData> timerIt = keyWithTimers.getValue().iterator();
      while (timerIt.hasNext()) {
        TimerInternals.TimerData timerData = timerIt.next();
        if (timerData.getTimestamp().isBefore(currentWatermark)) {
          toFire.put(keyWithTimers.getKey(), timerData);
          timerIt.remove();
        }
      }

      // Drop keys that have no pending timers left.
      if (keyWithTimers.getValue().isEmpty()) {
        it.remove();
      }
    }
    return toFire;
  }

  /**
   * Processes a single keyed element: the value is re-wrapped with the element's timestamp,
   * windows and pane, packaged as a one-element {@link KeyedWorkItem} and passed to the
   * grouping function.
   */
  private void processElement(WindowedValue<KV<K, V>> windowedValue) throws Exception {
    final KV<K, V> kv = windowedValue.getValue();
    final WindowedValue<V> updatedWindowedValue = WindowedValue.of(kv.getValue(),
        windowedValue.getTimestamp(),
        windowedValue.getWindows(),
        windowedValue.getPane());

    KeyedWorkItem<K, V> kwi = KeyedWorkItems.elementsWorkItem(
        kv.getKey(),
        Collections.singletonList(updatedWindowedValue));

    context.setElement(kwi, getStateInternalsForKey(kwi.key()));
    fn.processElement(context);
  }

  /**
   * Encodes a key with the key coder and wraps the bytes for use as a map key.
   *
   * @throws RuntimeException wrapping the {@link CoderException} if encoding fails
   */
  private ByteBuffer encodeKey(K key) {
    try {
      return ByteBuffer.wrap(CoderUtils.encodeToByteArray(keyCoder, key));
    } catch (CoderException e) {
      throw new RuntimeException(e);
    }
  }

  /**
   * Returns the state internals for the key, creating them through the configured factory
   * and caching them on first access.
   */
  private StateInternals<K> getStateInternalsForKey(K key) {
    final ByteBuffer keyBytes = encodeKey(key);
    StateInternals<K> stateInternals = perKeyStateInternals.get(keyBytes);
    if (stateInternals == null) {
      stateInternals = stateInternalsFactory.stateInternalsForKey(key);
      perKeyStateInternals.put(keyBytes, stateInternals);
    }
    return stateInternals;
  }

  /** Adds the timer to the active timers of the key. */
  private void registerActiveTimer(K key, TimerInternals.TimerData timer) {
    final ByteBuffer keyBytes = encodeKey(key);
    Set<TimerInternals.TimerData> timersForKey = activeTimers.get(keyBytes);
    if (timersForKey == null) {
      // Only a newly created set needs to be stored; an existing one is already mapped.
      timersForKey = new HashSet<>();
      activeTimers.put(keyBytes, timersForKey);
    }
    timersForKey.add(timer);
  }

  /**
   * Removes the timer from the active timers of the key; the key's entry is dropped once
   * its last timer is gone. Does nothing if the key has no registered timers.
   */
  private void unregisterActiveTimer(K key, TimerInternals.TimerData timer) {
    final ByteBuffer keyBytes = encodeKey(key);
    Set<TimerInternals.TimerData> timersForKey = activeTimers.get(keyBytes);
    if (timersForKey != null) {
      timersForKey.remove(timer);
      if (timersForKey.isEmpty()) {
        activeTimers.remove(keyBytes);
      }
    }
  }

  /**
   * Records the new input watermark and fires, key by key, all timers that became due.
   */
  private void processWatermark(ApexStreamTuple.WatermarkTuple<?> mark) throws Exception {
    this.inputWatermark = new Instant(mark.getTimestamp());
    Multimap<ByteBuffer, TimerInternals.TimerData> timers = getTimersReadyToProcess(
        mark.getTimestamp());
    // Iterating an empty key set is a no-op, so no emptiness check is needed.
    for (ByteBuffer keyBytes : timers.keySet()) {
      // The buffers were created by wrapping the encoded key, so array() is the encoding.
      K key = CoderUtils.decodeFromByteArray(keyCoder, keyBytes.array());
      KeyedWorkItem<K, V> kwi = KeyedWorkItems.<K, V>timersWorkItem(key, timers.get(keyBytes));
      context.setElement(kwi, getStateInternalsForKey(kwi.key()));
      fn.processElement(context);
    }
  }

  /**
   * Process context handed to the grouping function. It exposes the current
   * {@link KeyedWorkItem}, the state internals of its key and the timer internals; results
   * are emitted through {@link WindowingInternals#outputWindowedValue}. Element-level
   * accessors and direct output methods are unsupported and throw.
   */
  private class ProcessContext extends GroupAlsoByWindowViaWindowSetDoFn<K, V, Iterable<V>, ?,
      KeyedWorkItem<K, V>>.ProcessContext {

    private final ApexTimerInternals timerInternals;
    private StateInternals<K> stateInternals;
    private KeyedWorkItem<K, V> element;

    public ProcessContext(OldDoFn<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> function,
        ApexTimerInternals timerInternals) {
      function.super();
      this.timerInternals = checkNotNull(timerInternals);
    }

    /** Sets the work item to process next together with the state internals of its key. */
    public void setElement(KeyedWorkItem<K, V> element, StateInternals<K> stateForKey) {
      this.element = element;
      this.stateInternals = stateForKey;
    }

    @Override
    public KeyedWorkItem<K, V> element() {
      return this.element;
    }

    @Override
    public Instant timestamp() {
      throw new UnsupportedOperationException(
          "timestamp() is not available when processing KeyedWorkItems.");
    }

    @Override
    public PipelineOptions getPipelineOptions() {
      return serializedOptions.get();
    }

    @Override
    public void output(KV<K, Iterable<V>> output) {
      throw new UnsupportedOperationException(
          "output() is not available when processing KeyedWorkItems.");
    }

    @Override
    public void outputWithTimestamp(KV<K, Iterable<V>> output, Instant timestamp) {
      throw new UnsupportedOperationException(
          "outputWithTimestamp() is not available when processing KeyedWorkItems.");
    }

    @Override
    public PaneInfo pane() {
      throw new UnsupportedOperationException(
          "pane() is not available when processing KeyedWorkItems.");
    }

    @Override
    public BoundedWindow window() {
      throw new UnsupportedOperationException(
          "window() is not available when processing KeyedWorkItems.");
    }

    @Override
    public WindowingInternals<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> windowingInternals() {
      return new WindowingInternals<KeyedWorkItem<K, V>, KV<K, Iterable<V>>>() {

        @Override
        public StateInternals<K> stateInternals() {
          return stateInternals;
        }

        // The only supported output path: wraps the result and emits it on the output port.
        @Override
        public void outputWindowedValue(KV<K, Iterable<V>> output, Instant timestamp,
            Collection<? extends BoundedWindow> windows, PaneInfo pane) {
          if (traceTuples) {
            LOG.debug("\nemitting {} timestamp {}\n", output, timestamp);
          }
          ApexGroupByKeyOperator.this.output.emit(ApexStreamTuple.DataTuple.of(
              WindowedValue.of(output, timestamp, windows, pane)));
        }

        @Override
        public TimerInternals timerInternals() {
          return timerInternals;
        }

        @Override
        public Collection<? extends BoundedWindow> windows() {
          throw new UnsupportedOperationException("windows() is not available in Streaming mode.");
        }

        @Override
        public PaneInfo pane() {
          throw new UnsupportedOperationException("pane() is not available in Streaming mode.");
        }

        @Override
        public <T> void writePCollectionViewData(TupleTag<?> tag, Iterable<WindowedValue<T>> data,
            Coder<T> elemCoder) throws IOException {
          throw new RuntimeException("writePCollectionViewData() not available in Streaming mode.");
        }

        @Override
        public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) {
          throw new RuntimeException("sideInput() is not available in Streaming mode.");
        }
      };
    }

    @Override
    public <T> T sideInput(PCollectionView<T> view) {
      throw new RuntimeException("sideInput() is not supported in Streaming mode.");
    }

    @Override
    public <T> void sideOutput(TupleTag<T> tag, T output) {
      // Side outputs are not supported while grouping; fail loudly rather than
      // silently dropping the value.
      throw new RuntimeException("sideOutput() is not available when grouping by window.");
    }

    @Override
    public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
      sideOutput(tag, output);
    }

    @Override
    protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(
        String name, Combine.CombineFn<AggInputT, ?, AggOutputT> combiner) {
      throw new UnsupportedOperationException();
    }
  }

  /**
   * An implementation of Beam's {@link TimerInternals}. Timers are registered against the
   * key of the work item currently set on the process context and are stored in the
   * operator's active timer map.
   */
  public class ApexTimerInternals implements TimerInternals {

    @Override
    public void setTimer(TimerData timerKey) {
      registerActiveTimer(context.element().key(), timerKey);
    }

    @Override
    public void deleteTimer(TimerData timerKey) {
      unregisterActiveTimer(context.element().key(), timerKey);
    }

    @Override
    public Instant currentProcessingTime() {
      return Instant.now();
    }

    @Override
    public Instant currentSynchronizedProcessingTime() {
      // Not implemented: always returns null.
      // NOTE(review): confirm that callers tolerate a null synchronized processing time.
      return null;
    }

    @Override
    public Instant currentInputWatermarkTime() {
      return inputWatermark;
    }

    @Override
    public Instant currentOutputWatermarkTime() {
      // Not implemented: always returns null.
      // NOTE(review): confirm that callers tolerate a null output watermark.
      return null;
    }

    @Override
    public void setTimer(StateNamespace namespace, String timerId, Instant target,
        TimeDomain timeDomain) {
      throw new UnsupportedOperationException("Setting timer by ID not yet supported.");
    }

    @Override
    public void deleteTimer(StateNamespace namespace, String timerId) {
      throw new UnsupportedOperationException("Canceling of timer by ID is not yet supported.");
    }

  }

  /**
   * Factory passed to the grouping function; it resolves state internals through the
   * operator's per-key cache.
   */
  private class GroupByKeyStateInternalsFactory implements StateInternalsFactory<K>, Serializable {
    private static final long serialVersionUID = 1L;

    @Override
    public StateInternals<K> stateInternalsForKey(K key) {
      return getStateInternalsForKey(key);
    }
  }
}