http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerRegistrar.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerRegistrar.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerRegistrar.java deleted file mode 100644 index 681459a..0000000 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerRegistrar.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.runners.flink; - -import com.google.auto.service.AutoService; -import com.google.common.collect.ImmutableList; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.PipelineOptionsRegistrar; -import org.apache.beam.sdk.runners.PipelineRunner; -import org.apache.beam.sdk.runners.PipelineRunnerRegistrar; - - -/** - * AutoService registrar - will register FlinkRunner and FlinkOptions - * as possible pipeline runner services. - * - * <p>It ends up in META-INF/services and gets picked up by Beam. 
- * - */ -public class FlinkRunnerRegistrar { - private FlinkRunnerRegistrar() { } - - /** - * Pipeline runner registrar. - */ - @AutoService(PipelineRunnerRegistrar.class) - public static class Runner implements PipelineRunnerRegistrar { - @Override - public Iterable<Class<? extends PipelineRunner<?>>> getPipelineRunners() { - return ImmutableList.<Class<? extends PipelineRunner<?>>>of( - FlinkRunner.class, - TestFlinkRunner.class); - } - } - - /** - * Pipeline options registrar. - */ - @AutoService(PipelineOptionsRegistrar.class) - public static class Options implements PipelineOptionsRegistrar { - @Override - public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() { - return ImmutableList.<Class<? extends PipelineOptions>>of(FlinkPipelineOptions.class); - } - } -}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java deleted file mode 100644 index 0682b56..0000000 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink; - -import java.io.IOException; -import java.util.Collections; -import java.util.Map; -import org.apache.beam.sdk.AggregatorRetrievalException; -import org.apache.beam.sdk.AggregatorValues; -import org.apache.beam.sdk.PipelineResult; -import org.apache.beam.sdk.metrics.MetricResults; -import org.apache.beam.sdk.transforms.Aggregator; -import org.joda.time.Duration; - -/** - * Result of executing a {@link org.apache.beam.sdk.Pipeline} with Flink. 
This - * has methods to query to job runtime and the final values of - * {@link org.apache.beam.sdk.transforms.Aggregator}s. - */ -public class FlinkRunnerResult implements PipelineResult { - - private final Map<String, Object> aggregators; - - private final long runtime; - - FlinkRunnerResult(Map<String, Object> aggregators, long runtime) { - this.aggregators = (aggregators == null || aggregators.isEmpty()) - ? Collections.<String, Object>emptyMap() - : Collections.unmodifiableMap(aggregators); - this.runtime = runtime; - } - - @Override - public State getState() { - return State.DONE; - } - - @Override - public <T> AggregatorValues<T> getAggregatorValues(final Aggregator<?, T> aggregator) - throws AggregatorRetrievalException { - // TODO provide a list of all accumulator step values - Object value = aggregators.get(aggregator.getName()); - if (value != null) { - return new AggregatorValues<T>() { - @Override - public Map<String, T> getValuesAtSteps() { - return (Map<String, T>) aggregators; - } - }; - } else { - throw new AggregatorRetrievalException("Accumulator results not found.", - new RuntimeException("Accumulator does not exist.")); - } - } - - @Override - public String toString() { - return "FlinkRunnerResult{" - + "aggregators=" + aggregators - + ", runtime=" + runtime - + '}'; - } - - @Override - public State cancel() throws IOException { - throw new UnsupportedOperationException("FlinkRunnerResult does not support cancel."); - } - - @Override - public State waitUntilFinish() { - return State.DONE; - } - - @Override - public State waitUntilFinish(Duration duration) { - return State.DONE; - } - - @Override - public MetricResults metrics() { - throw new UnsupportedOperationException("The FlinkRunner does not currently support metrics."); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java 
---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java deleted file mode 100644 index 0459ef7..0000000 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java +++ /dev/null @@ -1,276 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.runners.flink; - -import com.google.common.collect.ImmutableList; -import java.util.List; -import java.util.Map; -import org.apache.beam.runners.core.SplittableParDo; -import org.apache.beam.runners.core.construction.PTransformMatchers; -import org.apache.beam.runners.core.construction.PTransformReplacements; -import org.apache.beam.runners.core.construction.ReplacementOutputs; -import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.runners.PTransformOverride; -import org.apache.beam.sdk.runners.PTransformOverrideFactory; -import org.apache.beam.sdk.runners.TransformHierarchy; -import org.apache.beam.sdk.transforms.AppliedPTransform; -import org.apache.beam.sdk.transforms.Combine; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo.MultiOutput; -import org.apache.beam.sdk.transforms.View; -import org.apache.beam.sdk.util.InstanceBuilder; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionTuple; -import org.apache.beam.sdk.values.PValue; -import org.apache.beam.sdk.values.TupleTag; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * This is a {@link FlinkPipelineTranslator} for streaming jobs. Its role is to translate - * the user-provided {@link org.apache.beam.sdk.values.PCollection}-based job into a - * {@link org.apache.flink.streaming.api.datastream.DataStream} one. - * - */ -class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator { - - private static final Logger LOG = LoggerFactory.getLogger(FlinkStreamingPipelineTranslator.class); - - /** The necessary context in the case of a straming job. 
*/ - private final FlinkStreamingTranslationContext streamingContext; - - private int depth = 0; - - private FlinkRunner flinkRunner; - - public FlinkStreamingPipelineTranslator( - FlinkRunner flinkRunner, - StreamExecutionEnvironment env, - PipelineOptions options) { - this.streamingContext = new FlinkStreamingTranslationContext(env, options); - this.flinkRunner = flinkRunner; - } - - @Override - public void translate(Pipeline pipeline) { - List<PTransformOverride> transformOverrides = - ImmutableList.<PTransformOverride>builder() - .add( - PTransformOverride.of( - PTransformMatchers.splittableParDoMulti(), - new SplittableParDoOverrideFactory())) - .add( - PTransformOverride.of( - PTransformMatchers.classEqualTo(View.AsIterable.class), - new ReflectiveOneToOneOverrideFactory( - FlinkStreamingViewOverrides.StreamingViewAsIterable.class, flinkRunner))) - .add( - PTransformOverride.of( - PTransformMatchers.classEqualTo(View.AsList.class), - new ReflectiveOneToOneOverrideFactory( - FlinkStreamingViewOverrides.StreamingViewAsList.class, flinkRunner))) - .add( - PTransformOverride.of( - PTransformMatchers.classEqualTo(View.AsMap.class), - new ReflectiveOneToOneOverrideFactory( - FlinkStreamingViewOverrides.StreamingViewAsMap.class, flinkRunner))) - .add( - PTransformOverride.of( - PTransformMatchers.classEqualTo(View.AsMultimap.class), - new ReflectiveOneToOneOverrideFactory( - FlinkStreamingViewOverrides.StreamingViewAsMultimap.class, flinkRunner))) - .add( - PTransformOverride.of( - PTransformMatchers.classEqualTo(View.AsSingleton.class), - new ReflectiveOneToOneOverrideFactory( - FlinkStreamingViewOverrides.StreamingViewAsSingleton.class, flinkRunner))) - // this has to be last since the ViewAsSingleton override - // can expand to a Combine.GloballyAsSingletonView - .add( - PTransformOverride.of( - PTransformMatchers.classEqualTo(Combine.GloballyAsSingletonView.class), - new ReflectiveOneToOneOverrideFactory( - 
FlinkStreamingViewOverrides.StreamingCombineGloballyAsSingletonView.class, - flinkRunner))) - .build(); - - pipeline.replaceAll(transformOverrides); - super.translate(pipeline); - } - - // -------------------------------------------------------------------------------------------- - // Pipeline Visitor Methods - // -------------------------------------------------------------------------------------------- - - @Override - public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) { - LOG.info("{} enterCompositeTransform- {}", genSpaces(this.depth), node.getFullName()); - this.depth++; - - PTransform<?, ?> transform = node.getTransform(); - if (transform != null) { - StreamTransformTranslator<?> translator = - FlinkStreamingTransformTranslators.getTranslator(transform); - - if (translator != null && applyCanTranslate(transform, node, translator)) { - applyStreamingTransform(transform, node, translator); - LOG.info("{} translated- {}", genSpaces(this.depth), node.getFullName()); - return CompositeBehavior.DO_NOT_ENTER_TRANSFORM; - } - } - return CompositeBehavior.ENTER_TRANSFORM; - } - - @Override - public void leaveCompositeTransform(TransformHierarchy.Node node) { - this.depth--; - LOG.info("{} leaveCompositeTransform- {}", genSpaces(this.depth), node.getFullName()); - } - - @Override - public void visitPrimitiveTransform(TransformHierarchy.Node node) { - LOG.info("{} visitPrimitiveTransform- {}", genSpaces(this.depth), node.getFullName()); - // get the transformation corresponding to hte node we are - // currently visiting and translate it into its Flink alternative. 
- - PTransform<?, ?> transform = node.getTransform(); - StreamTransformTranslator<?> translator = - FlinkStreamingTransformTranslators.getTranslator(transform); - - if (translator == null || !applyCanTranslate(transform, node, translator)) { - LOG.info(node.getTransform().getClass().toString()); - throw new UnsupportedOperationException( - "The transform " + transform + " is currently not supported."); - } - applyStreamingTransform(transform, node, translator); - } - - @Override - public void visitValue(PValue value, TransformHierarchy.Node producer) { - // do nothing here - } - - private <T extends PTransform<?, ?>> void applyStreamingTransform( - PTransform<?, ?> transform, - TransformHierarchy.Node node, - StreamTransformTranslator<?> translator) { - - @SuppressWarnings("unchecked") - T typedTransform = (T) transform; - - @SuppressWarnings("unchecked") - StreamTransformTranslator<T> typedTranslator = (StreamTransformTranslator<T>) translator; - - // create the applied PTransform on the streamingContext - streamingContext.setCurrentTransform(node.toAppliedPTransform()); - typedTranslator.translateNode(typedTransform, streamingContext); - } - - private <T extends PTransform<?, ?>> boolean applyCanTranslate( - PTransform<?, ?> transform, - TransformHierarchy.Node node, - StreamTransformTranslator<?> translator) { - - @SuppressWarnings("unchecked") - T typedTransform = (T) transform; - - @SuppressWarnings("unchecked") - StreamTransformTranslator<T> typedTranslator = (StreamTransformTranslator<T>) translator; - - streamingContext.setCurrentTransform(node.toAppliedPTransform()); - - return typedTranslator.canTranslate(typedTransform, streamingContext); - } - - /** - * The interface that every Flink translator of a Beam operator should implement. - * This interface is for <b>streaming</b> jobs. For examples of such translators see - * {@link FlinkStreamingTransformTranslators}. 
- */ - abstract static class StreamTransformTranslator<T extends PTransform> { - - /** - * Translate the given transform. - */ - abstract void translateNode(T transform, FlinkStreamingTranslationContext context); - - /** - * Returns true iff this translator can translate the given transform. - */ - boolean canTranslate(T transform, FlinkStreamingTranslationContext context) { - return true; - } - } - - private static class ReflectiveOneToOneOverrideFactory< - InputT, OutputT, TransformT extends PTransform<PCollection<InputT>, PCollection<OutputT>>> - extends SingleInputOutputOverrideFactory< - PCollection<InputT>, PCollection<OutputT>, TransformT> { - private final Class<PTransform<PCollection<InputT>, PCollection<OutputT>>> replacement; - private final FlinkRunner runner; - - private ReflectiveOneToOneOverrideFactory( - Class<PTransform<PCollection<InputT>, PCollection<OutputT>>> replacement, - FlinkRunner runner) { - this.replacement = replacement; - this.runner = runner; - } - - @Override - public PTransformReplacement<PCollection<InputT>, PCollection<OutputT>> getReplacementTransform( - AppliedPTransform<PCollection<InputT>, PCollection<OutputT>, TransformT> transform) { - return PTransformReplacement.of( - PTransformReplacements.getSingletonMainInput(transform), - InstanceBuilder.ofType(replacement) - .withArg(FlinkRunner.class, runner) - .withArg( - (Class<PTransform<PCollection<InputT>, PCollection<OutputT>>>) - transform.getTransform().getClass(), - transform.getTransform()) - .build()); - } - } - - /** - * A {@link PTransformOverrideFactory} that overrides a <a - * href="https://s.apache.org/splittable-do-fn">Splittable DoFn</a> with {@link SplittableParDo}. 
- */ - static class SplittableParDoOverrideFactory<InputT, OutputT> - implements PTransformOverrideFactory< - PCollection<InputT>, PCollectionTuple, MultiOutput<InputT, OutputT>> { - @Override - public PTransformReplacement<PCollection<InputT>, PCollectionTuple> - getReplacementTransform( - AppliedPTransform< - PCollection<InputT>, PCollectionTuple, MultiOutput<InputT, OutputT>> - transform) { - return PTransformReplacement.of( - PTransformReplacements.getSingletonMainInput(transform), - new SplittableParDo<>(transform.getTransform())); - } - - @Override - public Map<PValue, ReplacementOutput> mapOutputs( - Map<TupleTag<?>, PValue> outputs, PCollectionTuple newOutput) { - return ReplacementOutputs.tagged(outputs, newOutput); - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java deleted file mode 100644 index 123d5e7..0000000 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java +++ /dev/null @@ -1,1044 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.runners.flink; - -import static com.google.common.base.Preconditions.checkArgument; - -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import org.apache.beam.runners.core.ElementAndRestriction; -import org.apache.beam.runners.core.KeyedWorkItem; -import org.apache.beam.runners.core.SplittableParDo; -import org.apache.beam.runners.core.SystemReduceFn; -import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows; -import org.apache.beam.runners.flink.translation.types.CoderTypeInformation; -import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator; -import org.apache.beam.runners.flink.translation.wrappers.streaming.KvToByteBufferKeySelector; -import org.apache.beam.runners.flink.translation.wrappers.streaming.SingletonKeyedWorkItem; -import org.apache.beam.runners.flink.translation.wrappers.streaming.SingletonKeyedWorkItemCoder; -import org.apache.beam.runners.flink.translation.wrappers.streaming.SplittableDoFnOperator; -import org.apache.beam.runners.flink.translation.wrappers.streaming.WindowDoFnOperator; -import org.apache.beam.runners.flink.translation.wrappers.streaming.WorkItemKeySelector; -import org.apache.beam.runners.flink.translation.wrappers.streaming.io.BoundedSourceWrapper; -import 
org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.coders.VoidCoder; -import org.apache.beam.sdk.io.Read; -import org.apache.beam.sdk.io.TextIO; -import org.apache.beam.sdk.transforms.Combine; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.Flatten; -import org.apache.beam.sdk.transforms.GroupByKey; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.join.RawUnionValue; -import org.apache.beam.sdk.transforms.join.UnionCoder; -import org.apache.beam.sdk.transforms.reflect.DoFnSignature; -import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; -import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.GlobalWindow; -import org.apache.beam.sdk.transforms.windowing.Window; -import org.apache.beam.sdk.transforms.windowing.WindowFn; -import org.apache.beam.sdk.util.AppliedCombineFn; -import org.apache.beam.sdk.util.Reshuffle; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.util.WindowingStrategy; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.PValue; -import org.apache.beam.sdk.values.TupleTag; -import org.apache.flink.api.common.functions.FlatMapFunction; -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.common.functions.RichFlatMapFunction; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.core.fs.FileSystem; -import 
org.apache.flink.streaming.api.collector.selector.OutputSelector; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.datastream.DataStreamSink; -import org.apache.flink.streaming.api.datastream.DataStreamSource; -import org.apache.flink.streaming.api.datastream.KeyedStream; -import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; -import org.apache.flink.streaming.api.datastream.SplitStream; -import org.apache.flink.streaming.api.operators.OneInputStreamOperator; -import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; -import org.apache.flink.streaming.api.transformations.TwoInputTransformation; -import org.apache.flink.util.Collector; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * This class contains all the mappings between Beam and Flink - * <b>streaming</b> transformations. The {@link FlinkStreamingPipelineTranslator} - * traverses the Beam job and comes here to translate the encountered Beam transformations - * into Flink one, based on the mapping available in this class. - */ -class FlinkStreamingTransformTranslators { - - // -------------------------------------------------------------------------------------------- - // Transform Translator Registry - // -------------------------------------------------------------------------------------------- - - @SuppressWarnings("rawtypes") - private static final Map< - Class<? extends PTransform>, - FlinkStreamingPipelineTranslator.StreamTransformTranslator> TRANSLATORS = new HashMap<>(); - - // here you can find all the available translators. 
- static { - TRANSLATORS.put(Read.Bounded.class, new BoundedReadSourceTranslator()); - TRANSLATORS.put(Read.Unbounded.class, new UnboundedReadSourceTranslator()); - TRANSLATORS.put(TextIO.Write.Bound.class, new TextIOWriteBoundStreamingTranslator()); - - TRANSLATORS.put(ParDo.MultiOutput.class, new ParDoStreamingTranslator()); - TRANSLATORS.put( - SplittableParDo.ProcessElements.class, new SplittableProcessElementsStreamingTranslator()); - TRANSLATORS.put( - SplittableParDo.GBKIntoKeyedWorkItems.class, new GBKIntoKeyedWorkItemsTranslator()); - - - TRANSLATORS.put(Window.Assign.class, new WindowAssignTranslator()); - TRANSLATORS.put(Flatten.PCollections.class, new FlattenPCollectionTranslator()); - TRANSLATORS.put( - FlinkStreamingViewOverrides.CreateFlinkPCollectionView.class, - new CreateViewStreamingTranslator()); - - TRANSLATORS.put(Reshuffle.class, new ReshuffleTranslatorStreaming()); - TRANSLATORS.put(GroupByKey.class, new GroupByKeyTranslator()); - TRANSLATORS.put(Combine.PerKey.class, new CombinePerKeyTranslator()); - } - - public static FlinkStreamingPipelineTranslator.StreamTransformTranslator<?> getTranslator( - PTransform<?, ?> transform) { - return TRANSLATORS.get(transform.getClass()); - } - - // -------------------------------------------------------------------------------------------- - // Transformation Implementations - // -------------------------------------------------------------------------------------------- - - private static class TextIOWriteBoundStreamingTranslator - extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<TextIO.Write.Bound> { - - private static final Logger LOG = - LoggerFactory.getLogger(TextIOWriteBoundStreamingTranslator.class); - - @Override - public void translateNode( - TextIO.Write.Bound transform, - FlinkStreamingTranslationContext context) { - PValue input = context.getInput(transform); - DataStream<WindowedValue<String>> inputDataStream = context.getInputDataStream(input); - - String filenamePrefix = 
transform.getFilenamePrefix(); - String filenameSuffix = transform.getFilenameSuffix(); - boolean needsValidation = transform.needsValidation(); - int numShards = transform.getNumShards(); - String shardNameTemplate = transform.getShardNameTemplate(); - - // TODO: Implement these. We need Flink support for this. - LOG.warn( - "Translation of TextIO.Write.needsValidation not yet supported. Is: {}.", - needsValidation); - LOG.warn( - "Translation of TextIO.Write.filenameSuffix not yet supported. Is: {}.", - filenameSuffix); - LOG.warn( - "Translation of TextIO.Write.shardNameTemplate not yet supported. Is: {}.", - shardNameTemplate); - - DataStream<String> dataSink = inputDataStream - .flatMap(new FlatMapFunction<WindowedValue<String>, String>() { - @Override - public void flatMap( - WindowedValue<String> value, - Collector<String> out) - throws Exception { - out.collect(value.getValue()); - } - }); - DataStreamSink<String> output = - dataSink.writeAsText(filenamePrefix, FileSystem.WriteMode.OVERWRITE); - - if (numShards > 0) { - output.setParallelism(numShards); - } - } - } - - private static class UnboundedReadSourceTranslator<T> - extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<Read.Unbounded<T>> { - - @Override - public void translateNode( - Read.Unbounded<T> transform, - FlinkStreamingTranslationContext context) { - PCollection<T> output = context.getOutput(transform); - - TypeInformation<WindowedValue<T>> outputTypeInfo = - context.getTypeInfo(context.getOutput(transform)); - - DataStream<WindowedValue<T>> source; - try { - UnboundedSourceWrapper<T, ?> sourceWrapper = - new UnboundedSourceWrapper<>( - context.getPipelineOptions(), - transform.getSource(), - context.getExecutionEnvironment().getParallelism()); - source = context - .getExecutionEnvironment() - .addSource(sourceWrapper).name(transform.getName()).returns(outputTypeInfo); - } catch (Exception e) { - throw new RuntimeException( - "Error while translating UnboundedSource: " + 
transform.getSource(), e); - } - - context.setOutputDataStream(output, source); - } - } - - private static class BoundedReadSourceTranslator<T> - extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<Read.Bounded<T>> { - - @Override - public void translateNode( - Read.Bounded<T> transform, - FlinkStreamingTranslationContext context) { - PCollection<T> output = context.getOutput(transform); - - TypeInformation<WindowedValue<T>> outputTypeInfo = - context.getTypeInfo(context.getOutput(transform)); - - - DataStream<WindowedValue<T>> source; - try { - BoundedSourceWrapper<T> sourceWrapper = - new BoundedSourceWrapper<>( - context.getPipelineOptions(), - transform.getSource(), - context.getExecutionEnvironment().getParallelism()); - source = context - .getExecutionEnvironment() - .addSource(sourceWrapper).name(transform.getName()).returns(outputTypeInfo); - } catch (Exception e) { - throw new RuntimeException( - "Error while translating BoundedSource: " + transform.getSource(), e); - } - - context.setOutputDataStream(output, source); - } - } - - /** - * Wraps each element in a {@link RawUnionValue} with the given tag id. 
- */ - private static class ToRawUnion<T> implements MapFunction<T, RawUnionValue> { - private final int intTag; - - public ToRawUnion(int intTag) { - this.intTag = intTag; - } - - @Override - public RawUnionValue map(T o) throws Exception { - return new RawUnionValue(intTag, o); - } - } - - private static Tuple2<Map<Integer, PCollectionView<?>>, DataStream<RawUnionValue>> - transformSideInputs( - Collection<PCollectionView<?>> sideInputs, - FlinkStreamingTranslationContext context) { - - // collect all side inputs - Map<TupleTag<?>, Integer> tagToIntMapping = new HashMap<>(); - Map<Integer, PCollectionView<?>> intToViewMapping = new HashMap<>(); - int count = 0; - for (PCollectionView<?> sideInput: sideInputs) { - TupleTag<?> tag = sideInput.getTagInternal(); - intToViewMapping.put(count, sideInput); - tagToIntMapping.put(tag, count); - count++; - Coder<Iterable<WindowedValue<?>>> coder = sideInput.getCoderInternal(); - } - - - List<Coder<?>> inputCoders = new ArrayList<>(); - for (PCollectionView<?> sideInput: sideInputs) { - DataStream<Object> sideInputStream = context.getInputDataStream(sideInput); - TypeInformation<Object> tpe = sideInputStream.getType(); - if (!(tpe instanceof CoderTypeInformation)) { - throw new IllegalStateException( - "Input Stream TypeInformation is no CoderTypeInformation."); - } - - Coder<?> coder = ((CoderTypeInformation) tpe).getCoder(); - inputCoders.add(coder); - } - - UnionCoder unionCoder = UnionCoder.of(inputCoders); - - CoderTypeInformation<RawUnionValue> unionTypeInformation = - new CoderTypeInformation<>(unionCoder); - - // transform each side input to RawUnionValue and union them - DataStream<RawUnionValue> sideInputUnion = null; - - for (PCollectionView<?> sideInput: sideInputs) { - TupleTag<?> tag = sideInput.getTagInternal(); - final int intTag = tagToIntMapping.get(tag); - DataStream<Object> sideInputStream = context.getInputDataStream(sideInput); - DataStream<RawUnionValue> unionValueStream = - sideInputStream.map(new 
ToRawUnion<>(intTag)).returns(unionTypeInformation); - - if (sideInputUnion == null) { - sideInputUnion = unionValueStream; - } else { - sideInputUnion = sideInputUnion.union(unionValueStream); - } - } - - if (sideInputUnion == null) { - throw new IllegalStateException("No unioned side inputs, this indicates a bug."); - } - - return new Tuple2<>(intToViewMapping, sideInputUnion); - } - - /** - * Helper for translating {@link ParDo.MultiOutput} and {@link SplittableParDo.ProcessElements}. - */ - static class ParDoTranslationHelper { - - interface DoFnOperatorFactory<InputT, OutputT> { - DoFnOperator<InputT, OutputT, RawUnionValue> createDoFnOperator( - DoFn<InputT, OutputT> doFn, - List<PCollectionView<?>> sideInputs, - TupleTag<OutputT> mainOutputTag, - List<TupleTag<?>> additionalOutputTags, - FlinkStreamingTranslationContext context, - WindowingStrategy<?, ?> windowingStrategy, - Map<TupleTag<?>, Integer> tagsToLabels, - Coder<WindowedValue<InputT>> inputCoder, - Coder keyCoder, - Map<Integer, PCollectionView<?>> transformedSideInputs); - } - - static <InputT, OutputT> void translateParDo( - String transformName, - DoFn<InputT, OutputT> doFn, - PCollection<InputT> input, - List<PCollectionView<?>> sideInputs, - Map<TupleTag<?>, PValue> outputs, - TupleTag<OutputT> mainOutputTag, - List<TupleTag<?>> additionalOutputTags, - FlinkStreamingTranslationContext context, - DoFnOperatorFactory<InputT, OutputT> doFnOperatorFactory) { - - // we assume that the transformation does not change the windowing strategy. 
- WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy(); - - Map<TupleTag<?>, Integer> tagsToLabels = - transformTupleTagsToLabels(mainOutputTag, outputs); - - SingleOutputStreamOperator<RawUnionValue> unionOutputStream; - - Coder<WindowedValue<InputT>> inputCoder = context.getCoder(input); - - DataStream<WindowedValue<InputT>> inputDataStream = context.getInputDataStream(input); - - Coder keyCoder = null; - boolean stateful = false; - DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass()); - if (signature.stateDeclarations().size() > 0 - || signature.timerDeclarations().size() > 0) { - // Based on the fact that the signature is stateful, DoFnSignatures ensures - // that it is also keyed - keyCoder = ((KvCoder) input.getCoder()).getKeyCoder(); - inputDataStream = inputDataStream.keyBy(new KvToByteBufferKeySelector(keyCoder)); - stateful = true; - } else if (doFn instanceof SplittableParDo.ProcessFn) { - // we know that it is keyed on String - keyCoder = StringUtf8Coder.of(); - stateful = true; - } - - if (sideInputs.isEmpty()) { - DoFnOperator<InputT, OutputT, RawUnionValue> doFnOperator = - doFnOperatorFactory.createDoFnOperator( - doFn, - sideInputs, - mainOutputTag, - additionalOutputTags, - context, - windowingStrategy, - tagsToLabels, - inputCoder, - keyCoder, - new HashMap<Integer, PCollectionView<?>>() /* side-input mapping */); - - UnionCoder outputUnionCoder = createUnionCoder(outputs); - - CoderTypeInformation<RawUnionValue> outputUnionTypeInformation = - new CoderTypeInformation<>(outputUnionCoder); - - unionOutputStream = inputDataStream - .transform(transformName, outputUnionTypeInformation, doFnOperator); - - } else { - Tuple2<Map<Integer, PCollectionView<?>>, DataStream<RawUnionValue>> transformedSideInputs = - transformSideInputs(sideInputs, context); - - DoFnOperator<InputT, OutputT, RawUnionValue> doFnOperator = - doFnOperatorFactory.createDoFnOperator( - doFn, - sideInputs, - mainOutputTag, - 
additionalOutputTags, - context, - windowingStrategy, - tagsToLabels, - inputCoder, - keyCoder, - transformedSideInputs.f0); - - UnionCoder outputUnionCoder = createUnionCoder(outputs); - - CoderTypeInformation<RawUnionValue> outputUnionTypeInformation = - new CoderTypeInformation<>(outputUnionCoder); - - if (stateful) { - // we have to manually contruct the two-input transform because we're not - // allowed to have only one input keyed, normally. - KeyedStream keyedStream = (KeyedStream<?, InputT>) inputDataStream; - TwoInputTransformation< - WindowedValue<KV<?, InputT>>, - RawUnionValue, - WindowedValue<OutputT>> rawFlinkTransform = new TwoInputTransformation( - keyedStream.getTransformation(), - transformedSideInputs.f1.broadcast().getTransformation(), - transformName, - (TwoInputStreamOperator) doFnOperator, - outputUnionTypeInformation, - keyedStream.getParallelism()); - - rawFlinkTransform.setStateKeyType(keyedStream.getKeyType()); - rawFlinkTransform.setStateKeySelectors(keyedStream.getKeySelector(), null); - - unionOutputStream = new SingleOutputStreamOperator( - keyedStream.getExecutionEnvironment(), - rawFlinkTransform) {}; // we have to cheat around the ctor being protected - - keyedStream.getExecutionEnvironment().addOperator(rawFlinkTransform); - - } else { - unionOutputStream = inputDataStream - .connect(transformedSideInputs.f1.broadcast()) - .transform(transformName, outputUnionTypeInformation, doFnOperator); - } - } - - SplitStream<RawUnionValue> splitStream = unionOutputStream - .split(new OutputSelector<RawUnionValue>() { - @Override - public Iterable<String> select(RawUnionValue value) { - return Collections.singletonList(Integer.toString(value.getUnionTag())); - } - }); - - for (Entry<TupleTag<?>, PValue> output : outputs.entrySet()) { - final int outputTag = tagsToLabels.get(output.getKey()); - - TypeInformation outputTypeInfo = context.getTypeInfo((PCollection<?>) output.getValue()); - - @SuppressWarnings("unchecked") - DataStream unwrapped = 
splitStream.select(String.valueOf(outputTag)) - .flatMap(new FlatMapFunction<RawUnionValue, Object>() { - @Override - public void flatMap(RawUnionValue value, Collector<Object> out) throws Exception { - out.collect(value.getValue()); - } - }).returns(outputTypeInfo); - - context.setOutputDataStream(output.getValue(), unwrapped); - } - } - - private static Map<TupleTag<?>, Integer> transformTupleTagsToLabels( - TupleTag<?> mainTag, - Map<TupleTag<?>, PValue> allTaggedValues) { - - Map<TupleTag<?>, Integer> tagToLabelMap = Maps.newHashMap(); - int count = 0; - tagToLabelMap.put(mainTag, count++); - for (TupleTag<?> key : allTaggedValues.keySet()) { - if (!tagToLabelMap.containsKey(key)) { - tagToLabelMap.put(key, count++); - } - } - return tagToLabelMap; - } - - private static UnionCoder createUnionCoder(Map<TupleTag<?>, PValue> taggedCollections) { - List<Coder<?>> outputCoders = Lists.newArrayList(); - for (PValue taggedColl : taggedCollections.values()) { - checkArgument( - taggedColl instanceof PCollection, - "A Union Coder can only be created for a Collection of Tagged %s. 
Got %s", - PCollection.class.getSimpleName(), - taggedColl.getClass().getSimpleName()); - PCollection<?> coll = (PCollection<?>) taggedColl; - WindowedValue.FullWindowedValueCoder<?> windowedValueCoder = - WindowedValue.getFullCoder( - coll.getCoder(), - coll.getWindowingStrategy().getWindowFn().windowCoder()); - outputCoders.add(windowedValueCoder); - } - return UnionCoder.of(outputCoders); - } - } - - private static class ParDoStreamingTranslator<InputT, OutputT> - extends FlinkStreamingPipelineTranslator.StreamTransformTranslator< - ParDo.MultiOutput<InputT, OutputT>> { - - @Override - public void translateNode( - ParDo.MultiOutput<InputT, OutputT> transform, - FlinkStreamingTranslationContext context) { - - ParDoTranslationHelper.translateParDo( - transform.getName(), - transform.getFn(), - (PCollection<InputT>) context.getInput(transform), - transform.getSideInputs(), - context.getOutputs(transform), - transform.getMainOutputTag(), - transform.getAdditionalOutputTags().getAll(), - context, - new ParDoTranslationHelper.DoFnOperatorFactory<InputT, OutputT>() { - @Override - public DoFnOperator<InputT, OutputT, RawUnionValue> createDoFnOperator( - DoFn<InputT, OutputT> doFn, - List<PCollectionView<?>> sideInputs, - TupleTag<OutputT> mainOutputTag, - List<TupleTag<?>> additionalOutputTags, - FlinkStreamingTranslationContext context, - WindowingStrategy<?, ?> windowingStrategy, - Map<TupleTag<?>, Integer> tagsToLabels, - Coder<WindowedValue<InputT>> inputCoder, - Coder keyCoder, - Map<Integer, PCollectionView<?>> transformedSideInputs) { - return new DoFnOperator<>( - doFn, - inputCoder, - mainOutputTag, - additionalOutputTags, - new DoFnOperator.MultiOutputOutputManagerFactory(tagsToLabels), - windowingStrategy, - transformedSideInputs, - sideInputs, - context.getPipelineOptions(), - keyCoder); - } - }); - } - } - - private static class SplittableProcessElementsStreamingTranslator< - InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker<RestrictionT>> 
- extends FlinkStreamingPipelineTranslator.StreamTransformTranslator< - SplittableParDo.ProcessElements<InputT, OutputT, RestrictionT, TrackerT>> { - - @Override - public void translateNode( - SplittableParDo.ProcessElements<InputT, OutputT, RestrictionT, TrackerT> transform, - FlinkStreamingTranslationContext context) { - - ParDoTranslationHelper.translateParDo( - transform.getName(), - transform.newProcessFn(transform.getFn()), - (PCollection<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>>) - context.getInput(transform), - transform.getSideInputs(), - context.getOutputs(transform), - transform.getMainOutputTag(), - transform.getAdditionalOutputTags().getAll(), - context, - new ParDoTranslationHelper.DoFnOperatorFactory< - KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT>() { - @Override - public DoFnOperator< - KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, - OutputT, - RawUnionValue> createDoFnOperator( - DoFn< - KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, - OutputT> doFn, - List<PCollectionView<?>> sideInputs, - TupleTag<OutputT> mainOutputTag, - List<TupleTag<?>> additionalOutputTags, - FlinkStreamingTranslationContext context, - WindowingStrategy<?, ?> windowingStrategy, - Map<TupleTag<?>, Integer> tagsToLabels, - Coder< - WindowedValue< - KeyedWorkItem< - String, - ElementAndRestriction<InputT, RestrictionT>>>> inputCoder, - Coder keyCoder, - Map<Integer, PCollectionView<?>> transformedSideInputs) { - return new SplittableDoFnOperator<>( - doFn, - inputCoder, - mainOutputTag, - additionalOutputTags, - new DoFnOperator.MultiOutputOutputManagerFactory(tagsToLabels), - windowingStrategy, - transformedSideInputs, - sideInputs, - context.getPipelineOptions(), - keyCoder); - } - }); - } - } - - private static class CreateViewStreamingTranslator<ElemT, ViewT> - extends FlinkStreamingPipelineTranslator.StreamTransformTranslator< - 
FlinkStreamingViewOverrides.CreateFlinkPCollectionView<ElemT, ViewT>> { - - @Override - public void translateNode( - FlinkStreamingViewOverrides.CreateFlinkPCollectionView<ElemT, ViewT> transform, - FlinkStreamingTranslationContext context) { - // just forward - DataStream<WindowedValue<List<ElemT>>> inputDataSet = - context.getInputDataStream(context.getInput(transform)); - - PCollectionView<ViewT> view = context.getOutput(transform); - - context.setOutputDataStream(view, inputDataSet); - } - } - - private static class WindowAssignTranslator<T> - extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<Window.Assign<T>> { - - @Override - public void translateNode( - Window.Assign<T> transform, - FlinkStreamingTranslationContext context) { - - @SuppressWarnings("unchecked") - WindowingStrategy<T, BoundedWindow> windowingStrategy = - (WindowingStrategy<T, BoundedWindow>) - context.getOutput(transform).getWindowingStrategy(); - - TypeInformation<WindowedValue<T>> typeInfo = - context.getTypeInfo(context.getOutput(transform)); - - DataStream<WindowedValue<T>> inputDataStream = - context.getInputDataStream(context.getInput(transform)); - - WindowFn<T, ? extends BoundedWindow> windowFn = windowingStrategy.getWindowFn(); - - FlinkAssignWindows<T, ? 
extends BoundedWindow> assignWindowsFunction = - new FlinkAssignWindows<>(windowFn); - - SingleOutputStreamOperator<WindowedValue<T>> outputDataStream = inputDataStream - .flatMap(assignWindowsFunction) - .name(context.getOutput(transform).getName()) - .returns(typeInfo); - - context.setOutputDataStream(context.getOutput(transform), outputDataStream); - } - } - - private static class ReshuffleTranslatorStreaming<K, InputT> - extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<Reshuffle<K, InputT>> { - - @Override - public void translateNode( - Reshuffle<K, InputT> transform, - FlinkStreamingTranslationContext context) { - - DataStream<WindowedValue<KV<K, InputT>>> inputDataSet = - context.getInputDataStream(context.getInput(transform)); - - context.setOutputDataStream(context.getOutput(transform), inputDataSet.rebalance()); - - } - } - - - private static class GroupByKeyTranslator<K, InputT> - extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<GroupByKey<K, InputT>> { - - @Override - public void translateNode( - GroupByKey<K, InputT> transform, - FlinkStreamingTranslationContext context) { - - PCollection<KV<K, InputT>> input = context.getInput(transform); - - @SuppressWarnings("unchecked") - WindowingStrategy<?, BoundedWindow> windowingStrategy = - (WindowingStrategy<?, BoundedWindow>) input.getWindowingStrategy(); - - KvCoder<K, InputT> inputKvCoder = (KvCoder<K, InputT>) input.getCoder(); - - SingletonKeyedWorkItemCoder<K, InputT> workItemCoder = SingletonKeyedWorkItemCoder.of( - inputKvCoder.getKeyCoder(), - inputKvCoder.getValueCoder(), - input.getWindowingStrategy().getWindowFn().windowCoder()); - - DataStream<WindowedValue<KV<K, InputT>>> inputDataStream = context.getInputDataStream(input); - - WindowedValue. 
- FullWindowedValueCoder<SingletonKeyedWorkItem<K, InputT>> windowedWorkItemCoder = - WindowedValue.getFullCoder( - workItemCoder, - input.getWindowingStrategy().getWindowFn().windowCoder()); - - CoderTypeInformation<WindowedValue<SingletonKeyedWorkItem<K, InputT>>> workItemTypeInfo = - new CoderTypeInformation<>(windowedWorkItemCoder); - - DataStream<WindowedValue<SingletonKeyedWorkItem<K, InputT>>> workItemStream = - inputDataStream - .flatMap(new ToKeyedWorkItem<K, InputT>()) - .returns(workItemTypeInfo).name("ToKeyedWorkItem"); - - KeyedStream< - WindowedValue< - SingletonKeyedWorkItem<K, InputT>>, ByteBuffer> keyedWorkItemStream = workItemStream - .keyBy(new WorkItemKeySelector<K, InputT>(inputKvCoder.getKeyCoder())); - - SystemReduceFn<K, InputT, Iterable<InputT>, Iterable<InputT>, BoundedWindow> reduceFn = - SystemReduceFn.buffering(inputKvCoder.getValueCoder()); - - TypeInformation<WindowedValue<KV<K, Iterable<InputT>>>> outputTypeInfo = - context.getTypeInfo(context.getOutput(transform)); - - DoFnOperator.DefaultOutputManagerFactory< - WindowedValue<KV<K, Iterable<InputT>>>> outputManagerFactory = - new DoFnOperator.DefaultOutputManagerFactory<>(); - - WindowDoFnOperator<K, InputT, Iterable<InputT>> doFnOperator = - new WindowDoFnOperator<>( - reduceFn, - (Coder) windowedWorkItemCoder, - new TupleTag<KV<K, Iterable<InputT>>>("main output"), - Collections.<TupleTag<?>>emptyList(), - outputManagerFactory, - windowingStrategy, - new HashMap<Integer, PCollectionView<?>>(), /* side-input mapping */ - Collections.<PCollectionView<?>>emptyList(), /* side inputs */ - context.getPipelineOptions(), - inputKvCoder.getKeyCoder()); - - // our operator excepts WindowedValue<KeyedWorkItem> while our input stream - // is WindowedValue<SingletonKeyedWorkItem>, which is fine but Java doesn't like it ... 
- @SuppressWarnings("unchecked") - SingleOutputStreamOperator<WindowedValue<KV<K, Iterable<InputT>>>> outDataStream = - keyedWorkItemStream - .transform( - transform.getName(), - outputTypeInfo, - (OneInputStreamOperator) doFnOperator); - - context.setOutputDataStream(context.getOutput(transform), outDataStream); - - } - } - - private static class CombinePerKeyTranslator<K, InputT, OutputT> - extends FlinkStreamingPipelineTranslator.StreamTransformTranslator< - Combine.PerKey<K, InputT, OutputT>> { - - @Override - boolean canTranslate( - Combine.PerKey<K, InputT, OutputT> transform, - FlinkStreamingTranslationContext context) { - - // if we have a merging window strategy and side inputs we cannot - // translate as a proper combine. We have to group and then run the combine - // over the final grouped values. - PCollection<KV<K, InputT>> input = context.getInput(transform); - - @SuppressWarnings("unchecked") - WindowingStrategy<?, BoundedWindow> windowingStrategy = - (WindowingStrategy<?, BoundedWindow>) input.getWindowingStrategy(); - - return windowingStrategy.getWindowFn().isNonMerging() || transform.getSideInputs().isEmpty(); - } - - @Override - public void translateNode( - Combine.PerKey<K, InputT, OutputT> transform, - FlinkStreamingTranslationContext context) { - - PCollection<KV<K, InputT>> input = context.getInput(transform); - - @SuppressWarnings("unchecked") - WindowingStrategy<?, BoundedWindow> windowingStrategy = - (WindowingStrategy<?, BoundedWindow>) input.getWindowingStrategy(); - - KvCoder<K, InputT> inputKvCoder = (KvCoder<K, InputT>) input.getCoder(); - - SingletonKeyedWorkItemCoder<K, InputT> workItemCoder = SingletonKeyedWorkItemCoder.of( - inputKvCoder.getKeyCoder(), - inputKvCoder.getValueCoder(), - input.getWindowingStrategy().getWindowFn().windowCoder()); - - DataStream<WindowedValue<KV<K, InputT>>> inputDataStream = context.getInputDataStream(input); - - WindowedValue. 
- FullWindowedValueCoder<SingletonKeyedWorkItem<K, InputT>> windowedWorkItemCoder = - WindowedValue.getFullCoder( - workItemCoder, - input.getWindowingStrategy().getWindowFn().windowCoder()); - - CoderTypeInformation<WindowedValue<SingletonKeyedWorkItem<K, InputT>>> workItemTypeInfo = - new CoderTypeInformation<>(windowedWorkItemCoder); - - DataStream<WindowedValue<SingletonKeyedWorkItem<K, InputT>>> workItemStream = - inputDataStream - .flatMap(new ToKeyedWorkItem<K, InputT>()) - .returns(workItemTypeInfo).name("ToKeyedWorkItem"); - - KeyedStream< - WindowedValue< - SingletonKeyedWorkItem<K, InputT>>, ByteBuffer> keyedWorkItemStream = workItemStream - .keyBy(new WorkItemKeySelector<K, InputT>(inputKvCoder.getKeyCoder())); - - SystemReduceFn<K, InputT, ?, OutputT, BoundedWindow> reduceFn = SystemReduceFn.combining( - inputKvCoder.getKeyCoder(), - AppliedCombineFn.withInputCoder( - transform.getFn(), input.getPipeline().getCoderRegistry(), inputKvCoder)); - - TypeInformation<WindowedValue<KV<K, OutputT>>> outputTypeInfo = - context.getTypeInfo(context.getOutput(transform)); - - List<PCollectionView<?>> sideInputs = transform.getSideInputs(); - - if (sideInputs.isEmpty()) { - - WindowDoFnOperator<K, InputT, OutputT> doFnOperator = - new WindowDoFnOperator<>( - reduceFn, - (Coder) windowedWorkItemCoder, - new TupleTag<KV<K, OutputT>>("main output"), - Collections.<TupleTag<?>>emptyList(), - new DoFnOperator.DefaultOutputManagerFactory<WindowedValue<KV<K, OutputT>>>(), - windowingStrategy, - new HashMap<Integer, PCollectionView<?>>(), /* side-input mapping */ - Collections.<PCollectionView<?>>emptyList(), /* side inputs */ - context.getPipelineOptions(), - inputKvCoder.getKeyCoder()); - - // our operator excepts WindowedValue<KeyedWorkItem> while our input stream - // is WindowedValue<SingletonKeyedWorkItem>, which is fine but Java doesn't like it ... 
- @SuppressWarnings("unchecked") - SingleOutputStreamOperator<WindowedValue<KV<K, OutputT>>> outDataStream = - keyedWorkItemStream.transform( - transform.getName(), outputTypeInfo, (OneInputStreamOperator) doFnOperator); - - context.setOutputDataStream(context.getOutput(transform), outDataStream); - } else { - Tuple2<Map<Integer, PCollectionView<?>>, DataStream<RawUnionValue>> transformSideInputs = - transformSideInputs(sideInputs, context); - - WindowDoFnOperator<K, InputT, OutputT> doFnOperator = - new WindowDoFnOperator<>( - reduceFn, - (Coder) windowedWorkItemCoder, - new TupleTag<KV<K, OutputT>>("main output"), - Collections.<TupleTag<?>>emptyList(), - new DoFnOperator.DefaultOutputManagerFactory<WindowedValue<KV<K, OutputT>>>(), - windowingStrategy, - transformSideInputs.f0, - sideInputs, - context.getPipelineOptions(), - inputKvCoder.getKeyCoder()); - - // we have to manually contruct the two-input transform because we're not - // allowed to have only one input keyed, normally. - - TwoInputTransformation< - WindowedValue<SingletonKeyedWorkItem<K, InputT>>, - RawUnionValue, - WindowedValue<KV<K, OutputT>>> rawFlinkTransform = new TwoInputTransformation<>( - keyedWorkItemStream.getTransformation(), - transformSideInputs.f1.broadcast().getTransformation(), - transform.getName(), - (TwoInputStreamOperator) doFnOperator, - outputTypeInfo, - keyedWorkItemStream.getParallelism()); - - rawFlinkTransform.setStateKeyType(keyedWorkItemStream.getKeyType()); - rawFlinkTransform.setStateKeySelectors(keyedWorkItemStream.getKeySelector(), null); - - @SuppressWarnings({ "unchecked", "rawtypes" }) - SingleOutputStreamOperator<WindowedValue<KV<K, OutputT>>> outDataStream = - new SingleOutputStreamOperator( - keyedWorkItemStream.getExecutionEnvironment(), - rawFlinkTransform) {}; // we have to cheat around the ctor being protected - - keyedWorkItemStream.getExecutionEnvironment().addOperator(rawFlinkTransform); - - context.setOutputDataStream(context.getOutput(transform), 
outDataStream); - } - } - } - - private static class GBKIntoKeyedWorkItemsTranslator<K, InputT> - extends FlinkStreamingPipelineTranslator.StreamTransformTranslator< - SplittableParDo.GBKIntoKeyedWorkItems<K, InputT>> { - - @Override - boolean canTranslate( - SplittableParDo.GBKIntoKeyedWorkItems<K, InputT> transform, - FlinkStreamingTranslationContext context) { - return true; - } - - @Override - public void translateNode( - SplittableParDo.GBKIntoKeyedWorkItems<K, InputT> transform, - FlinkStreamingTranslationContext context) { - - PCollection<KV<K, InputT>> input = context.getInput(transform); - - KvCoder<K, InputT> inputKvCoder = (KvCoder<K, InputT>) input.getCoder(); - - SingletonKeyedWorkItemCoder<K, InputT> workItemCoder = SingletonKeyedWorkItemCoder.of( - inputKvCoder.getKeyCoder(), - inputKvCoder.getValueCoder(), - input.getWindowingStrategy().getWindowFn().windowCoder()); - - - WindowedValue. - FullWindowedValueCoder<SingletonKeyedWorkItem<K, InputT>> windowedWorkItemCoder = - WindowedValue.getFullCoder( - workItemCoder, - input.getWindowingStrategy().getWindowFn().windowCoder()); - - CoderTypeInformation<WindowedValue<SingletonKeyedWorkItem<K, InputT>>> workItemTypeInfo = - new CoderTypeInformation<>(windowedWorkItemCoder); - - DataStream<WindowedValue<KV<K, InputT>>> inputDataStream = context.getInputDataStream(input); - - DataStream<WindowedValue<SingletonKeyedWorkItem<K, InputT>>> workItemStream = - inputDataStream - .flatMap(new ToKeyedWorkItem<K, InputT>()) - .returns(workItemTypeInfo).name("ToKeyedWorkItem"); - - KeyedStream< - WindowedValue< - SingletonKeyedWorkItem<K, InputT>>, ByteBuffer> keyedWorkItemStream = workItemStream - .keyBy(new WorkItemKeySelector<K, InputT>(inputKvCoder.getKeyCoder())); - - context.setOutputDataStream(context.getOutput(transform), keyedWorkItemStream); - } - } - - private static class FlattenPCollectionTranslator<T> - extends FlinkStreamingPipelineTranslator.StreamTransformTranslator< - Flatten.PCollections<T>> { - - 
@Override - public void translateNode( - Flatten.PCollections<T> transform, - FlinkStreamingTranslationContext context) { - Map<TupleTag<?>, PValue> allInputs = context.getInputs(transform); - - if (allInputs.isEmpty()) { - - // create an empty dummy source to satisfy downstream operations - // we cannot create an empty source in Flink, therefore we have to - // add the flatMap that simply never forwards the single element - DataStreamSource<String> dummySource = - context.getExecutionEnvironment().fromElements("dummy"); - - DataStream<WindowedValue<T>> result = dummySource.flatMap( - new FlatMapFunction<String, WindowedValue<T>>() { - @Override - public void flatMap( - String s, - Collector<WindowedValue<T>> collector) throws Exception { - // never return anything - } - }).returns( - new CoderTypeInformation<>( - WindowedValue.getFullCoder( - (Coder<T>) VoidCoder.of(), - GlobalWindow.Coder.INSTANCE))); - context.setOutputDataStream(context.getOutput(transform), result); - - } else { - DataStream<T> result = null; - for (PValue input : allInputs.values()) { - DataStream<T> current = context.getInputDataStream(input); - result = (result == null) ? 
current : result.union(current); - } - context.setOutputDataStream(context.getOutput(transform), result); - } - } - } - - private static class ToKeyedWorkItem<K, InputT> - extends RichFlatMapFunction< - WindowedValue<KV<K, InputT>>, - WindowedValue<SingletonKeyedWorkItem<K, InputT>>> { - - @Override - public void flatMap( - WindowedValue<KV<K, InputT>> inWithMultipleWindows, - Collector<WindowedValue<SingletonKeyedWorkItem<K, InputT>>> out) throws Exception { - - // we need to wrap each one work item per window for now - // since otherwise the PushbackSideInputRunner will not correctly - // determine whether side inputs are ready - // - // this is tracked as https://issues.apache.org/jira/browse/BEAM-1850 - for (WindowedValue<KV<K, InputT>> in : inWithMultipleWindows.explodeWindows()) { - SingletonKeyedWorkItem<K, InputT> workItem = - new SingletonKeyedWorkItem<>( - in.getValue().getKey(), - in.withValue(in.getValue().getValue())); - - out.collect(in.withValue(workItem)); - } - } - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java deleted file mode 100644 index 1a943a3..0000000 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java +++ /dev/null @@ -1,130 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink; - -import static com.google.common.base.Preconditions.checkNotNull; - -import com.google.common.collect.Iterables; -import java.util.HashMap; -import java.util.Map; -import org.apache.beam.runners.flink.translation.types.CoderTypeInformation; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.transforms.AppliedPTransform; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PInput; -import org.apache.beam.sdk.values.POutput; -import org.apache.beam.sdk.values.PValue; -import org.apache.beam.sdk.values.TupleTag; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; - -/** - * Helper for keeping track of which {@link DataStream DataStreams} map - * to which {@link PTransform PTransforms}. 
- */ -class FlinkStreamingTranslationContext { - - private final StreamExecutionEnvironment env; - private final PipelineOptions options; - - /** - * Keeps a mapping between the output value of the PTransform (in Dataflow) and the - * Flink Operator that produced it, after the translation of the correspondinf PTransform - * to its Flink equivalent. - * */ - private final Map<PValue, DataStream<?>> dataStreams; - - private AppliedPTransform<?, ?, ?> currentTransform; - - public FlinkStreamingTranslationContext(StreamExecutionEnvironment env, PipelineOptions options) { - this.env = checkNotNull(env); - this.options = checkNotNull(options); - this.dataStreams = new HashMap<>(); - } - - public StreamExecutionEnvironment getExecutionEnvironment() { - return env; - } - - public PipelineOptions getPipelineOptions() { - return options; - } - - @SuppressWarnings("unchecked") - public <T> DataStream<T> getInputDataStream(PValue value) { - return (DataStream<T>) dataStreams.get(value); - } - - public void setOutputDataStream(PValue value, DataStream<?> set) { - if (!dataStreams.containsKey(value)) { - dataStreams.put(value, set); - } - } - - /** - * Sets the AppliedPTransform which carries input/output. 
- * @param currentTransform - */ - public void setCurrentTransform(AppliedPTransform<?, ?, ?> currentTransform) { - this.currentTransform = currentTransform; - } - - public <T> Coder<WindowedValue<T>> getCoder(PCollection<T> collection) { - Coder<T> valueCoder = collection.getCoder(); - - return WindowedValue.getFullCoder( - valueCoder, - collection.getWindowingStrategy().getWindowFn().windowCoder()); - } - - @SuppressWarnings("unchecked") - public <T> TypeInformation<WindowedValue<T>> getTypeInfo(PCollection<T> collection) { - Coder<T> valueCoder = collection.getCoder(); - WindowedValue.FullWindowedValueCoder<T> windowedValueCoder = - WindowedValue.getFullCoder( - valueCoder, - collection.getWindowingStrategy().getWindowFn().windowCoder()); - - return new CoderTypeInformation<>(windowedValueCoder); - } - - - @SuppressWarnings("unchecked") - public <T extends PValue> T getInput(PTransform<T, ?> transform) { - return (T) Iterables.getOnlyElement(currentTransform.getInputs().values()); - } - - public <T extends PInput> Map<TupleTag<?>, PValue> getInputs(PTransform<T, ?> transform) { - return currentTransform.getInputs(); - } - - @SuppressWarnings("unchecked") - public <T extends PValue> T getOutput(PTransform<?, T> transform) { - return (T) Iterables.getOnlyElement(currentTransform.getOutputs().values()); - } - - public <OutputT extends POutput> Map<TupleTag<?>, PValue> getOutputs( - PTransform<?, OutputT> transform) { - return currentTransform.getOutputs(); - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingViewOverrides.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingViewOverrides.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingViewOverrides.java deleted file mode 100644 index f955f2a..0000000 --- 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingViewOverrides.java +++ /dev/null @@ -1,372 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CoderRegistry; -import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.coders.ListCoder; -import org.apache.beam.sdk.transforms.Combine; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.View; -import org.apache.beam.sdk.util.PCollectionViews; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionView; - -/** - * Flink streaming overrides for various view (side input) transforms. - */ -class FlinkStreamingViewOverrides { - - /** - * Specialized implementation for - * {@link org.apache.beam.sdk.transforms.View.AsMap View.AsMap} - * for the Flink runner in streaming mode. 
- */ - static class StreamingViewAsMap<K, V> - extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, V>>> { - - private final transient FlinkRunner runner; - - @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply() - public StreamingViewAsMap(FlinkRunner runner, View.AsMap<K, V> transform) { - this.runner = runner; - } - - @Override - public PCollectionView<Map<K, V>> expand(PCollection<KV<K, V>> input) { - PCollectionView<Map<K, V>> view = - PCollectionViews.mapView( - input, - input.getWindowingStrategy(), - input.getCoder()); - - @SuppressWarnings({"rawtypes", "unchecked"}) - KvCoder<K, V> inputCoder = (KvCoder) input.getCoder(); - try { - inputCoder.getKeyCoder().verifyDeterministic(); - } catch (Coder.NonDeterministicException e) { - runner.recordViewUsesNonDeterministicKeyCoder(this); - } - - return input - .apply(Combine.globally(new Concatenate<KV<K, V>>()).withoutDefaults()) - .apply(CreateFlinkPCollectionView.<KV<K, V>, Map<K, V>>of(view)); - } - - @Override - protected String getKindString() { - return "StreamingViewAsMap"; - } - } - - /** - * Specialized expansion for {@link - * View.AsMultimap View.AsMultimap} for the - * Flink runner in streaming mode. - */ - static class StreamingViewAsMultimap<K, V> - extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, Iterable<V>>>> { - - private final transient FlinkRunner runner; - - /** - * Builds an instance of this class from the overridden transform. 
- */ - @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply() - public StreamingViewAsMultimap(FlinkRunner runner, View.AsMultimap<K, V> transform) { - this.runner = runner; - } - - @Override - public PCollectionView<Map<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) { - PCollectionView<Map<K, Iterable<V>>> view = - PCollectionViews.multimapView( - input, - input.getWindowingStrategy(), - input.getCoder()); - - @SuppressWarnings({"rawtypes", "unchecked"}) - KvCoder<K, V> inputCoder = (KvCoder) input.getCoder(); - try { - inputCoder.getKeyCoder().verifyDeterministic(); - } catch (Coder.NonDeterministicException e) { - runner.recordViewUsesNonDeterministicKeyCoder(this); - } - - return input - .apply(Combine.globally(new Concatenate<KV<K, V>>()).withoutDefaults()) - .apply(CreateFlinkPCollectionView.<KV<K, V>, Map<K, Iterable<V>>>of(view)); - } - - @Override - protected String getKindString() { - return "StreamingViewAsMultimap"; - } - } - - /** - * Specialized implementation for - * {@link View.AsList View.AsList} for the - * Flink runner in streaming mode. - */ - static class StreamingViewAsList<T> - extends PTransform<PCollection<T>, PCollectionView<List<T>>> { - /** - * Builds an instance of this class from the overridden transform. 
- */ - @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply() - public StreamingViewAsList(FlinkRunner runner, View.AsList<T> transform) {} - - @Override - public PCollectionView<List<T>> expand(PCollection<T> input) { - PCollectionView<List<T>> view = - PCollectionViews.listView( - input, - input.getWindowingStrategy(), - input.getCoder()); - - return input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults()) - .apply(CreateFlinkPCollectionView.<T, List<T>>of(view)); - } - - @Override - protected String getKindString() { - return "StreamingViewAsList"; - } - } - - /** - * Specialized implementation for - * {@link View.AsIterable View.AsIterable} for the - * Flink runner in streaming mode. - */ - static class StreamingViewAsIterable<T> - extends PTransform<PCollection<T>, PCollectionView<Iterable<T>>> { - /** - * Builds an instance of this class from the overridden transform. - */ - @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply() - public StreamingViewAsIterable(FlinkRunner runner, View.AsIterable<T> transform) { } - - @Override - public PCollectionView<Iterable<T>> expand(PCollection<T> input) { - PCollectionView<Iterable<T>> view = - PCollectionViews.iterableView( - input, - input.getWindowingStrategy(), - input.getCoder()); - - return input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults()) - .apply(CreateFlinkPCollectionView.<T, Iterable<T>>of(view)); - } - - @Override - protected String getKindString() { - return "StreamingViewAsIterable"; - } - } - - /** - * Specialized expansion for - * {@link View.AsSingleton View.AsSingleton} for the - * Flink runner in streaming mode. - */ - static class StreamingViewAsSingleton<T> - extends PTransform<PCollection<T>, PCollectionView<T>> { - private View.AsSingleton<T> transform; - - /** - * Builds an instance of this class from the overridden transform. 
- */ - @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply() - public StreamingViewAsSingleton(FlinkRunner runner, View.AsSingleton<T> transform) { - this.transform = transform; - } - - @Override - public PCollectionView<T> expand(PCollection<T> input) { - Combine.Globally<T, T> combine = Combine.globally( - new SingletonCombine<>(transform.hasDefaultValue(), transform.defaultValue())); - if (!transform.hasDefaultValue()) { - combine = combine.withoutDefaults(); - } - return input.apply(combine.asSingletonView()); - } - - @Override - protected String getKindString() { - return "StreamingViewAsSingleton"; - } - - private static class SingletonCombine<T> extends Combine.BinaryCombineFn<T> { - private boolean hasDefaultValue; - private T defaultValue; - - SingletonCombine(boolean hasDefaultValue, T defaultValue) { - this.hasDefaultValue = hasDefaultValue; - this.defaultValue = defaultValue; - } - - @Override - public T apply(T left, T right) { - throw new IllegalArgumentException("PCollection with more than one element " - + "accessed as a singleton view. Consider using Combine.globally().asSingleton() to " - + "combine the PCollection into a single value"); - } - - @Override - public T identity() { - if (hasDefaultValue) { - return defaultValue; - } else { - throw new IllegalArgumentException( - "Empty PCollection accessed as a singleton view. " - + "Consider setting withDefault to provide a default value"); - } - } - } - } - - static class StreamingCombineGloballyAsSingletonView<InputT, OutputT> - extends PTransform<PCollection<InputT>, PCollectionView<OutputT>> { - Combine.GloballyAsSingletonView<InputT, OutputT> transform; - - /** - * Builds an instance of this class from the overridden transform. 
- */ - @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply() - public StreamingCombineGloballyAsSingletonView( - FlinkRunner runner, - Combine.GloballyAsSingletonView<InputT, OutputT> transform) { - this.transform = transform; - } - - @Override - public PCollectionView<OutputT> expand(PCollection<InputT> input) { - PCollection<OutputT> combined = - input.apply(Combine.globally(transform.getCombineFn()) - .withoutDefaults() - .withFanout(transform.getFanout())); - - PCollectionView<OutputT> view = PCollectionViews.singletonView( - combined, - combined.getWindowingStrategy(), - transform.getInsertDefault(), - transform.getInsertDefault() - ? transform.getCombineFn().defaultValue() : null, - combined.getCoder()); - return combined - .apply(ParDo.of(new WrapAsList<OutputT>())) - .apply(CreateFlinkPCollectionView.<OutputT, OutputT>of(view)); - } - - @Override - protected String getKindString() { - return "StreamingCombineGloballyAsSingletonView"; - } - } - - private static class WrapAsList<T> extends DoFn<T, List<T>> { - @ProcessElement - public void processElement(ProcessContext c) { - c.output(Collections.singletonList(c.element())); - } - } - - /** - * Combiner that combines {@code T}s into a single {@code List<T>} containing all inputs. - * - * <p>For internal use by {@link StreamingViewAsMap}, {@link StreamingViewAsMultimap}, - * {@link StreamingViewAsList}, {@link StreamingViewAsIterable}. - * They require the input {@link PCollection} fits in memory. - * For a large {@link PCollection} this is expected to crash! - * - * @param <T> the type of elements to concatenate. 
- */ - private static class Concatenate<T> extends Combine.CombineFn<T, List<T>, List<T>> { - @Override - public List<T> createAccumulator() { - return new ArrayList<T>(); - } - - @Override - public List<T> addInput(List<T> accumulator, T input) { - accumulator.add(input); - return accumulator; - } - - @Override - public List<T> mergeAccumulators(Iterable<List<T>> accumulators) { - List<T> result = createAccumulator(); - for (List<T> accumulator : accumulators) { - result.addAll(accumulator); - } - return result; - } - - @Override - public List<T> extractOutput(List<T> accumulator) { - return accumulator; - } - - @Override - public Coder<List<T>> getAccumulatorCoder(CoderRegistry registry, Coder<T> inputCoder) { - return ListCoder.of(inputCoder); - } - - @Override - public Coder<List<T>> getDefaultOutputCoder(CoderRegistry registry, Coder<T> inputCoder) { - return ListCoder.of(inputCoder); - } - } - - /** - * Creates a primitive {@link PCollectionView}. - * - * <p>For internal use only by runner implementors. 
- * - * @param <ElemT> The type of the elements of the input PCollection - * @param <ViewT> The type associated with the {@link PCollectionView} used as a side input - */ - public static class CreateFlinkPCollectionView<ElemT, ViewT> - extends PTransform<PCollection<List<ElemT>>, PCollectionView<ViewT>> { - private PCollectionView<ViewT> view; - - private CreateFlinkPCollectionView(PCollectionView<ViewT> view) { - this.view = view; - } - - public static <ElemT, ViewT> CreateFlinkPCollectionView<ElemT, ViewT> of( - PCollectionView<ViewT> view) { - return new CreateFlinkPCollectionView<>(view); - } - - @Override - public PCollectionView<ViewT> expand(PCollection<List<ElemT>> input) { - return view; - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/PipelineTranslationOptimizer.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/PipelineTranslationOptimizer.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/PipelineTranslationOptimizer.java deleted file mode 100644 index 3acc3ea..0000000 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/PipelineTranslationOptimizer.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.runners.flink;

import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Traverses the Pipeline to determine the {@link TranslationMode} for this pipeline.
 *
 * <p>Starts from a caller-supplied default mode and upgrades to
 * {@link TranslationMode#STREAMING} when an unbounded source is encountered or when the
 * pipeline options explicitly request streaming execution.
 */
class PipelineTranslationOptimizer extends FlinkPipelineTranslator {

  private static final Logger LOG = LoggerFactory.getLogger(PipelineTranslationOptimizer.class);

  // Mutated during traversal: flips to STREAMING on the first unbounded read.
  private TranslationMode translationMode;

  private final FlinkPipelineOptions options;

  public PipelineTranslationOptimizer(TranslationMode defaultMode, FlinkPipelineOptions options) {
    this.translationMode = defaultMode;
    this.options = options;
  }

  /** Returns the detected mode; an explicit streaming flag in the options always wins. */
  public TranslationMode getTranslationMode() {
    return options.isStreaming() ? TranslationMode.STREAMING : translationMode;
  }

  @Override
  public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
    return CompositeBehavior.ENTER_TRANSFORM;
  }

  @Override
  public void leaveCompositeTransform(TransformHierarchy.Node node) {}

  @Override
  public void visitPrimitiveTransform(TransformHierarchy.Node node) {
    Class<? extends PTransform> transformClass = node.getTransform().getClass();
    if (transformClass != Read.Unbounded.class) {
      return;
    }
    // An unbounded source forces streaming execution regardless of the default mode.
    LOG.info("Found {}. Switching to streaming execution.", transformClass);
    translationMode = TranslationMode.STREAMING;
  }

  @Override
  public void visitValue(PValue value, TransformHierarchy.Node producer) {}
}