Remove the DirectPipelineRunner from the Core SDK
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/99654ca4 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/99654ca4 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/99654ca4 Branch: refs/heads/python-sdk Commit: 99654ca4bed6758d7128d0f0ad376e8b479d4eba Parents: 45e57e0 Author: Thomas Groh <tg...@google.com> Authored: Tue Jun 14 17:52:49 2016 -0700 Committer: Davor Bonaci <da...@google.com> Committed: Mon Jun 20 15:14:28 2016 -0700 ---------------------------------------------------------------------- .../examples/common/DataflowExampleUtils.java | 11 +- runners/spark/pom.xml | 6 + .../translation/TransformTranslatorTest.java | 4 +- .../java/org/apache/beam/sdk/io/PubsubIO.java | 1 - .../main/java/org/apache/beam/sdk/io/Read.java | 44 - .../java/org/apache/beam/sdk/io/TextIO.java | 1 - .../beam/sdk/options/DirectPipelineOptions.java | 1 - .../sdk/runners/DirectPipelineRegistrar.java | 55 - .../beam/sdk/runners/DirectPipelineRunner.java | 1298 ------------------ .../org/apache/beam/sdk/transforms/Flatten.java | 32 - .../org/apache/beam/sdk/transforms/ParDo.java | 302 +--- .../org/apache/beam/sdk/transforms/View.java | 24 - .../sdk/util/DirectModeExecutionContext.java | 130 -- .../apache/beam/sdk/util/DoFnRunnerBase.java | 1 - .../java/org/apache/beam/sdk/PipelineTest.java | 4 +- .../io/BoundedReadFromUnboundedSourceTest.java | 6 - .../runners/DirectPipelineRegistrarTest.java | 71 - .../sdk/runners/DirectPipelineRunnerTest.java | 222 --- .../beam/sdk/runners/PipelineRunnerTest.java | 9 +- .../apache/beam/sdk/transforms/CombineTest.java | 21 - .../beam/sdk/transforms/GroupByKeyTest.java | 13 +- .../apache/beam/sdk/transforms/ViewTest.java | 29 +- .../main/java/common/DataflowExampleUtils.java | 13 +- testing/travis/test_wordcount.sh | 4 +- 24 files changed, 40 insertions(+), 2262 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99654ca4/examples/java/src/main/java/org/apache/beam/examples/common/DataflowExampleUtils.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/DataflowExampleUtils.java b/examples/java/src/main/java/org/apache/beam/examples/common/DataflowExampleUtils.java index fb4f3bf..a0b7319 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/common/DataflowExampleUtils.java +++ b/examples/java/src/main/java/org/apache/beam/examples/common/DataflowExampleUtils.java @@ -17,6 +17,7 @@ */ package org.apache.beam.examples.common; +import org.apache.beam.runners.dataflow.BlockingDataflowPipelineRunner; import org.apache.beam.runners.dataflow.DataflowPipelineJob; import org.apache.beam.runners.dataflow.DataflowPipelineRunner; import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; @@ -25,7 +26,7 @@ import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.options.BigQueryOptions; -import org.apache.beam.sdk.runners.DirectPipelineRunner; +import org.apache.beam.sdk.runners.PipelineRunner; import org.apache.beam.sdk.transforms.IntraBundleParallelization; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff; @@ -315,11 +316,13 @@ public class DataflowExampleUtils { /** * Do some runner setup: check that the DirectPipelineRunner is not used in conjunction with - * streaming, and if streaming is specified, use the DataflowPipelineRunner. Return the streaming - * flag value. + * streaming, and if streaming is specified, use the DataflowPipelineRunner. */ public void setupRunner() { - if (options.isStreaming() && options.getRunner() != DirectPipelineRunner.class) { + Class<? extends PipelineRunner<?>> runner = options.getRunner(); + if (options.isStreaming() + && (runner.equals(DataflowPipelineRunner.class) + || runner.equals(BlockingDataflowPipelineRunner.class))) { // In order to cancel the pipelines automatically, // {@literal DataflowPipelineRunner} is forced to be used. options.setRunner(DataflowPipelineRunner.class); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99654ca4/runners/spark/pom.xml ---------------------------------------------------------------------- diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml index 4110689..e7d0834 100644 --- a/runners/spark/pom.xml +++ b/runners/spark/pom.xml @@ -118,6 +118,12 @@ <artifactId>hamcrest-all</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-runners-direct-java</artifactId> + <version>0.2.0-incubating-SNAPSHOT</version> + <scope>test</scope> + </dependency> </dependencies> <build> http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99654ca4/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java index 4ef26d3..01f3070 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java @@ -21,12 +21,12 @@ package org.apache.beam.runners.spark.translation; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.junit.Assert.assertThat; +import org.apache.beam.runners.direct.InProcessPipelineRunner; import org.apache.beam.runners.spark.SparkPipelineRunner; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.runners.DirectPipelineRunner; import org.apache.beam.sdk.runners.PipelineRunner; import org.apache.beam.sdk.values.PCollection; @@ -58,7 +58,7 @@ public class TransformTranslatorTest { */ @Test public void testTextIOReadAndWriteTransforms() throws IOException { - String directOut = runPipeline(DirectPipelineRunner.class); + String directOut = runPipeline(InProcessPipelineRunner.class); String sparkOut = runPipeline(SparkPipelineRunner.class); List<String> directOutput = http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99654ca4/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java index 7e24253..2a5698c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java @@ -23,7 +23,6 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.options.PubsubOptions; -import org.apache.beam.sdk.runners.DirectPipelineRunner; import org.apache.beam.sdk.runners.PipelineRunner; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99654ca4/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java index fb40063..c0440f2 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java @@ -20,11 +20,9 @@ package org.apache.beam.sdk.io; import static org.apache.beam.sdk.util.StringUtils.approximateSimpleName; import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.runners.DirectPipelineRunner; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.util.SerializableUtils; -import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollection.IsBounded; @@ -32,9 +30,6 @@ import org.apache.beam.sdk.values.PInput; import org.joda.time.Duration; -import java.util.ArrayList; -import java.util.List; - import javax.annotation.Nullable; /** @@ -153,45 +148,6 @@ public class Read { .withLabel("Read Source")) .include(source); } - - static { - registerDefaultTransformEvaluator(); - } - - @SuppressWarnings({"rawtypes", "unchecked"}) - private static void registerDefaultTransformEvaluator() { - DirectPipelineRunner.registerDefaultTransformEvaluator( - Bounded.class, - new DirectPipelineRunner.TransformEvaluator<Bounded>() { - @Override - public void evaluate( - Bounded transform, DirectPipelineRunner.EvaluationContext context) { - evaluateReadHelper(transform, context); - } - - private <T> void evaluateReadHelper( - Read.Bounded<T> transform, DirectPipelineRunner.EvaluationContext context) { - try { - List<DirectPipelineRunner.ValueWithMetadata<T>> output = new ArrayList<>(); - BoundedSource<T> source = transform.getSource(); - try (BoundedSource.BoundedReader<T> reader = - source.createReader(context.getPipelineOptions())) { - for (boolean available = reader.start(); - available; - available = reader.advance()) { - output.add( - DirectPipelineRunner.ValueWithMetadata.of( - WindowedValue.timestampedValueInGlobalWindow( - reader.getCurrent(), reader.getCurrentTimestamp()))); - } - } - context.setPCollectionValuesWithMetadata(context.getOutput(transform), output); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - }); - } } /** http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99654ca4/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java index 13cb45e..bbef072 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java @@ -25,7 +25,6 @@ import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.io.Read.Bounded; import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.runners.DirectPipelineRunner; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.util.IOChannelUtils; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99654ca4/sdks/java/core/src/main/java/org/apache/beam/sdk/options/DirectPipelineOptions.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/DirectPipelineOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/DirectPipelineOptions.java index 4cdc0ca..c2095e3 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/DirectPipelineOptions.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/DirectPipelineOptions.java @@ -18,7 +18,6 @@ package org.apache.beam.sdk.options; import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.runners.DirectPipelineRunner; import org.apache.beam.sdk.values.PCollection; import com.fasterxml.jackson.annotation.JsonIgnore; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99654ca4/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/DirectPipelineRegistrar.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/DirectPipelineRegistrar.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/DirectPipelineRegistrar.java deleted file mode 100644 index 7dd0fdd..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/DirectPipelineRegistrar.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.runners; - -import org.apache.beam.sdk.options.DirectPipelineOptions; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.PipelineOptionsRegistrar; - -import com.google.auto.service.AutoService; -import com.google.common.collect.ImmutableList; - -/** - * Contains the {@link PipelineOptionsRegistrar} and {@link PipelineRunnerRegistrar} for - * the {@link DirectPipeline}. - */ -public class DirectPipelineRegistrar { - private DirectPipelineRegistrar() { } - - /** - * Register the {@link DirectPipelineRunner}. - */ - @AutoService(PipelineRunnerRegistrar.class) - public static class Runner implements PipelineRunnerRegistrar { - @Override - public Iterable<Class<? extends PipelineRunner<?>>> getPipelineRunners() { - return ImmutableList.<Class<? extends PipelineRunner<?>>>of(DirectPipelineRunner.class); - } - } - - /** - * Register the {@link DirectPipelineOptions}. - */ - @AutoService(PipelineOptionsRegistrar.class) - public static class Options implements PipelineOptionsRegistrar { - @Override - public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() { - return ImmutableList.<Class<? extends PipelineOptions>>of(DirectPipelineOptions.class); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99654ca4/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/DirectPipelineRunner.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/DirectPipelineRunner.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/DirectPipelineRunner.java deleted file mode 100644 index 1eb25c5..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/DirectPipelineRunner.java +++ /dev/null @@ -1,1298 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.runners; - -import static org.apache.beam.sdk.util.CoderUtils.encodeToByteArray; - -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkState; - -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.Pipeline.PipelineVisitor; -import org.apache.beam.sdk.PipelineResult; -import org.apache.beam.sdk.coders.CannotProvideCoderException; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.coders.ListCoder; -import org.apache.beam.sdk.io.AvroIO; -import org.apache.beam.sdk.io.FileBasedSink; -import org.apache.beam.sdk.io.TextIO; -import org.apache.beam.sdk.options.DirectPipelineOptions; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.PipelineOptionsValidator; -import org.apache.beam.sdk.transforms.Aggregator; -import org.apache.beam.sdk.transforms.AppliedPTransform; -import org.apache.beam.sdk.transforms.Combine; -import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.GroupByKey; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.Partition; -import org.apache.beam.sdk.transforms.Partition.PartitionFn; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.Window; -import org.apache.beam.sdk.transforms.windowing.WindowFn; -import org.apache.beam.sdk.util.AppliedCombineFn; -import org.apache.beam.sdk.util.AssignWindows; -import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly; -import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly; -import org.apache.beam.sdk.util.IOChannelUtils; -import org.apache.beam.sdk.util.MapAggregatorValues; -import org.apache.beam.sdk.util.PerKeyCombineFnRunner; -import org.apache.beam.sdk.util.PerKeyCombineFnRunners; -import org.apache.beam.sdk.util.SerializableUtils; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.util.WindowingStrategy; -import org.apache.beam.sdk.util.common.Counter; -import org.apache.beam.sdk.util.common.CounterSet; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionList; -import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.PDone; -import org.apache.beam.sdk.values.PInput; -import org.apache.beam.sdk.values.POutput; -import org.apache.beam.sdk.values.PValue; -import org.apache.beam.sdk.values.TypedPValue; - -import com.google.common.base.Function; -import com.google.common.collect.Lists; - -import org.joda.time.Instant; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.Serializable; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Random; - -/** - * Executes the operations in the pipeline directly, in this process, without - * any optimization. Useful for small local execution and tests. - * - * <p>Throws an exception from {@link #run} if execution fails. - * - * <p><h3>Permissions</h3> - * When reading from a Dataflow source or writing to a Dataflow sink using - * {@code DirectPipelineRunner}, the Cloud Platform account that you configured with the - * <a href="https://cloud.google.com/sdk/gcloud">gcloud</a> executable will need access to the - * corresponding source/sink. - * - * <p>Please see <a href="https://cloud.google.com/dataflow/security-and-permissions">Google Cloud - * Dataflow Security and Permissions</a> for more details. - */ -@SuppressWarnings({"rawtypes", "unchecked"}) -public class DirectPipelineRunner - extends PipelineRunner<DirectPipelineRunner.EvaluationResults> { - private static final Logger LOG = LoggerFactory.getLogger(DirectPipelineRunner.class); - - /** - * A source of random data, which can be seeded if determinism is desired. - */ - private Random rand; - - /** - * A map from PTransform class to the corresponding - * TransformEvaluator to use to evaluate that transform. - * - * <p>A static map that contains system-wide defaults. - */ - private static Map<Class, TransformEvaluator> defaultTransformEvaluators = - new HashMap<>(); - - /** - * A map from PTransform class to the corresponding - * TransformEvaluator to use to evaluate that transform. - * - * <p>An instance map that contains bindings for this DirectPipelineRunner. - * Bindings in this map override those in the default map. - */ - private Map<Class, TransformEvaluator> localTransformEvaluators = - new HashMap<>(); - - /** - * Records that instances of the specified PTransform class - * should be evaluated by default by the corresponding - * TransformEvaluator. - */ - public static <TransformT extends PTransform<?, ?>> - void registerDefaultTransformEvaluator( - Class<TransformT> transformClass, - TransformEvaluator<? super TransformT> transformEvaluator) { - if (defaultTransformEvaluators.put(transformClass, transformEvaluator) - != null) { - throw new IllegalArgumentException( - "defining multiple evaluators for " + transformClass); - } - } - - ///////////////////////////////////////////////////////////////////////////// - - /** - * Records that instances of the specified PTransform class - * should be evaluated by the corresponding TransformEvaluator. - * Overrides any bindings specified by - * {@link #registerDefaultTransformEvaluator}. - */ - public <TransformT extends PTransform<?, ?>> - void registerTransformEvaluator( - Class<TransformT> transformClass, - TransformEvaluator<TransformT> transformEvaluator) { - if (localTransformEvaluators.put(transformClass, transformEvaluator) - != null) { - throw new IllegalArgumentException( - "defining multiple evaluators for " + transformClass); - } - } - - /** - * Returns the TransformEvaluator to use for instances of the - * specified PTransform class, or null if none registered. - */ - public <TransformT extends PTransform<?, ?>> - TransformEvaluator<TransformT> getTransformEvaluator(Class<TransformT> transformClass) { - TransformEvaluator<TransformT> transformEvaluator = - localTransformEvaluators.get(transformClass); - if (transformEvaluator == null) { - transformEvaluator = defaultTransformEvaluators.get(transformClass); - } - return transformEvaluator; - } - - /** - * Constructs a DirectPipelineRunner from the given options. - */ - public static DirectPipelineRunner fromOptions(PipelineOptions options) { - DirectPipelineOptions directOptions = - PipelineOptionsValidator.validate(DirectPipelineOptions.class, options); - LOG.debug("Creating DirectPipelineRunner"); - return new DirectPipelineRunner(directOptions); - } - - /** - * Enable runtime testing to verify that all functions and {@link Coder} - * instances can be serialized. - * - * <p>Enabled by default. - * - * <p>This method modifies the {@code DirectPipelineRunner} instance and - * returns itself. - */ - public DirectPipelineRunner withSerializabilityTesting(boolean enable) { - this.testSerializability = enable; - return this; - } - - /** - * Enable runtime testing to verify that all values can be encoded. - * - * <p>Enabled by default. - * - * <p>This method modifies the {@code DirectPipelineRunner} instance and - * returns itself. - */ - public DirectPipelineRunner withEncodabilityTesting(boolean enable) { - this.testEncodability = enable; - return this; - } - - /** - * Enable runtime testing to verify that functions do not depend on order - * of the elements. - * - * <p>This is accomplished by randomizing the order of elements. - * - * <p>Enabled by default. - * - * <p>This method modifies the {@code DirectPipelineRunner} instance and - * returns itself. - */ - public DirectPipelineRunner withUnorderednessTesting(boolean enable) { - this.testUnorderedness = enable; - return this; - } - - @Override - public <OutputT extends POutput, InputT extends PInput> OutputT apply( - PTransform<InputT, OutputT> transform, InputT input) { - if (transform instanceof Combine.GroupedValues) { - return (OutputT) applyTestCombine((Combine.GroupedValues) transform, (PCollection) input); - } else if (transform instanceof TextIO.Write.Bound) { - return (OutputT) applyTextIOWrite((TextIO.Write.Bound) transform, (PCollection<?>) input); - } else if (transform instanceof AvroIO.Write.Bound) { - return (OutputT) applyAvroIOWrite((AvroIO.Write.Bound) transform, (PCollection<?>) input); - } else if (transform instanceof GroupByKey) { - return (OutputT) - ((PCollection) input).apply(new GroupByKeyViaGroupByKeyOnly((GroupByKey) transform)); - } else if (transform instanceof Window.Bound) { - return (OutputT) - ((PCollection) input).apply(new AssignWindowsAndSetStrategy((Window.Bound) transform)); - } else { - return super.apply(transform, input); - } - } - - private <K, InputT, AccumT, OutputT> PCollection<KV<K, OutputT>> applyTestCombine( - Combine.GroupedValues<K, InputT, OutputT> transform, - PCollection<KV<K, Iterable<InputT>>> input) { - - PCollection<KV<K, OutputT>> output = input - .apply(ParDo.of(TestCombineDoFn.create(transform, input, testSerializability, rand)) - .withSideInputs(transform.getSideInputs())); - - try { - output.setCoder(transform.getDefaultOutputCoder(input)); - } catch (CannotProvideCoderException exc) { - // let coder inference occur later, if it can - } - return output; - } - - private static class ElementProcessingOrderPartitionFn<T> implements PartitionFn<T> { - private int elementNumber; - @Override - public int partitionFor(T elem, int numPartitions) { - return elementNumber++ % numPartitions; - } - } - - /** - * Applies TextIO.Write honoring user requested sharding controls (i.e. withNumShards) - * by applying a partition function based upon the number of shards the user requested. - */ - private static class DirectTextIOWrite<T> extends PTransform<PCollection<T>, PDone> { - private final TextIO.Write.Bound<T> transform; - - private DirectTextIOWrite(TextIO.Write.Bound<T> transform) { - this.transform = transform; - } - - @Override - public PDone apply(PCollection<T> input) { - checkState(transform.getNumShards() > 1, - "DirectTextIOWrite is expected to only be used when sharding controls are required."); - - // Evenly distribute all the elements across the partitions. - PCollectionList<T> partitionedElements = - input.apply(Partition.of(transform.getNumShards(), - new ElementProcessingOrderPartitionFn<T>())); - - // For each input PCollection partition, create a write transform that represents - // one of the specific shards. - for (int i = 0; i < transform.getNumShards(); ++i) { - /* - * This logic mirrors the file naming strategy within - * {@link FileBasedSink#generateDestinationFilenames()} - */ - String outputFilename = IOChannelUtils.constructName( - transform.getFilenamePrefix(), - transform.getShardNameTemplate(), - getFileExtension(transform.getFilenameSuffix()), - i, - transform.getNumShards()); - - String transformName = String.format("%s(Shard:%s)", transform.getName(), i); - partitionedElements.get(i).apply(transformName, - transform.withNumShards(1).withShardNameTemplate("").withSuffix("").to(outputFilename)); - } - return PDone.in(input.getPipeline()); - } - } - - /** - * Returns the file extension to be used. If the user did not request a file - * extension then this method returns the empty string. Otherwise this method - * adds a {@code "."} to the beginning of the users extension if one is not present. - * - * <p>This is copied from {@link FileBasedSink} to not expose it. - */ - private static String getFileExtension(String usersExtension) { - if (usersExtension == null || usersExtension.isEmpty()) { - return ""; - } - if (usersExtension.startsWith(".")) { - return usersExtension; - } - return "." + usersExtension; - } - - /** - * Apply the override for TextIO.Write.Bound if the user requested sharding controls - * greater than one. - */ - private <T> PDone applyTextIOWrite(TextIO.Write.Bound<T> transform, PCollection<T> input) { - if (transform.getNumShards() <= 1) { - // By default, the DirectPipelineRunner outputs to only 1 shard. Since the user never - // requested sharding controls greater than 1, we default to outputting to 1 file. - return super.apply(transform.withNumShards(1), input); - } - return input.apply(new DirectTextIOWrite<>(transform)); - } - - /** - * Applies AvroIO.Write honoring user requested sharding controls (i.e. withNumShards) - * by applying a partition function based upon the number of shards the user requested. - */ - private static class DirectAvroIOWrite<T> extends PTransform<PCollection<T>, PDone> { - private final AvroIO.Write.Bound<T> transform; - - private DirectAvroIOWrite(AvroIO.Write.Bound<T> transform) { - this.transform = transform; - } - - @Override - public PDone apply(PCollection<T> input) { - checkState(transform.getNumShards() > 1, - "DirectAvroIOWrite is expected to only be used when sharding controls are required."); - - // Evenly distribute all the elements across the partitions. - PCollectionList<T> partitionedElements = - input.apply(Partition.of(transform.getNumShards(), - new ElementProcessingOrderPartitionFn<T>())); - - // For each input PCollection partition, create a write transform that represents - // one of the specific shards. - for (int i = 0; i < transform.getNumShards(); ++i) { - /* - * This logic mirrors the file naming strategy within - * {@link FileBasedSink#generateDestinationFilenames()} - */ - String outputFilename = IOChannelUtils.constructName( - transform.getFilenamePrefix(), - transform.getShardNameTemplate(), - getFileExtension(transform.getFilenameSuffix()), - i, - transform.getNumShards()); - - String transformName = String.format("%s(Shard:%s)", transform.getName(), i); - partitionedElements.get(i).apply(transformName, - transform.withNumShards(1).withShardNameTemplate("").withSuffix("").to(outputFilename)); - } - return PDone.in(input.getPipeline()); - } - } - - private static class AssignWindowsAndSetStrategy<T, W extends BoundedWindow> - extends PTransform<PCollection<T>, PCollection<T>> { - - private final Window.Bound<T> wrapped; - - public AssignWindowsAndSetStrategy(Window.Bound<T> wrapped) { - this.wrapped = wrapped; - } - - @Override - public PCollection<T> apply(PCollection<T> input) { - WindowingStrategy<?, ?> outputStrategy = - wrapped.getOutputStrategyInternal(input.getWindowingStrategy()); - - WindowFn<T, BoundedWindow> windowFn = - (WindowFn<T, BoundedWindow>) outputStrategy.getWindowFn(); - - // If the Window.Bound transform only changed parts other than the WindowFn, then - // we skip AssignWindows even though it should be harmless in a perfect world. - // The world is not perfect, and a GBK may have set it to InvalidWindows to forcibly - // crash if another GBK is performed without explicitly setting the WindowFn. So we skip - // AssignWindows in this case. - if (wrapped.getWindowFn() == null) { - return input.apply("Identity", ParDo.of(new IdentityFn<T>())) - .setWindowingStrategyInternal(outputStrategy); - } else { - return input - .apply("AssignWindows", new AssignWindows<T, BoundedWindow>(windowFn)) - .setWindowingStrategyInternal(outputStrategy); - } - } - } - - private static class IdentityFn<T> extends DoFn<T, T> { - @Override - public void processElement(ProcessContext c) { - c.output(c.element()); - } - } - - /** - * Apply the override for AvroIO.Write.Bound if the user requested sharding controls - * greater than one. - */ - private <T> PDone applyAvroIOWrite(AvroIO.Write.Bound<T> transform, PCollection<T> input) { - if (transform.getNumShards() <= 1) { - // By default, the DirectPipelineRunner outputs to only 1 shard. Since the user never - // requested sharding controls greater than 1, we default to outputting to 1 file. - return super.apply(transform.withNumShards(1), input); - } - return input.apply(new DirectAvroIOWrite<>(transform)); - } - - /** - * The implementation may split the {@link KeyedCombineFn} into ADD, MERGE and EXTRACT phases ( - * see {@code org.apache.beam.sdk.runners.worker.CombineValuesFn}). In order to emulate - * this for the {@link DirectPipelineRunner} and provide an experience closer to the service, go - * through heavy serializability checks for the equivalent of the results of the ADD phase, but - * after the {@link org.apache.beam.sdk.transforms.GroupByKey} shuffle, and the MERGE - * phase. Doing these checks ensure that not only is the accumulator coder serializable, but - * the accumulator coder can actually serialize the data in question. - */ - public static class TestCombineDoFn<K, InputT, AccumT, OutputT> - extends DoFn<KV<K, Iterable<InputT>>, KV<K, OutputT>> { - private final PerKeyCombineFnRunner<? super K, ? super InputT, AccumT, OutputT> fnRunner; - private final Coder<AccumT> accumCoder; - private final boolean testSerializability; - private final Random rand; - - public static <K, InputT, AccumT, OutputT> TestCombineDoFn<K, InputT, AccumT, OutputT> create( - Combine.GroupedValues<K, InputT, OutputT> transform, - PCollection<KV<K, Iterable<InputT>>> input, - boolean testSerializability, - Random rand) { - - AppliedCombineFn<? super K, ? super InputT, ?, OutputT> fn = transform.getAppliedFn( - input.getPipeline().getCoderRegistry(), input.getCoder(), input.getWindowingStrategy()); - - return new TestCombineDoFn( - PerKeyCombineFnRunners.create(fn.getFn()), - fn.getAccumulatorCoder(), - testSerializability, - rand); - } - - public TestCombineDoFn( - PerKeyCombineFnRunner<? super K, ? super InputT, AccumT, OutputT> fnRunner, - Coder<AccumT> accumCoder, - boolean testSerializability, - Random rand) { - this.fnRunner = fnRunner; - this.accumCoder = accumCoder; - this.testSerializability = testSerializability; - this.rand = rand; - - // Check that this does not crash, specifically to catch anonymous CustomCoder subclasses. - this.accumCoder.getEncodingId(); - } - - @Override - public void processElement(ProcessContext c) throws Exception { - K key = c.element().getKey(); - Iterable<InputT> values = c.element().getValue(); - List<AccumT> groupedPostShuffle = - ensureSerializableByCoder(ListCoder.of(accumCoder), - addInputsRandomly(fnRunner, key, values, rand, c), - "After addInputs of KeyedCombineFn " + fnRunner.fn().toString()); - AccumT merged = - ensureSerializableByCoder(accumCoder, - fnRunner.mergeAccumulators(key, groupedPostShuffle, c), - "After mergeAccumulators of KeyedCombineFn " + fnRunner.fn().toString()); - // Note: The serializability of KV<K, OutputT> is ensured by the - // runner itself, since it's a transform output. - c.output(KV.of(key, fnRunner.extractOutput(key, merged, c))); - } - - /** - * Create a random list of accumulators from the given list of values. - * - * <p>Visible for testing purposes only. - */ - public static <K, AccumT, InputT> List<AccumT> addInputsRandomly( - PerKeyCombineFnRunner<? super K, ? super InputT, AccumT, ?> fnRunner, - K key, - Iterable<InputT> values, - Random random, - DoFn<?, ?>.ProcessContext c) { - List<AccumT> out = new ArrayList<AccumT>(); - int i = 0; - AccumT accumulator = fnRunner.createAccumulator(key, c); - boolean hasInput = false; - - for (InputT value : values) { - accumulator = fnRunner.addInput(key, accumulator, value, c); - hasInput = true; - - // For each index i, flip a 1/2^i weighted coin for whether to - // create a new accumulator after index i is added, i.e. [0] - // is guaranteed, [1] is an even 1/2, [2] is 1/4, etc. The - // goal is to partition the inputs into accumulators, and make - // the accumulators potentially lumpy. Also compact about half - // of the accumulators. - if (i == 0 || random.nextInt(1 << Math.min(i, 30)) == 0) { - if (i % 2 == 0) { - accumulator = fnRunner.compact(key, accumulator, c); - } - out.add(accumulator); - accumulator = fnRunner.createAccumulator(key, c); - hasInput = false; - } - i++; - } - if (hasInput) { - out.add(accumulator); - } - - Collections.shuffle(out, random); - return out; - } - - public <T> T ensureSerializableByCoder( - Coder<T> coder, T value, String errorContext) { - if (testSerializability) { - return SerializableUtils.ensureSerializableByCoder( - coder, value, errorContext); - } - return value; - } - } - - @Override - public EvaluationResults run(Pipeline pipeline) { - LOG.info("Executing pipeline using the DirectPipelineRunner."); - - Evaluator evaluator = new Evaluator(rand); - evaluator.run(pipeline); - - // Log all counter values for debugging purposes. - for (Counter counter : evaluator.getCounters()) { - LOG.info("Final aggregator value: {}", counter); - } - - LOG.info("Pipeline execution complete."); - - return evaluator; - } - - /** - * An evaluator of a PTransform. - */ - public interface TransformEvaluator<TransformT extends PTransform> { - public void evaluate(TransformT transform, - EvaluationContext context); - } - - /** - * The interface provided to registered callbacks for interacting - * with the {@code DirectPipelineRunner}, including reading and writing the - * values of {@link PCollection}s and {@link PCollectionView}s. - */ - public interface EvaluationResults extends PipelineResult { - /** - * Retrieves the value of the given PCollection. - * Throws an exception if the PCollection's value hasn't already been set. - */ - <T> List<T> getPCollection(PCollection<T> pc); - - /** - * Retrieves the windowed value of the given PCollection. - * Throws an exception if the PCollection's value hasn't already been set. - */ - <T> List<WindowedValue<T>> getPCollectionWindowedValues(PCollection<T> pc); - - /** - * Retrieves the values of each PCollection in the given - * PCollectionList. Throws an exception if the PCollectionList's - * value hasn't already been set. - */ - <T> List<List<T>> getPCollectionList(PCollectionList<T> pcs); - - /** - * Retrieves the values indicated by the given {@link PCollectionView}. - * Note that within the {@link org.apache.beam.sdk.transforms.DoFn.Context} - * implementation a {@link PCollectionView} should convert from this representation to a - * suitable side input value. - */ - <T, WindowedT> Iterable<WindowedValue<?>> getPCollectionView(PCollectionView<T> view); - } - - /** - * An immutable (value, timestamp) pair, along with other metadata necessary - * for the implementation of {@code DirectPipelineRunner}. - */ - public static class ValueWithMetadata<V> { - /** - * Returns a new {@code ValueWithMetadata} with the {@code WindowedValue}. - * Key is null. - */ - public static <V> ValueWithMetadata<V> of(WindowedValue<V> windowedValue) { - return new ValueWithMetadata<>(windowedValue, null); - } - - /** - * Returns a new {@code ValueWithMetadata} with the implicit key associated - * with this value set. The key is the last key grouped by in the chain of - * productions that produced this element. - * These keys are used internally by {@link DirectPipelineRunner} for keeping - * persisted state separate across keys. - */ - public ValueWithMetadata<V> withKey(Object key) { - return new ValueWithMetadata<>(windowedValue, key); - } - - /** - * Returns a new {@code ValueWithMetadata} that is a copy of this one, but with - * a different value. - */ - public <T> ValueWithMetadata<T> withValue(T value) { - return new ValueWithMetadata(windowedValue.withValue(value), getKey()); - } - - /** - * Returns the {@code WindowedValue} associated with this element. - */ - public WindowedValue<V> getWindowedValue() { - return windowedValue; - } - - /** - * Returns the value associated with this element. - * - * @see #withValue - */ - public V getValue() { - return windowedValue.getValue(); - } - - /** - * Returns the timestamp associated with this element. - */ - public Instant getTimestamp() { - return windowedValue.getTimestamp(); - } - - /** - * Returns the collection of windows this element has been placed into. May - * be null if the {@code PCollection} this element is in has not yet been - * windowed. - * - * @see #getWindows() - */ - public Collection<? extends BoundedWindow> getWindows() { - return windowedValue.getWindows(); - } - - - /** - * Returns the key associated with this element. May be null if the - * {@code PCollection} this element is in is not keyed. - * - * @see #withKey - */ - public Object getKey() { - return key; - } - - //////////////////////////////////////////////////////////////////////////// - - private final Object key; - private final WindowedValue<V> windowedValue; - - private ValueWithMetadata(WindowedValue<V> windowedValue, - Object key) { - this.windowedValue = windowedValue; - this.key = key; - } - } - - /** - * The interface provided to registered callbacks for interacting - * with the {@code DirectPipelineRunner}, including reading and writing the - * values of {@link PCollection}s and {@link PCollectionView}s. - */ - public interface EvaluationContext extends EvaluationResults { - /** - * Returns the configured pipeline options. - */ - DirectPipelineOptions getPipelineOptions(); - - /** - * Returns the input of the currently being processed transform. - */ - <InputT extends PInput> InputT getInput(PTransform<InputT, ?> transform); - - /** - * Returns the output of the currently being processed transform. - */ - <OutputT extends POutput> OutputT getOutput(PTransform<?, OutputT> transform); - - /** - * Sets the value of the given PCollection, where each element also has a timestamp - * and collection of windows. - * Throws an exception if the PCollection's value has already been set. - */ - <T> void setPCollectionValuesWithMetadata( - PCollection<T> pc, List<ValueWithMetadata<T>> elements); - - /** - * Sets the value of the given PCollection, where each element also has a timestamp - * and collection of windows. - * Throws an exception if the PCollection's value has already been set. - */ - <T> void setPCollectionWindowedValue(PCollection<T> pc, List<WindowedValue<T>> elements); - - /** - * Shorthand for setting the value of a PCollection where the elements do not have - * timestamps or windows. - * Throws an exception if the PCollection's value has already been set. - */ - <T> void setPCollection(PCollection<T> pc, List<T> elements); - - /** - * Retrieves the value of the given PCollection, along with element metadata - * such as timestamps and windows. - * Throws an exception if the PCollection's value hasn't already been set. - */ - <T> List<ValueWithMetadata<T>> getPCollectionValuesWithMetadata(PCollection<T> pc); - - /** - * Sets the value associated with the given {@link PCollectionView}. - * Throws an exception if the {@link PCollectionView}'s value has already been set. - */ - <ElemT, T, WindowedT> void setPCollectionView( - PCollectionView<T> pc, - Iterable<WindowedValue<ElemT>> value); - - /** - * Ensures that the element is encodable and decodable using the - * TypePValue's coder, by encoding it and decoding it, and - * returning the result. - */ - <T> T ensureElementEncodable(TypedPValue<T> pvalue, T element); - - /** - * If the evaluation context is testing unorderedness, - * randomly permutes the order of the elements, in a - * copy if !inPlaceAllowed, and returns the permuted list, - * otherwise returns the argument unchanged. - */ - <T> List<T> randomizeIfUnordered(List<T> elements, - boolean inPlaceAllowed); - - /** - * If the evaluation context is testing serializability, ensures - * that the argument function is serializable and deserializable - * by encoding it and then decoding it, and returning the result. - * Otherwise returns the argument unchanged. - */ - <FunctionT extends Serializable> FunctionT ensureSerializable(FunctionT fn); - - /** - * If the evaluation context is testing serializability, ensures - * that the argument Coder is serializable and deserializable - * by encoding it and then decoding it, and returning the result. - * Otherwise returns the argument unchanged. - */ - <T> Coder<T> ensureCoderSerializable(Coder<T> coder); - - /** - * If the evaluation context is testing serializability, ensures - * that the given data is serializable and deserializable with the - * given Coder by encoding it and then decoding it, and returning - * the result. Otherwise returns the argument unchanged. - * - * <p>Error context is prefixed to any thrown exceptions. - */ - <T> T ensureSerializableByCoder(Coder<T> coder, - T data, String errorContext); - - /** - * Returns a mutator, which can be used to add additional counters to - * this EvaluationContext. - */ - CounterSet.AddCounterMutator getAddCounterMutator(); - - /** - * Gets the step name for this transform. - */ - public String getStepName(PTransform<?, ?> transform); - } - - - ///////////////////////////////////////////////////////////////////////////// - - class Evaluator extends PipelineVisitor.Defaults implements EvaluationContext { - /** - * A map from PTransform to the step name of that transform. This is the internal name for the - * transform (e.g. "s2"). - */ - private final Map<PTransform<?, ?>, String> stepNames = new HashMap<>(); - private final Map<PValue, Object> store = new HashMap<>(); - private final CounterSet counters = new CounterSet(); - private AppliedPTransform<?, ?, ?> currentTransform; - - private Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> aggregatorSteps = null; - - /** - * A map from PTransform to the full name of that transform. This is the user name of the - * transform (e.g. "RemoveDuplicates/Combine/GroupByKey"). - */ - private final Map<PTransform<?, ?>, String> fullNames = new HashMap<>(); - - private Random rand; - - public Evaluator() { - this(new Random()); - } - - public Evaluator(Random rand) { - this.rand = rand; - } - - public void run(Pipeline pipeline) { - pipeline.traverseTopologically(this); - aggregatorSteps = new AggregatorPipelineExtractor(pipeline).getAggregatorSteps(); - } - - @Override - public DirectPipelineOptions getPipelineOptions() { - return options; - } - - @Override - public <InputT extends PInput> InputT getInput(PTransform<InputT, ?> transform) { - checkArgument(currentTransform != null && currentTransform.getTransform() == transform, - "can only be called with current transform"); - return (InputT) currentTransform.getInput(); - } - - @Override - public <OutputT extends POutput> OutputT getOutput(PTransform<?, OutputT> transform) { - checkArgument(currentTransform != null && currentTransform.getTransform() == transform, - "can only be called with current transform"); - return (OutputT) currentTransform.getOutput(); - } - - @Override - public void visitPrimitiveTransform(TransformTreeNode node) { - PTransform<?, ?> transform = node.getTransform(); - fullNames.put(transform, node.getFullName()); - TransformEvaluator evaluator = - getTransformEvaluator(transform.getClass()); - if (evaluator == null) { - throw new IllegalStateException( - "no evaluator registered for " + transform); - } - LOG.debug("Evaluating {}", transform); - currentTransform = AppliedPTransform.of( - node.getFullName(), node.getInput(), node.getOutput(), (PTransform) transform); - evaluator.evaluate(transform, this); - currentTransform = null; - } - - @Override - public void visitValue(PValue value, TransformTreeNode producer) { - LOG.debug("Checking evaluation of {}", value); - if (value.getProducingTransformInternal() == null) { - throw new RuntimeException( - "internal error: expecting a PValue to have a producingTransform"); - } - if (!producer.isCompositeNode()) { - // Verify that primitive transform outputs are already computed. - getPValue(value); - } - } - - /** - * Sets the value of the given PValue. - * Throws an exception if the PValue's value has already been set. - */ - void setPValue(PValue pvalue, Object contents) { - if (store.containsKey(pvalue)) { - throw new IllegalStateException( - "internal error: setting the value of " + pvalue - + " more than once"); - } - store.put(pvalue, contents); - } - - /** - * Retrieves the value of the given PValue. - * Throws an exception if the PValue's value hasn't already been set. - */ - Object getPValue(PValue pvalue) { - if (!store.containsKey(pvalue)) { - throw new IllegalStateException( - "internal error: getting the value of " + pvalue - + " before it has been computed"); - } - return store.get(pvalue); - } - - /** - * Convert a list of T to a list of {@code ValueWithMetadata<T>}, with a timestamp of 0 - * and null windows. - */ - <T> List<ValueWithMetadata<T>> toValueWithMetadata(List<T> values) { - List<ValueWithMetadata<T>> result = new ArrayList<>(values.size()); - for (T value : values) { - result.add(ValueWithMetadata.of(WindowedValue.valueInGlobalWindow(value))); - } - return result; - } - - /** - * Convert a list of {@code WindowedValue<T>} to a list of {@code ValueWithMetadata<T>}. - */ - <T> List<ValueWithMetadata<T>> toValueWithMetadataFromWindowedValue( - List<WindowedValue<T>> values) { - List<ValueWithMetadata<T>> result = new ArrayList<>(values.size()); - for (WindowedValue<T> value : values) { - result.add(ValueWithMetadata.of(value)); - } - return result; - } - - @Override - public <T> void setPCollection(PCollection<T> pc, List<T> elements) { - setPCollectionValuesWithMetadata(pc, toValueWithMetadata(elements)); - } - - @Override - public <T> void setPCollectionWindowedValue( - PCollection<T> pc, List<WindowedValue<T>> elements) { - setPCollectionValuesWithMetadata(pc, toValueWithMetadataFromWindowedValue(elements)); - } - - @Override - public <T> void setPCollectionValuesWithMetadata( - PCollection<T> pc, List<ValueWithMetadata<T>> elements) { - LOG.debug("Setting {} = {}", pc, elements); - ensurePCollectionEncodable(pc, elements); - setPValue(pc, elements); - } - - @Override - public <ElemT, T, WindowedT> void setPCollectionView( - PCollectionView<T> view, - Iterable<WindowedValue<ElemT>> value) { - LOG.debug("Setting {} = {}", view, value); - setPValue(view, value); - } - - /** - * Retrieves the value of the given {@link PCollection}. - * Throws an exception if the {@link PCollection}'s value hasn't already been set. - */ - @Override - public <T> List<T> getPCollection(PCollection<T> pc) { - List<T> result = new ArrayList<>(); - for (ValueWithMetadata<T> elem : getPCollectionValuesWithMetadata(pc)) { - result.add(elem.getValue()); - } - return result; - } - - @Override - public <T> List<WindowedValue<T>> getPCollectionWindowedValues(PCollection<T> pc) { - return Lists.transform( - getPCollectionValuesWithMetadata(pc), - new Function<ValueWithMetadata<T>, WindowedValue<T>>() { - @Override - public WindowedValue<T> apply(ValueWithMetadata<T> input) { - return input.getWindowedValue(); - }}); - } - - @Override - public <T> List<ValueWithMetadata<T>> getPCollectionValuesWithMetadata(PCollection<T> pc) { - List<ValueWithMetadata<T>> elements = (List<ValueWithMetadata<T>>) getPValue(pc); - elements = randomizeIfUnordered(elements, false /* not inPlaceAllowed */); - LOG.debug("Getting {} = {}", pc, elements); - return elements; - } - - @Override - public <T> List<List<T>> getPCollectionList(PCollectionList<T> pcs) { - List<List<T>> elementsList = new ArrayList<>(); - for (PCollection<T> pc : pcs.getAll()) { - elementsList.add(getPCollection(pc)); - } - return elementsList; - } - - /** - * Retrieves the value indicated by the given {@link PCollectionView}. - * Note that within the {@link DoFnContext} a {@link PCollectionView} - * converts from this representation to a suitable side input value. - */ - @Override - public <T, WindowedT> Iterable<WindowedValue<?>> getPCollectionView(PCollectionView<T> view) { - Iterable<WindowedValue<?>> value = (Iterable<WindowedValue<?>>) getPValue(view); - LOG.debug("Getting {} = {}", view, value); - return value; - } - - /** - * If {@code testEncodability}, ensures that the {@link PCollection}'s coder and elements are - * encodable and decodable by encoding them and decoding them, and returning the result. - * Otherwise returns the argument elements. - */ - <T> List<ValueWithMetadata<T>> ensurePCollectionEncodable( - PCollection<T> pc, List<ValueWithMetadata<T>> elements) { - ensureCoderSerializable(pc.getCoder()); - if (!testEncodability) { - return elements; - } - List<ValueWithMetadata<T>> elementsCopy = new ArrayList<>(elements.size()); - for (ValueWithMetadata<T> element : elements) { - elementsCopy.add( - element.withValue(ensureElementEncodable(pc, element.getValue()))); - } - return elementsCopy; - } - - @Override - public <T> T ensureElementEncodable(TypedPValue<T> pvalue, T element) { - return ensureSerializableByCoder( - pvalue.getCoder(), element, "Within " + pvalue.toString()); - } - - @Override - public <T> List<T> randomizeIfUnordered(List<T> elements, - boolean inPlaceAllowed) { - if (!testUnorderedness) { - return elements; - } - List<T> elementsCopy = new ArrayList<>(elements); - Collections.shuffle(elementsCopy, rand); - return elementsCopy; - } - - @Override - public <FunctionT extends Serializable> FunctionT ensureSerializable(FunctionT fn) { - if (!testSerializability) { - return fn; - } - return SerializableUtils.ensureSerializable(fn); - } - - @Override - public <T> Coder<T> ensureCoderSerializable(Coder<T> coder) { - if (testSerializability) { - SerializableUtils.ensureSerializable(coder); - } - return coder; - } - - @Override - public <T> T ensureSerializableByCoder( - Coder<T> coder, T value, String errorContext) { - if (testSerializability) { - return SerializableUtils.ensureSerializableByCoder( - coder, value, errorContext); - } - return value; - } - - @Override - public CounterSet.AddCounterMutator getAddCounterMutator() { - return counters.getAddCounterMutator(); - } - - @Override - public String getStepName(PTransform<?, ?> transform) { - String stepName = stepNames.get(transform); - if (stepName == null) { - stepName = "s" + (stepNames.size() + 1); - stepNames.put(transform, stepName); - } - return stepName; - } - - /** - * Returns the CounterSet generated during evaluation, which includes - * user-defined Aggregators and may include system-defined counters. - */ - public CounterSet getCounters() { - return counters; - } - - /** - * Returns JobState.DONE in all situations. The Evaluator is not returned - * until the pipeline has been traversed, so it will either be returned - * after a successful run or the run call will terminate abnormally. - */ - @Override - public State getState() { - return State.DONE; - } - - @Override - public <T> AggregatorValues<T> getAggregatorValues(Aggregator<?, T> aggregator) { - Map<String, T> stepValues = new HashMap<>(); - for (PTransform<?, ?> step : aggregatorSteps.get(aggregator)) { - String stepName = String.format("user-%s-%s", stepNames.get(step), aggregator.getName()); - String fullName = fullNames.get(step); - Counter<?> counter = counters.getExistingCounter(stepName); - if (counter == null) { - throw new IllegalArgumentException( - "Aggregator " + aggregator + " is not used in this pipeline"); - } - stepValues.put(fullName, (T) counter.getAggregate()); - } - return new MapAggregatorValues<>(stepValues); - } - } - - ///////////////////////////////////////////////////////////////////////////// - - /** - * The key by which GBK groups inputs - elements are grouped by the encoded form of the key, - * but the original key may be accessed as well. - */ - private static class GroupingKey<K> { - private K key; - private byte[] encodedKey; - - public GroupingKey(K key, byte[] encodedKey) { - this.key = key; - this.encodedKey = encodedKey; - } - - public K getKey() { - return key; - } - - @Override - public boolean equals(Object o) { - if (o instanceof GroupingKey) { - GroupingKey<?> that = (GroupingKey<?>) o; - return Arrays.equals(this.encodedKey, that.encodedKey); - } else { - return false; - } - } - - @Override - public int hashCode() { - return Arrays.hashCode(encodedKey); - } - } - - private final DirectPipelineOptions options; - private boolean testSerializability; - private boolean testEncodability; - private boolean testUnorderedness; - - /** Returns a new DirectPipelineRunner. */ - private DirectPipelineRunner(DirectPipelineOptions options) { - this.options = options; - // (Re-)register standard IO factories. Clobbers any prior credentials. - IOChannelUtils.registerStandardIOFactories(options); - long randomSeed; - if (options.getDirectPipelineRunnerRandomSeed() != null) { - randomSeed = options.getDirectPipelineRunnerRandomSeed(); - } else { - randomSeed = new Random().nextLong(); - } - - LOG.debug("DirectPipelineRunner using random seed {}.", randomSeed); - rand = new Random(randomSeed); - - testSerializability = options.isTestSerializability(); - testEncodability = options.isTestEncodability(); - testUnorderedness = options.isTestUnorderedness(); - } - - /** - * Get the options used in this {@link Pipeline}. - */ - public DirectPipelineOptions getPipelineOptions() { - return options; - } - - @Override - public String toString() { - return "DirectPipelineRunner#" + hashCode(); - } - - public static <K, V> void evaluateGroupByKeyOnly( - GroupByKeyOnly<K, V> transform, - EvaluationContext context) { - PCollection<KV<K, V>> input = context.getInput(transform); - - List<ValueWithMetadata<KV<K, V>>> inputElems = - context.getPCollectionValuesWithMetadata(input); - - Coder<K> keyCoder = GroupByKey.getKeyCoder(input.getCoder()); - - Map<GroupingKey<K>, List<V>> groupingMap = new HashMap<>(); - - for (ValueWithMetadata<KV<K, V>> elem : inputElems) { - K key = elem.getValue().getKey(); - V value = elem.getValue().getValue(); - byte[] encodedKey; - try { - encodedKey = encodeToByteArray(keyCoder, key); - } catch (CoderException exn) { - // TODO: Put in better element printing: - // truncate if too long. - throw new IllegalArgumentException( - "unable to encode key " + key + " of input to " + transform - + " using " + keyCoder, - exn); - } - GroupingKey<K> groupingKey = - new GroupingKey<>(key, encodedKey); - List<V> values = groupingMap.get(groupingKey); - if (values == null) { - values = new ArrayList<V>(); - groupingMap.put(groupingKey, values); - } - values.add(value); - } - - List<ValueWithMetadata<KV<K, Iterable<V>>>> outputElems = - new ArrayList<>(); - for (Map.Entry<GroupingKey<K>, List<V>> entry : groupingMap.entrySet()) { - GroupingKey<K> groupingKey = entry.getKey(); - K key = groupingKey.getKey(); - List<V> values = entry.getValue(); - values = context.randomizeIfUnordered(values, true /* inPlaceAllowed */); - outputElems.add(ValueWithMetadata - .of(WindowedValue.valueInEmptyWindows(KV.<K, Iterable<V>>of(key, values))) - .withKey(key)); - } - - context.setPCollectionValuesWithMetadata(context.getOutput(transform), - outputElems); - } - - @SuppressWarnings({"rawtypes", "unchecked"}) - public - static <K, V> void registerGroupByKeyOnly() { - registerDefaultTransformEvaluator( - GroupByKeyOnly.class, - new TransformEvaluator<GroupByKeyOnly>() { - @Override - public void evaluate( - GroupByKeyOnly transform, - EvaluationContext context) { - evaluateGroupByKeyOnly(transform, context); - } - }); - } - - static { - registerGroupByKeyOnly(); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99654ca4/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java index 7c6fed3..93917f3 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java @@ -20,16 +20,12 @@ package org.apache.beam.sdk.transforms; import org.apache.beam.sdk.coders.CannotProvideCoderException; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.IterableLikeCoder; -import org.apache.beam.sdk.runners.DirectPipelineRunner; import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollection.IsBounded; import org.apache.beam.sdk.values.PCollectionList; -import java.util.ArrayList; -import java.util.List; - /** * {@code Flatten<T>} takes multiple {@code PCollection<T>}s bundled * into a {@code PCollectionList<T>} and returns a single @@ -189,32 +185,4 @@ public class Flatten { .setCoder(elemCoder); } } - - ///////////////////////////////////////////////////////////////////////////// - - static { - DirectPipelineRunner.registerDefaultTransformEvaluator( - FlattenPCollectionList.class, - new DirectPipelineRunner.TransformEvaluator<FlattenPCollectionList>() { - @Override - public void evaluate( - FlattenPCollectionList transform, - DirectPipelineRunner.EvaluationContext context) { - evaluateHelper(transform, context); - } - }); - } - - private static <T> void evaluateHelper( - FlattenPCollectionList<T> transform, - DirectPipelineRunner.EvaluationContext context) { - List<DirectPipelineRunner.ValueWithMetadata<T>> outputElems = new ArrayList<>(); - PCollectionList<T> inputs = context.getInput(transform); - - for (PCollection<T> input : inputs.getAll()) { - outputElems.addAll(context.getPCollectionValuesWithMetadata(input)); - } - - context.setPCollectionValuesWithMetadata(context.getOutput(transform), outputElems); - } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99654ca4/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java index 511f0d8..cb7d372 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java @@ -21,27 +21,12 @@ import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.coders.CannotProvideCoderException; import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.runners.DirectPipelineRunner; import org.apache.beam.sdk.runners.PipelineRunner; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.display.DisplayData.Builder; import org.apache.beam.sdk.transforms.windowing.WindowFn; -import org.apache.beam.sdk.util.DirectModeExecutionContext; -import org.apache.beam.sdk.util.DirectSideInputReader; -import org.apache.beam.sdk.util.DoFnRunner; -import org.apache.beam.sdk.util.DoFnRunnerBase; -import org.apache.beam.sdk.util.DoFnRunners; -import org.apache.beam.sdk.util.IllegalMutationException; -import org.apache.beam.sdk.util.MutationDetector; -import org.apache.beam.sdk.util.MutationDetectors; -import org.apache.beam.sdk.util.PTuple; import org.apache.beam.sdk.util.SerializableUtils; -import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.util.StringUtils; -import org.apache.beam.sdk.util.UserCodeException; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.PCollectionView; @@ -50,16 +35,10 @@ import org.apache.beam.sdk.values.TupleTagList; import org.apache.beam.sdk.values.TypedPValue; import com.google.common.collect.ImmutableList; -import com.google.common.collect.Maps; import java.io.Serializable; import java.util.Arrays; -import java.util.Collections; import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentMap; - -import javax.annotation.Nullable; /** * {@link ParDo} is the core element-wise transform in Google Cloud @@ -84,7 +63,7 @@ import javax.annotation.Nullable; * <p>Conceptually, when a {@link ParDo} transform is executed, the * elements of the input {@link PCollection} are first divided up * into some number of "bundles". These are farmed off to distributed - * worker machines (or run locally, if using the {@link DirectPipelineRunner}). + * worker machines (or run locally, if using the {@code DirectRunner}). * For each bundle of input elements processing proceeds as follows: * * <ol> @@ -1072,288 +1051,11 @@ public class ParDo { } } - ///////////////////////////////////////////////////////////////////////////// - - static { - DirectPipelineRunner.registerDefaultTransformEvaluator( - Bound.class, - new DirectPipelineRunner.TransformEvaluator<Bound>() { - @Override - public void evaluate( - Bound transform, - DirectPipelineRunner.EvaluationContext context) { - evaluateSingleHelper(transform, context); - } - }); - } - - private static <InputT, OutputT> void evaluateSingleHelper( - Bound<InputT, OutputT> transform, - DirectPipelineRunner.EvaluationContext context) { - TupleTag<OutputT> mainOutputTag = new TupleTag<>("out"); - - DirectModeExecutionContext executionContext = DirectModeExecutionContext.create(); - - PCollectionTuple outputs = PCollectionTuple.of(mainOutputTag, context.getOutput(transform)); - - evaluateHelper( - transform.fn, - context.getStepName(transform), - context.getInput(transform), - transform.sideInputs, - mainOutputTag, - Collections.<TupleTag<?>>emptyList(), - outputs, - context, - executionContext); - - context.setPCollectionValuesWithMetadata( - context.getOutput(transform), - executionContext.getOutput(mainOutputTag)); - } - - ///////////////////////////////////////////////////////////////////////////// - - static { - DirectPipelineRunner.registerDefaultTransformEvaluator( - BoundMulti.class, - new DirectPipelineRunner.TransformEvaluator<BoundMulti>() { - @Override - public void evaluate( - BoundMulti transform, - DirectPipelineRunner.EvaluationContext context) { - evaluateMultiHelper(transform, context); - } - }); - } - - private static <InputT, OutputT> void evaluateMultiHelper( - BoundMulti<InputT, OutputT> transform, - DirectPipelineRunner.EvaluationContext context) { - - DirectModeExecutionContext executionContext = DirectModeExecutionContext.create(); - - evaluateHelper( - transform.fn, - context.getStepName(transform), - context.getInput(transform), - transform.sideInputs, - transform.mainOutputTag, - transform.sideOutputTags.getAll(), - context.getOutput(transform), - context, - executionContext); - - for (Map.Entry<TupleTag<?>, PCollection<?>> entry - : context.getOutput(transform).getAll().entrySet()) { - @SuppressWarnings("unchecked") - TupleTag<Object> tag = (TupleTag<Object>) entry.getKey(); - @SuppressWarnings("unchecked") - PCollection<Object> pc = (PCollection<Object>) entry.getValue(); - - context.setPCollectionValuesWithMetadata( - pc, - (tag == transform.mainOutputTag - ? executionContext.getOutput(tag) - : executionContext.getSideOutput(tag))); - } - } - - /** - * Evaluates a single-output or multi-output {@link ParDo} directly. - * - * <p>This evaluation method is intended for use in testing scenarios; it is designed for clarity - * and correctness-checking, not speed. - * - * <p>Of particular note, this performs best-effort checking that inputs and outputs are not - * mutated in violation of the requirements upon a {@link DoFn}. - */ - private static <InputT, OutputT, ActualInputT extends InputT> void evaluateHelper( - DoFn<InputT, OutputT> doFn, - String stepName, - PCollection<ActualInputT> input, - List<PCollectionView<?>> sideInputs, - TupleTag<OutputT> mainOutputTag, - List<TupleTag<?>> sideOutputTags, - PCollectionTuple outputs, - DirectPipelineRunner.EvaluationContext context, - DirectModeExecutionContext executionContext) { - // TODO: Run multiple shards? - DoFn<InputT, OutputT> fn = context.ensureSerializable(doFn); - - SideInputReader sideInputReader = makeSideInputReader(context, sideInputs); - - // When evaluating via the DirectPipelineRunner, this output manager checks each output for - // illegal mutations when the next output comes along. We then verify again after finishBundle() - // The common case we expect this to catch is a user mutating an input in order to repeatedly - // emit "variations". - ImmutabilityCheckingOutputManager<ActualInputT> outputManager = - new ImmutabilityCheckingOutputManager<>( - fn.getClass().getSimpleName(), - new DoFnRunnerBase.ListOutputManager(), - outputs); - - DoFnRunner<InputT, OutputT> fnRunner = - DoFnRunners.createDefault( - context.getPipelineOptions(), - fn, - sideInputReader, - outputManager, - mainOutputTag, - sideOutputTags, - executionContext.getOrCreateStepContext(stepName, stepName), - context.getAddCounterMutator(), - input.getWindowingStrategy()); - - fnRunner.startBundle(); - - for (DirectPipelineRunner.ValueWithMetadata<ActualInputT> elem - : context.getPCollectionValuesWithMetadata(input)) { - if (elem.getValue() instanceof KV) { - // In case the DoFn needs keyed state, set the implicit keys to the keys - // in the input elements. - @SuppressWarnings("unchecked") - KV<?, ?> kvElem = (KV<?, ?>) elem.getValue(); - executionContext.setKey(kvElem.getKey()); - } else { - executionContext.setKey(elem.getKey()); - } - - // We check the input for mutations only through the call span of processElement. - // This will miss some cases, but the check is ad hoc and best effort. The common case - // is that the input is mutated to be used for output. - try { - MutationDetector inputMutationDetector = MutationDetectors.forValueWithCoder( - elem.getWindowedValue().getValue(), input.getCoder()); - @SuppressWarnings("unchecked") - WindowedValue<InputT> windowedElem = ((WindowedValue<InputT>) elem.getWindowedValue()); - fnRunner.processElement(windowedElem); - inputMutationDetector.verifyUnmodified(); - } catch (CoderException e) { - throw UserCodeException.wrap(e); - } catch (IllegalMutationException exn) { - throw new IllegalMutationException( - String.format("DoFn %s mutated input value %s of class %s (new value was %s)." - + " Input values must not be mutated in any way.", - fn.getClass().getSimpleName(), - exn.getSavedValue(), exn.getSavedValue().getClass(), exn.getNewValue()), - exn.getSavedValue(), - exn.getNewValue(), - exn); - } - } - - // Note that the input could have been retained and mutated prior to this final output, - // but for now it degrades readability too much to be worth trying to catch that particular - // corner case. - fnRunner.finishBundle(); - outputManager.verifyLatestOutputsUnmodified(); - } - - private static SideInputReader makeSideInputReader( - DirectPipelineRunner.EvaluationContext context, List<PCollectionView<?>> sideInputs) { - PTuple sideInputValues = PTuple.empty(); - for (PCollectionView<?> view : sideInputs) { - sideInputValues = sideInputValues.and( - view.getTagInternal(), - context.getPCollectionView(view)); - } - return DirectSideInputReader.of(sideInputValues); - } - private static void populateDisplayData( DisplayData.Builder builder, DoFn<?, ?> fn, Class<?> fnClass) { builder .include(fn) .add(DisplayData.item("fn", fnClass) - .withLabel("Transform Function")); - } - - /** - * A {@code DoFnRunner.OutputManager} that provides facilities for checking output values for - * illegal mutations. - * - * <p>When used via the try-with-resources pattern, it is guaranteed that every value passed - * to {@link #output} will have been checked for illegal mutation. - */ - private static class ImmutabilityCheckingOutputManager<InputT> - implements DoFnRunners.OutputManager, AutoCloseable { - - private final DoFnRunners.OutputManager underlyingOutputManager; - private final ConcurrentMap<TupleTag<?>, MutationDetector> mutationDetectorForTag; - private final PCollectionTuple outputs; - private String doFnName; - - public ImmutabilityCheckingOutputManager( - String doFnName, - DoFnRunners.OutputManager underlyingOutputManager, - PCollectionTuple outputs) { - this.doFnName = doFnName; - this.underlyingOutputManager = underlyingOutputManager; - this.outputs = outputs; - this.mutationDetectorForTag = Maps.newConcurrentMap(); - } - - @Override - public <T> void output(TupleTag<T> tag, WindowedValue<T> output) { - - // Skip verifying undeclared outputs, since we don't have coders for them. - if (outputs.has(tag)) { - try { - MutationDetector newDetector = - MutationDetectors.forValueWithCoder( - output.getValue(), outputs.get(tag).getCoder()); - MutationDetector priorDetector = mutationDetectorForTag.put(tag, newDetector); - verifyOutputUnmodified(priorDetector); - } catch (CoderException e) { - throw UserCodeException.wrap(e); - } - } - - // Actually perform the output. - underlyingOutputManager.output(tag, output); - } - - /** - * Throws {@link IllegalMutationException} if the prior output for any tag has been mutated - * since being output. - */ - public void verifyLatestOutputsUnmodified() { - for (MutationDetector detector : mutationDetectorForTag.values()) { - verifyOutputUnmodified(detector); - } - } - - /** - * Adapts the error message from the provided {@code detector}. - * - * <p>The {@code detector} may be null, in which case no check is performed. This is merely - * to consolidate null checking to this method. - */ - private <T> void verifyOutputUnmodified(@Nullable MutationDetector detector) { - if (detector == null) { - return; - } - - try { - detector.verifyUnmodified(); - } catch (IllegalMutationException exn) { - throw new IllegalMutationException(String.format( - "DoFn %s mutated value %s after it was output (new value was %s)." - + " Values must not be mutated in any way after being output.", - doFnName, exn.getSavedValue(), exn.getNewValue()), - exn.getSavedValue(), exn.getNewValue(), - exn); - } - } - - /** - * When used in a {@code try}-with-resources block, verifies all of the latest outputs upon - * {@link #close()}. - */ - @Override - public void close() { - verifyLatestOutputsUnmodified(); - } + .withLabel("Transform Function")); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99654ca4/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java index 3df915b..7a97c13 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java @@ -17,10 +17,8 @@ */ package org.apache.beam.sdk.transforms; -import org.apache.beam.sdk.runners.DirectPipelineRunner; import org.apache.beam.sdk.runners.PipelineRunner; import org.apache.beam.sdk.util.PCollectionViews; -import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; @@ -445,27 +443,5 @@ public class View { public PCollectionView<ViewT> apply(PCollection<ElemT> input) { return view; } - - static { - DirectPipelineRunner.registerDefaultTransformEvaluator( - CreatePCollectionView.class, - new DirectPipelineRunner.TransformEvaluator<CreatePCollectionView>() { - @SuppressWarnings("rawtypes") - @Override - public void evaluate( - CreatePCollectionView transform, - DirectPipelineRunner.EvaluationContext context) { - evaluateTyped(transform, context); - } - - private <ElemT, ViewT> void evaluateTyped( - CreatePCollectionView<ElemT, ViewT> transform, - DirectPipelineRunner.EvaluationContext context) { - List<WindowedValue<ElemT>> elems = - context.getPCollectionWindowedValues(context.getInput(transform)); - context.setPCollectionView(context.getOutput(transform), elems); - } - }); - } } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99654ca4/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DirectModeExecutionContext.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DirectModeExecutionContext.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DirectModeExecutionContext.java deleted file mode 100644 index 85e36dd..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DirectModeExecutionContext.java +++ /dev/null @@ -1,130 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util; - -import static com.google.common.base.Preconditions.checkNotNull; - -import org.apache.beam.sdk.runners.DirectPipelineRunner.ValueWithMetadata; -import org.apache.beam.sdk.util.state.InMemoryStateInternals; -import org.apache.beam.sdk.util.state.StateInternals; -import org.apache.beam.sdk.values.TupleTag; - -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; - -import java.util.List; -import java.util.Map; - -/** - * {@link ExecutionContext} for use in direct mode. - */ -public class DirectModeExecutionContext - extends BaseExecutionContext<DirectModeExecutionContext.StepContext> { - - private Object key; - private List<ValueWithMetadata<?>> output = Lists.newArrayList(); - private Map<TupleTag<?>, List<ValueWithMetadata<?>>> sideOutputs = Maps.newHashMap(); - - protected DirectModeExecutionContext() {} - - public static DirectModeExecutionContext create() { - return new DirectModeExecutionContext(); - } - - @Override - protected StepContext createStepContext(String stepName, String transformName) { - return new StepContext(this, stepName, transformName); - } - - public Object getKey() { - return key; - } - - public void setKey(Object newKey) { - // The direct mode runner may reorder elements, so we need to keep - // around the state used for each key. - for (ExecutionContext.StepContext stepContext : getAllStepContexts()) { - ((StepContext) stepContext).switchKey(newKey); - } - key = newKey; - } - - @Override - public void noteOutput(WindowedValue<?> outputElem) { - output.add(ValueWithMetadata.of(outputElem).withKey(getKey())); - } - - @Override - public void noteSideOutput(TupleTag<?> tag, WindowedValue<?> outputElem) { - List<ValueWithMetadata<?>> output = sideOutputs.get(tag); - if (output == null) { - output = Lists.newArrayList(); - sideOutputs.put(tag, output); - } - output.add(ValueWithMetadata.of(outputElem).withKey(getKey())); - } - - public <T> List<ValueWithMetadata<T>> getOutput(@SuppressWarnings("unused") TupleTag<T> tag) { - @SuppressWarnings({"unchecked", "rawtypes"}) // Cast not expressible without rawtypes - List<ValueWithMetadata<T>> typedOutput = (List) output; - return typedOutput; - } - - public <T> List<ValueWithMetadata<T>> getSideOutput(TupleTag<T> tag) { - if (sideOutputs.containsKey(tag)) { - @SuppressWarnings({"unchecked", "rawtypes"}) // Cast not expressible without rawtypes - List<ValueWithMetadata<T>> typedOutput = (List) sideOutputs.get(tag); - return typedOutput; - } else { - return Lists.newArrayList(); - } - } - - /** - * {@link ExecutionContext.StepContext} used in direct mode. - */ - public static class StepContext extends BaseExecutionContext.StepContext { - - /** A map from each key to the state associated with it. */ - private final Map<Object, InMemoryStateInternals<Object>> stateInternals = Maps.newHashMap(); - private InMemoryStateInternals<Object> currentStateInternals = null; - - private StepContext(ExecutionContext executionContext, String stepName, String transformName) { - super(executionContext, stepName, transformName); - switchKey(null); - } - - public void switchKey(Object newKey) { - currentStateInternals = stateInternals.get(newKey); - if (currentStateInternals == null) { - currentStateInternals = InMemoryStateInternals.forKey(newKey); - stateInternals.put(newKey, currentStateInternals); - } - } - - @Override - public StateInternals<Object> stateInternals() { - return checkNotNull(currentStateInternals); - } - - @Override - public TimerInternals timerInternals() { - throw new UnsupportedOperationException("Direct mode cannot return timerInternals"); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99654ca4/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java index 75861fe..58b10a7 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java @@ -20,7 +20,6 @@ package org.apache.beam.sdk.util; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.runners.DirectPipelineRunner; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Combine.CombineFn; import org.apache.beam.sdk.transforms.DoFn;