Reduce visibility of DirectRunner classes Move inner classes of the DirectRunner to reduce total API Surface.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9edd8599 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9edd8599 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9edd8599 Branch: refs/heads/DSL_SQL Commit: 9edd8599c28228cd5d7d5df1084f7e63684964d8 Parents: b263cb7 Author: Thomas Groh <tg...@google.com> Authored: Wed Apr 26 13:38:37 2017 -0700 Committer: Thomas Groh <tg...@google.com> Committed: Wed May 3 14:12:05 2017 -0700 ---------------------------------------------------------------------- .../direct/AbstractModelEnforcement.java | 1 - .../direct/BoundedReadEvaluatorFactory.java | 2 - .../beam/runners/direct/BundleFactory.java | 3 +- .../org/apache/beam/runners/direct/Clock.java | 2 +- .../runners/direct/CloningBundleFactory.java | 2 - .../beam/runners/direct/CommittedBundle.java | 82 +++++++++++++ .../beam/runners/direct/CommittedResult.java | 1 - .../beam/runners/direct/CompletionCallback.java | 1 - .../CopyOnAccessInMemoryStateInternals.java | 2 +- .../apache/beam/runners/direct/DirectGraph.java | 12 +- .../beam/runners/direct/DirectMetrics.java | 1 - .../beam/runners/direct/DirectRunner.java | 122 ++----------------- .../beam/runners/direct/EmptyInputProvider.java | 1 - .../runners/direct/EmptyTransformEvaluator.java | 50 -------- .../beam/runners/direct/EvaluationContext.java | 3 - .../runners/direct/ExecutorServiceFactory.java | 2 +- .../direct/ExecutorServiceParallelExecutor.java | 1 - .../runners/direct/FlattenEvaluatorFactory.java | 2 - .../GroupAlsoByWindowEvaluatorFactory.java | 2 - .../direct/GroupByKeyOnlyEvaluatorFactory.java | 2 - .../ImmutabilityCheckingBundleFactory.java | 2 - .../direct/ImmutabilityEnforcementFactory.java | 1 - .../direct/ImmutableListBundleFactory.java | 2 - .../beam/runners/direct/ModelEnforcement.java | 10 +- .../runners/direct/ModelEnforcementFactory.java | 3 +- .../beam/runners/direct/NanosOffsetClock.java | 2 +- .../runners/direct/PCollectionViewWriter.java | 34 ++++++ .../beam/runners/direct/ParDoEvaluator.java | 1 - .../runners/direct/ParDoEvaluatorFactory.java | 1 - .../direct/PassthroughTransformEvaluator.java | 1 - .../beam/runners/direct/PipelineExecutor.java | 1 - .../beam/runners/direct/RootInputProvider.java | 1 - .../runners/direct/RootProviderRegistry.java | 1 - ...littableProcessElementsEvaluatorFactory.java | 1 - .../direct/StatefulParDoEvaluatorFactory.java | 1 - .../runners/direct/StepTransformResult.java | 3 +- .../direct/TestStreamEvaluatorFactory.java | 2 - .../beam/runners/direct/TransformEvaluator.java | 3 +- .../direct/TransformEvaluatorFactory.java | 6 +- .../direct/TransformEvaluatorRegistry.java | 1 - .../beam/runners/direct/TransformExecutor.java | 1 - .../beam/runners/direct/TransformResult.java | 3 +- .../direct/UnboundedReadEvaluatorFactory.java | 2 - .../beam/runners/direct/UncommittedBundle.java | 57 +++++++++ .../runners/direct/ViewEvaluatorFactory.java | 3 +- .../beam/runners/direct/WatermarkManager.java | 3 +- .../runners/direct/WindowEvaluatorFactory.java | 2 - .../direct/BoundedReadEvaluatorFactoryTest.java | 2 - .../direct/CloningBundleFactoryTest.java | 2 - .../runners/direct/CommittedResultTest.java | 12 +- .../beam/runners/direct/DirectMetricsTest.java | 1 - .../runners/direct/EvaluationContextTest.java | 3 - .../direct/FlattenEvaluatorFactoryTest.java | 2 - .../direct/GroupByKeyEvaluatorFactoryTest.java | 2 - .../GroupByKeyOnlyEvaluatorFactoryTest.java | 2 - .../ImmutabilityCheckingBundleFactoryTest.java | 2 - .../ImmutabilityEnforcementFactoryTest.java | 1 - .../direct/ImmutableListBundleFactoryTest.java | 2 - .../beam/runners/direct/ParDoEvaluatorTest.java | 1 - .../StatefulParDoEvaluatorFactoryTest.java | 2 - .../runners/direct/StepTransformResultTest.java | 1 - .../direct/TestStreamEvaluatorFactoryTest.java | 1 - .../runners/direct/TransformExecutorTest.java | 1 - .../UnboundedReadEvaluatorFactoryTest.java | 2 - .../direct/ViewEvaluatorFactoryTest.java | 2 - .../runners/direct/WatermarkManagerTest.java | 2 - .../direct/WindowEvaluatorFactoryTest.java | 2 - 67 files changed, 214 insertions(+), 273 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AbstractModelEnforcement.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AbstractModelEnforcement.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AbstractModelEnforcement.java index f09164b..40faf5a 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AbstractModelEnforcement.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AbstractModelEnforcement.java @@ -17,7 +17,6 @@ */ package org.apache.beam.runners.direct; -import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; import org.apache.beam.sdk.util.WindowedValue; /** http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java index 0c2afe8..26f9851 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java @@ -31,8 +31,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import javax.annotation.Nullable; -import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; -import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; import org.apache.beam.runners.direct.StepTransformResult.Builder; import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.io.BoundedSource.BoundedReader; http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BundleFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BundleFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BundleFactory.java index b1cb9b1..e39b5d8 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BundleFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BundleFactory.java @@ -18,14 +18,13 @@ package org.apache.beam.runners.direct; import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupByKeyOnly; -import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.PCollection; /** * A factory that creates {@link UncommittedBundle UncommittedBundles}. */ -public interface BundleFactory { +interface BundleFactory { /** * Create an {@link UncommittedBundle} from an empty input. Elements added to the bundle do not * belong to a {@link PCollection}. http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/Clock.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/Clock.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/Clock.java index 88f8aab..1a93c62 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/Clock.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/Clock.java @@ -22,7 +22,7 @@ import org.joda.time.Instant; /** * Access to the current time. */ -public interface Clock { +interface Clock { /** * Returns the current time as an {@link Instant}. */ http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CloningBundleFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CloningBundleFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CloningBundleFactory.java index 33241e3..68b059f 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CloningBundleFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CloningBundleFactory.java @@ -18,8 +18,6 @@ package org.apache.beam.runners.direct; -import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; -import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.util.CoderUtils; http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedBundle.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedBundle.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedBundle.java new file mode 100644 index 0000000..79a96fe --- /dev/null +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedBundle.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.direct; + +import javax.annotation.Nullable; +import org.apache.beam.runners.core.TimerInternals.TimerData; +import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.PCollection; +import org.joda.time.Instant; + +/** + * Part of a {@link PCollection}. Elements are output to an {@link UncommittedBundle}, which will + * eventually committed. Committed elements are executed by the {@link PTransform PTransforms} + * that consume the {@link PCollection} this bundle is + * a part of at a later point. + * @param <T> the type of elements contained within this bundle + */ +interface CommittedBundle<T> { + /** + * Returns the PCollection that the elements of this bundle belong to. + */ + @Nullable + PCollection<T> getPCollection(); + + /** + * Returns the key that was output in the most recent {@link GroupByKey} in the + * execution of this bundle. + */ + StructuralKey<?> getKey(); + + /** + * Returns an {@link Iterable} containing all of the elements that have been added to this + * {@link CommittedBundle}. + */ + Iterable<WindowedValue<T>> getElements(); + + /** + * Returns the minimum timestamp among all of the elements of this {@link CommittedBundle}. + */ + Instant getMinTimestamp(); + + /** + * Returns the processing time output watermark at the time the producing {@link PTransform} + * committed this bundle. Downstream synchronized processing time watermarks cannot progress + * past this point before consuming this bundle. + * + * <p>This value is no greater than the earliest incomplete processing time or synchronized + * processing time {@link TimerData timer} at the time this bundle was committed, including any + * timers that fired to produce this bundle. + */ + Instant getSynchronizedProcessingOutputWatermark(); + + /** + * Return a new {@link CommittedBundle} that is like this one, except calls to + * {@link #getElements()} will return the provided elements. This bundle is unchanged. + * + * <p>The value of the {@link #getSynchronizedProcessingOutputWatermark() synchronized + * processing output watermark} of the returned {@link CommittedBundle} is equal to the value + * returned from the current bundle. This is used to ensure a {@link PTransform} that could not + * complete processing on input elements properly holds the synchronized processing time to the + * appropriate value. + */ + CommittedBundle<T> withElements(Iterable<WindowedValue<T>> elements); +} http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedResult.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedResult.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedResult.java index 4db7e18..99abdd3 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedResult.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedResult.java @@ -21,7 +21,6 @@ package org.apache.beam.runners.direct; import com.google.auto.value.AutoValue; import java.util.Set; import javax.annotation.Nullable; -import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.View.CreatePCollectionView; http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CompletionCallback.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CompletionCallback.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CompletionCallback.java index 766259d..7b5ef4b 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CompletionCallback.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CompletionCallback.java @@ -17,7 +17,6 @@ */ package org.apache.beam.runners.direct; -import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; import org.apache.beam.sdk.transforms.AppliedPTransform; /** http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java index ef3a053..d2af93c 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java @@ -60,7 +60,7 @@ import org.joda.time.Instant; * of {@link InMemoryState}. Whenever state that exists in the underlying {@link StateTable} is * accessed, an independent copy will be created within this table. */ -public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals { +class CopyOnAccessInMemoryStateInternals<K> implements StateInternals { private final CopyOnAccessInMemoryStateTable table; private K key; http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraph.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraph.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraph.java index f208f6e..e163d83 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraph.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraph.java @@ -63,27 +63,27 @@ class DirectGraph { this.stepNames = stepNames; } - public AppliedPTransform<?, ?, ?> getProducer(PValue produced) { + AppliedPTransform<?, ?, ?> getProducer(PValue produced) { return producers.get(produced); } - public List<AppliedPTransform<?, ?, ?>> getPrimitiveConsumers(PValue consumed) { + List<AppliedPTransform<?, ?, ?>> getPrimitiveConsumers(PValue consumed) { return primitiveConsumers.get(consumed); } - public Set<AppliedPTransform<?, ?, ?>> getRootTransforms() { + Set<AppliedPTransform<?, ?, ?>> getRootTransforms() { return rootTransforms; } - public Set<PCollectionView<?>> getViews() { + Set<PCollectionView<?>> getViews() { return views; } - public String getStepName(AppliedPTransform<?, ?, ?> step) { + String getStepName(AppliedPTransform<?, ?, ?> step) { return stepNames.get(step); } - public Collection<AppliedPTransform<?, ?, ?>> getPrimitiveTransforms() { + Collection<AppliedPTransform<?, ?, ?>> getPrimitiveTransforms() { return stepNames.keySet(); } } http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java index fb126fb..b6ca492 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java @@ -30,7 +30,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicReference; import javax.annotation.concurrent.GuardedBy; -import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; import org.apache.beam.sdk.metrics.DistributionData; import org.apache.beam.sdk.metrics.DistributionResult; import org.apache.beam.sdk.metrics.GaugeData; http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java index e063bc3..c6168b3e 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java @@ -27,9 +27,7 @@ import java.util.EnumSet; import java.util.List; import java.util.Map; import java.util.Set; -import javax.annotation.Nullable; import org.apache.beam.runners.core.SplittableParDo.GBKIntoKeyedWorkItems; -import org.apache.beam.runners.core.TimerInternals.TimerData; import org.apache.beam.runners.core.construction.PTransformMatchers; import org.apache.beam.runners.direct.DirectRunner.DirectPipelineResult; import org.apache.beam.runners.direct.TestStreamEvaluatorFactory.DirectTestStreamFactory; @@ -49,119 +47,14 @@ import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.ParDo.MultiOutput; import org.apache.beam.sdk.transforms.View.CreatePCollectionView; import org.apache.beam.sdk.util.UserCodeException; -import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionView; import org.joda.time.Duration; -import org.joda.time.Instant; /** * An In-Memory implementation of the Dataflow Programming Model. Supports Unbounded * {@link PCollection PCollections}. */ public class DirectRunner extends PipelineRunner<DirectPipelineResult> { - /** - * Part of a {@link PCollection}. Elements are output to a bundle, which will cause them to be - * executed by {@link PTransform PTransforms} that consume the {@link PCollection} this bundle is - * a part of at a later point. This is an uncommitted bundle and can have elements added to it. - * - * @param <T> the type of elements that can be added to this bundle - */ - interface UncommittedBundle<T> { - /** - * Returns the PCollection that the elements of this {@link UncommittedBundle} belong to. - */ - @Nullable - PCollection<T> getPCollection(); - - /** - * Outputs an element to this bundle. - * - * @param element the element to add to this bundle - * @return this bundle - */ - UncommittedBundle<T> add(WindowedValue<T> element); - - /** - * Commits this {@link UncommittedBundle}, returning an immutable {@link CommittedBundle} - * containing all of the elements that were added to it. The {@link #add(WindowedValue)} method - * will throw an {@link IllegalStateException} if called after a call to commit. - * @param synchronizedProcessingTime the synchronized processing time at which this bundle was - * committed - */ - CommittedBundle<T> commit(Instant synchronizedProcessingTime); - } - - /** - * Part of a {@link PCollection}. Elements are output to an {@link UncommittedBundle}, which will - * eventually committed. Committed elements are executed by the {@link PTransform PTransforms} - * that consume the {@link PCollection} this bundle is - * a part of at a later point. - * @param <T> the type of elements contained within this bundle - */ - interface CommittedBundle<T> { - /** - * Returns the PCollection that the elements of this bundle belong to. - */ - @Nullable - PCollection<T> getPCollection(); - - /** - * Returns the key that was output in the most recent {@link GroupByKey} in the - * execution of this bundle. - */ - StructuralKey<?> getKey(); - - /** - * Returns an {@link Iterable} containing all of the elements that have been added to this - * {@link CommittedBundle}. - */ - Iterable<WindowedValue<T>> getElements(); - - /** - * Returns the minimum timestamp among all of the elements of this {@link CommittedBundle}. - */ - Instant getMinTimestamp(); - - /** - * Returns the processing time output watermark at the time the producing {@link PTransform} - * committed this bundle. Downstream synchronized processing time watermarks cannot progress - * past this point before consuming this bundle. - * - * <p>This value is no greater than the earliest incomplete processing time or synchronized - * processing time {@link TimerData timer} at the time this bundle was committed, including any - * timers that fired to produce this bundle. - */ - Instant getSynchronizedProcessingOutputWatermark(); - - /** - * Return a new {@link CommittedBundle} that is like this one, except calls to - * {@link #getElements()} will return the provided elements. This bundle is unchanged. - * - * <p>The value of the {@link #getSynchronizedProcessingOutputWatermark() synchronized - * processing output watermark} of the returned {@link CommittedBundle} is equal to the value - * returned from the current bundle. This is used to ensure a {@link PTransform} that could not - * complete processing on input elements properly holds the synchronized processing time to the - * appropriate value. - */ - CommittedBundle<T> withElements(Iterable<WindowedValue<T>> elements); - } - - /** - * A {@link PCollectionViewWriter} is responsible for writing contents of a {@link PCollection} to - * a storage mechanism that can be read from while constructing a {@link PCollectionView}. - * - * @param <ElemT> the type of elements the input {@link PCollection} contains. - * @param <ViewT> the type of the PCollectionView this writer writes to. - */ - interface PCollectionViewWriter<ElemT, ViewT> { - void add(Iterable<WindowedValue<ElemT>> values); - } - - /** The set of {@link PTransform PTransforms} that execute a UDF. Useful for some enforcements. */ - private static final Set<Class<? extends PTransform>> CONTAINS_UDF = - ImmutableSet.of( - Read.Bounded.class, Read.Unbounded.class, ParDo.SingleOutput.class, MultiOutput.class); enum Enforcement { ENCODABILITY { @@ -177,11 +70,18 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> { } }; + /** + * The set of {@link PTransform PTransforms} that execute a UDF. Useful for some enforcements. + */ + private static final Set<Class<? extends PTransform>> CONTAINS_UDF = + ImmutableSet.of( + Read.Bounded.class, Read.Unbounded.class, ParDo.SingleOutput.class, MultiOutput.class); + public abstract boolean appliesTo(PCollection<?> collection, DirectGraph graph); //////////////////////////////////////////////////////////////////////////////////////////////// // Utilities for creating enforcements - public static Set<Enforcement> enabled(DirectOptions options) { + static Set<Enforcement> enabled(DirectOptions options) { EnumSet<Enforcement> enabled = EnumSet.noneOf(Enforcement.class); if (options.isEnforceEncodability()) { enabled.add(ENCODABILITY); @@ -192,7 +92,8 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> { return Collections.unmodifiableSet(enabled); } - public static BundleFactory bundleFactoryFor(Set<Enforcement> enforcements, DirectGraph graph) { + static BundleFactory bundleFactoryFor( + Set<Enforcement> enforcements, DirectGraph graph) { BundleFactory bundleFactory = enforcements.contains(Enforcement.ENCODABILITY) ? CloningBundleFactory.create() @@ -430,7 +331,4 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> { return NanosOffsetClock.create(); } } - - private static class ComplexParDoMatcher { - } } http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyInputProvider.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyInputProvider.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyInputProvider.java index 98d4a64..396cdee 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyInputProvider.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyInputProvider.java @@ -19,7 +19,6 @@ package org.apache.beam.runners.direct; import java.util.Collection; import java.util.Collections; -import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.values.PCollection; http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyTransformEvaluator.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyTransformEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyTransformEvaluator.java deleted file mode 100644 index 85e5e70..0000000 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyTransformEvaluator.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.direct; - -import org.apache.beam.sdk.transforms.AppliedPTransform; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.WindowedValue; - -/** - * A {@link TransformEvaluator} that ignores all input and produces no output. The result of - * invoking {@link #finishBundle()} on this evaluator is to return an - * {@link TransformResult} with no elements and a timestamp hold equal to - * {@link BoundedWindow#TIMESTAMP_MIN_VALUE}. Because the result contains no elements, this hold - * will not affect the watermark. - */ -final class EmptyTransformEvaluator<T> implements TransformEvaluator<T> { - public static <T> TransformEvaluator<T> create(AppliedPTransform<?, ?, ?> transform) { - return new EmptyTransformEvaluator<T>(transform); - } - - private final AppliedPTransform<?, ?, ?> transform; - - private EmptyTransformEvaluator(AppliedPTransform<?, ?, ?> transform) { - this.transform = transform; - } - - @Override - public void processElement(WindowedValue<T> element) throws Exception {} - - @Override - public TransformResult<T> finishBundle() throws Exception { - return StepTransformResult.<T>withHold(transform, BoundedWindow.TIMESTAMP_MIN_VALUE) - .build(); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java index 93d6f96..3cdf351 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java @@ -35,9 +35,6 @@ import org.apache.beam.runners.core.ExecutionContext; import org.apache.beam.runners.core.TimerInternals.TimerData; import org.apache.beam.runners.direct.CommittedResult.OutputType; import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupByKeyOnly; -import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; -import org.apache.beam.runners.direct.DirectRunner.PCollectionViewWriter; -import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; import org.apache.beam.runners.direct.WatermarkManager.FiredTimers; import org.apache.beam.runners.direct.WatermarkManager.TransformWatermarks; import org.apache.beam.sdk.Pipeline; http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceFactory.java index 91dc258..f9e9fa9 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceFactory.java @@ -25,7 +25,7 @@ import java.util.concurrent.ExecutorService; * another (e.g., if any executor is shut down the remaining executors should continue to process * work). */ -public interface ExecutorServiceFactory { +interface ExecutorServiceFactory { /** * Create a new {@link ExecutorService}. */ http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java index c802c58..4da62d5 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java @@ -47,7 +47,6 @@ import javax.annotation.Nullable; import org.apache.beam.runners.core.KeyedWorkItem; import org.apache.beam.runners.core.KeyedWorkItems; import org.apache.beam.runners.core.TimerInternals.TimerData; -import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; import org.apache.beam.runners.direct.WatermarkManager.FiredTimers; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult.State; http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java index 7c6d2a1..341ea4b 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java @@ -18,8 +18,6 @@ package org.apache.beam.runners.direct; import com.google.common.collect.Iterables; -import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; -import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.Flatten.PCollections; http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java index d006553..d00e408 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java @@ -39,8 +39,6 @@ import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine; import org.apache.beam.runners.core.triggers.TriggerStateMachines; import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext; import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupAlsoByWindow; -import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; -import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.metrics.Counter; import org.apache.beam.sdk.metrics.Metrics; http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java index ac0b14f..1ea8e76 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java @@ -31,8 +31,6 @@ import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly; import org.apache.beam.runners.core.KeyedWorkItem; import org.apache.beam.runners.core.KeyedWorkItems; import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupByKeyOnly; -import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; -import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; import org.apache.beam.runners.direct.StepTransformResult.Builder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java index 8d77e25..9aabddc 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java @@ -21,9 +21,7 @@ import static com.google.common.base.Preconditions.checkNotNull; import com.google.common.collect.HashMultimap; import com.google.common.collect.SetMultimap; -import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; import org.apache.beam.runners.direct.DirectRunner.Enforcement; -import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.transforms.DoFn; http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactory.java index 85fc374..8880af9 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactory.java @@ -19,7 +19,6 @@ package org.apache.beam.runners.direct; import java.util.IdentityHashMap; import java.util.Map; -import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.transforms.AppliedPTransform; http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java index 36264ee..73734d0 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java @@ -23,8 +23,6 @@ import static com.google.common.base.Preconditions.checkState; import com.google.auto.value.AutoValue; import com.google.common.collect.ImmutableList; import javax.annotation.Nullable; -import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; -import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PCollection; http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ModelEnforcement.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ModelEnforcement.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ModelEnforcement.java index 96dbc2b..d2e9424 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ModelEnforcement.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ModelEnforcement.java @@ -27,7 +27,7 @@ import org.apache.beam.sdk.values.PCollection; * * <p>ModelEnforcement is performed on a per-element and per-bundle basis. The * {@link ModelEnforcement} is provided with the input bundle as part of - * {@link ModelEnforcementFactory#forBundle(DirectRunner.CommittedBundle, AppliedPTransform)} each + * {@link ModelEnforcementFactory#forBundle(CommittedBundle, AppliedPTransform)} each * element before and after that element is provided to an underlying {@link TransformEvaluator}, * and the output {@link TransformResult} and committed output bundles after the * {@link TransformEvaluator} has completed. @@ -37,7 +37,7 @@ import org.apache.beam.sdk.values.PCollection; * (such as the immutability of input elements). When the element is output or the bundle is * completed, the required conditions can be enforced across all elements. */ -public interface ModelEnforcement<T> { +interface ModelEnforcement<T> { /** * Called before a call to {@link TransformEvaluator#processElement(WindowedValue)} on the * provided {@link WindowedValue}. @@ -53,10 +53,10 @@ public interface ModelEnforcement<T> { /** * Called after a bundle has been completed and {@link TransformEvaluator#finishBundle()} has been * called, producing the provided {@link TransformResult} and - * {@link DirectRunner.CommittedBundle output bundles}. + * {@link CommittedBundle output bundles}. */ void afterFinish( - DirectRunner.CommittedBundle<T> input, + CommittedBundle<T> input, TransformResult<T> result, - Iterable<? extends DirectRunner.CommittedBundle<?>> outputs); + Iterable<? extends CommittedBundle<?>> outputs); } http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ModelEnforcementFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ModelEnforcementFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ModelEnforcementFactory.java index e0bbfcb..30f1d20 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ModelEnforcementFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ModelEnforcementFactory.java @@ -17,7 +17,6 @@ */ package org.apache.beam.runners.direct; -import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; import org.apache.beam.sdk.transforms.AppliedPTransform; /** @@ -25,6 +24,6 @@ import org.apache.beam.sdk.transforms.AppliedPTransform; * {@link CommittedBundle bundle}. {@link ModelEnforcement} instances are created before the * {@link TransformEvaluator} is created. */ -public interface ModelEnforcementFactory { +interface ModelEnforcementFactory { <T> ModelEnforcement<T> forBundle(CommittedBundle<T> input, AppliedPTransform<?, ?, ?> consumer); } http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/NanosOffsetClock.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/NanosOffsetClock.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/NanosOffsetClock.java index 5a2b18d..5e86f4b 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/NanosOffsetClock.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/NanosOffsetClock.java @@ -23,7 +23,7 @@ import org.joda.time.Instant; /** * A {@link Clock} that uses {@link System#nanoTime()} to track the progress of time. */ -public class NanosOffsetClock implements Clock { +class NanosOffsetClock implements Clock { private final long baseMillis; private final long nanosAtBaseMillis; http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PCollectionViewWriter.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PCollectionViewWriter.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PCollectionViewWriter.java new file mode 100644 index 0000000..10c6b74 --- /dev/null +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PCollectionViewWriter.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.direct; + +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionView; + +/** + * A {@link PCollectionViewWriter} is responsible for writing contents of a {@link PCollection} to + * a storage mechanism that can be read from while constructing a {@link PCollectionView}. + * + * @param <ElemT> the type of elements the input {@link PCollection} contains. + * @param <ViewT> the type of the PCollectionView this writer writes to. + */ +interface PCollectionViewWriter<ElemT, ViewT> { + void add(Iterable<WindowedValue<ElemT>> values); +} http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java index 2ea8a91..a3a345f 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java @@ -29,7 +29,6 @@ import org.apache.beam.runners.core.PushbackSideInputDoFnRunner; import org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner; import org.apache.beam.runners.core.TimerInternals.TimerData; import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext; -import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.DoFn; http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java index b00c2b6..39595d8 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java @@ -25,7 +25,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext; -import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PassthroughTransformEvaluator.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PassthroughTransformEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PassthroughTransformEvaluator.java index 153af65..c57932c 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PassthroughTransformEvaluator.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PassthroughTransformEvaluator.java @@ -17,7 +17,6 @@ */ package org.apache.beam.runners.direct; -import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.util.WindowedValue; http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PipelineExecutor.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PipelineExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PipelineExecutor.java index 82f59a7..07212c7 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PipelineExecutor.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PipelineExecutor.java @@ -18,7 +18,6 @@ package org.apache.beam.runners.direct; import java.util.Collection; -import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult.State; import org.apache.beam.sdk.transforms.AppliedPTransform; http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootInputProvider.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootInputProvider.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootInputProvider.java index c3df103..88e0769 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootInputProvider.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootInputProvider.java @@ -19,7 +19,6 @@ package org.apache.beam.runners.direct; import java.util.Collection; -import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.PTransform; http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootProviderRegistry.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootProviderRegistry.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootProviderRegistry.java index eb9492c..b06a41c 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootProviderRegistry.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootProviderRegistry.java @@ -22,7 +22,6 @@ import static com.google.common.base.Preconditions.checkNotNull; import com.google.common.collect.ImmutableMap; import java.util.Collection; import java.util.Map; -import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.Flatten.PCollections; http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java index 5f6b4f7..fb3a962 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java @@ -33,7 +33,6 @@ import org.apache.beam.runners.core.StateInternals; import org.apache.beam.runners.core.StateInternalsFactory; import org.apache.beam.runners.core.TimerInternals; import org.apache.beam.runners.core.TimerInternalsFactory; -import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.DoFn; http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java index 7cf3840..f278e08 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java @@ -37,7 +37,6 @@ import org.apache.beam.runners.core.StateTag; import org.apache.beam.runners.core.StateTags; import org.apache.beam.runners.core.TimerInternals.TimerData; import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext; -import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; import org.apache.beam.runners.direct.ParDoMultiOverrideFactory.StatefulParDo; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.transforms.AppliedPTransform; http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java index 2a2ccab..7e5f824 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java @@ -24,7 +24,6 @@ import java.util.Collection; import java.util.EnumSet; import java.util.Set; import org.apache.beam.runners.direct.CommittedResult.OutputType; -import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate; import org.apache.beam.sdk.metrics.MetricUpdates; import org.apache.beam.sdk.transforms.AppliedPTransform; @@ -36,7 +35,7 @@ import org.joda.time.Instant; * An immutable {@link TransformResult}. */ @AutoValue -public abstract class StepTransformResult<InputT> implements TransformResult<InputT> { +abstract class StepTransformResult<InputT> implements TransformResult<InputT> { public static <InputT> Builder<InputT> withHold( AppliedPTransform<?, ?, ?> transform, Instant watermarkHold) { http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java index cba754e..b5486c0 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java @@ -29,8 +29,6 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicReference; import javax.annotation.Nullable; import org.apache.beam.runners.core.construction.ReplacementOutputs; -import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; -import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; import org.apache.beam.sdk.runners.PTransformOverrideFactory; import org.apache.beam.sdk.testing.TestStream; import org.apache.beam.sdk.testing.TestStream.ElementEvent; http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluator.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluator.java index 79c942b..1a7209d 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluator.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluator.java @@ -17,7 +17,6 @@ */ package org.apache.beam.runners.direct; -import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; import org.apache.beam.sdk.util.WindowedValue; /** @@ -26,7 +25,7 @@ import org.apache.beam.sdk.util.WindowedValue; * * @param <InputT> the type of elements that will be passed to {@link #processElement} */ -public interface TransformEvaluator<InputT> { +interface TransformEvaluator<InputT> { /** * Process an element in the input {@link CommittedBundle}. * http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java index c7bc46f..c187359 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java @@ -30,7 +30,7 @@ import org.apache.beam.sdk.transforms.PTransform; * <p>{@link TransformEvaluatorFactory TransformEvaluatorFactories} will be reused within a single * execution of a {@link Pipeline} but will not be reused across executions. */ -public interface TransformEvaluatorFactory { +interface TransformEvaluatorFactory { /** * Create a new {@link TransformEvaluator} for the application of the {@link PTransform}. * @@ -47,13 +47,13 @@ public interface TransformEvaluatorFactory { */ @Nullable <InputT> TransformEvaluator<InputT> forApplication( - AppliedPTransform<?, ?, ?> application, DirectRunner.CommittedBundle<?> inputBundle) + AppliedPTransform<?, ?, ?> application, CommittedBundle<?> inputBundle) throws Exception; /** * Cleans up any state maintained by this {@link TransformEvaluatorFactory}. Called after a * {@link Pipeline} is shut down. No more calls to - * {@link #forApplication(AppliedPTransform, DirectRunner.CommittedBundle)} will be made after + * {@link #forApplication(AppliedPTransform, CommittedBundle)} will be made after * a call to {@link #cleanup()}. */ void cleanup() throws Exception; http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java index d06c460..a00253a 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java @@ -28,7 +28,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.beam.runners.core.SplittableParDo; import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupAlsoByWindow; import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupByKeyOnly; -import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; import org.apache.beam.runners.direct.ParDoMultiOverrideFactory.StatefulParDo; import org.apache.beam.runners.direct.ViewOverrideFactory.WriteView; import org.apache.beam.sdk.io.Read; http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java index bbc0aae..26c4f5c 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java @@ -21,7 +21,6 @@ import java.io.Closeable; import java.util.ArrayList; import java.util.Collection; import java.util.concurrent.Callable; -import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; import org.apache.beam.sdk.metrics.MetricUpdates; import org.apache.beam.sdk.metrics.MetricsContainer; import org.apache.beam.sdk.metrics.MetricsEnvironment; http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java index 3a95df7..0b0790e 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java @@ -20,7 +20,6 @@ package org.apache.beam.runners.direct; import java.util.Set; import javax.annotation.Nullable; import org.apache.beam.runners.direct.CommittedResult.OutputType; -import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate; import org.apache.beam.sdk.metrics.MetricUpdates; import org.apache.beam.sdk.transforms.AppliedPTransform; @@ -36,7 +35,7 @@ import org.joda.time.Instant; * <p>Every transform evaluator has a defined input type, but {@link ParDo} has multiple outputs * so there is not necesssarily a defined output type. */ -public interface TransformResult<InputT> { +interface TransformResult<InputT> { /** * Returns the {@link AppliedPTransform} that produced this result. * http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java index d3609f8..922a681 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java @@ -29,8 +29,6 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.ThreadLocalRandom; import javax.annotation.Nullable; -import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; -import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; import org.apache.beam.runners.direct.UnboundedReadDeduplicator.NeverDeduplicator; import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.io.Read.Unbounded; http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UncommittedBundle.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UncommittedBundle.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UncommittedBundle.java new file mode 100644 index 0000000..07fa138 --- /dev/null +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UncommittedBundle.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.direct; + +import javax.annotation.Nullable; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.PCollection; +import org.joda.time.Instant; + +/** + * Part of a {@link PCollection}. Elements are output to a bundle, which will cause them to be + * executed by {@link PTransform PTransforms} that consume the {@link PCollection} this bundle is + * a part of at a later point. This is an uncommitted bundle and can have elements added to it. + * + * @param <T> the type of elements that can be added to this bundle + */ +interface UncommittedBundle<T> { + /** + * Returns the PCollection that the elements of this {@link UncommittedBundle} belong to. + */ + @Nullable + PCollection<T> getPCollection(); + + /** + * Outputs an element to this bundle. + * + * @param element the element to add to this bundle + * @return this bundle + */ + UncommittedBundle<T> add(WindowedValue<T> element); + + /** + * Commits this {@link UncommittedBundle}, returning an immutable {@link CommittedBundle} + * containing all of the elements that were added to it. The {@link #add(WindowedValue)} method + * will throw an {@link IllegalStateException} if called after a call to commit. + * @param synchronizedProcessingTime the synchronized processing time at which this bundle was + * committed + */ + CommittedBundle<T> commit(Instant synchronizedProcessingTime); +} http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java index 8cbe8fc..f4648e9 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java @@ -21,7 +21,6 @@ import com.google.common.collect.Iterables; import java.util.ArrayList; import java.util.List; import org.apache.beam.runners.direct.CommittedResult.OutputType; -import org.apache.beam.runners.direct.DirectRunner.PCollectionViewWriter; import org.apache.beam.runners.direct.StepTransformResult.Builder; import org.apache.beam.runners.direct.ViewOverrideFactory.WriteView; import org.apache.beam.sdk.transforms.AppliedPTransform; @@ -50,7 +49,7 @@ class ViewEvaluatorFactory implements TransformEvaluatorFactory { @Override public <T> TransformEvaluator<T> forApplication( AppliedPTransform<?, ?, ?> application, - DirectRunner.CommittedBundle<?> inputBundle) { + CommittedBundle<?> inputBundle) { @SuppressWarnings({"cast", "unchecked", "rawtypes"}) TransformEvaluator<T> evaluator = createEvaluator( (AppliedPTransform) application); http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java index 8c04362..b576e00 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java @@ -54,7 +54,6 @@ import javax.annotation.concurrent.GuardedBy; import org.apache.beam.runners.core.StateNamespace; import org.apache.beam.runners.core.TimerInternals; import org.apache.beam.runners.core.TimerInternals.TimerData; -import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.PTransform; @@ -127,7 +126,7 @@ import org.joda.time.Instant; * Watermark_PCollection = Watermark_Out_ProducingPTransform * </pre> */ -public class WatermarkManager { +class WatermarkManager { // The number of updates to apply in #tryApplyPendingUpdates private static final int MAX_INCREMENTAL_UPDATES = 10; http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java index 2550924..30d507b 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java @@ -20,8 +20,6 @@ package org.apache.beam.runners.direct; import com.google.common.collect.Iterables; import java.util.Collection; import javax.annotation.Nullable; -import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; -import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java index 2b5b46d..df7c18e 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java @@ -39,8 +39,6 @@ import java.util.List; import java.util.NoSuchElementException; import java.util.concurrent.CountDownLatch; import org.apache.beam.runners.direct.BoundedReadEvaluatorFactory.BoundedSourceShard; -import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; -import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; import org.apache.beam.sdk.coders.BigEndianLongCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.VarLongCoder; http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java index c6054b6..7d037d1 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java @@ -31,8 +31,6 @@ import com.google.common.collect.Iterables; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; -import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.CustomCoder; import org.apache.beam.sdk.coders.KvCoder; http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java index 68d6eba..077cd43 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java @@ -73,7 +73,7 @@ public class CommittedResultTest implements Serializable { CommittedResult.create( StepTransformResult.withoutHold(transform).build(), bundleFactory.createBundle(created).commit(Instant.now()), - Collections.<DirectRunner.CommittedBundle<?>>emptyList(), + Collections.<CommittedBundle<?>>emptyList(), EnumSet.noneOf(OutputType.class)); assertThat(result.getTransform(), Matchers.<AppliedPTransform<?, ?, ?>>equalTo(transform)); @@ -81,7 +81,7 @@ public class CommittedResultTest implements Serializable { @Test public void getUncommittedElementsEqualInput() { - DirectRunner.CommittedBundle<Integer> bundle = + CommittedBundle<Integer> bundle = bundleFactory.createBundle(created) .add(WindowedValue.valueInGlobalWindow(2)) .commit(Instant.now()); @@ -89,11 +89,11 @@ public class CommittedResultTest implements Serializable { CommittedResult.create( StepTransformResult.withoutHold(transform).build(), bundle, - Collections.<DirectRunner.CommittedBundle<?>>emptyList(), + Collections.<CommittedBundle<?>>emptyList(), EnumSet.noneOf(OutputType.class)); assertThat(result.getUnprocessedInputs(), - Matchers.<DirectRunner.CommittedBundle<?>>equalTo(bundle)); + Matchers.<CommittedBundle<?>>equalTo(bundle)); } @Test @@ -102,7 +102,7 @@ public class CommittedResultTest implements Serializable { CommittedResult.create( StepTransformResult.withoutHold(transform).build(), null, - Collections.<DirectRunner.CommittedBundle<?>>emptyList(), + Collections.<CommittedBundle<?>>emptyList(), EnumSet.noneOf(OutputType.class)); assertThat(result.getUnprocessedInputs(), nullValue()); @@ -110,7 +110,7 @@ public class CommittedResultTest implements Serializable { @Test public void getOutputsEqualInput() { - List<? extends DirectRunner.CommittedBundle<?>> outputs = + List<? extends CommittedBundle<?>> outputs = ImmutableList.of(bundleFactory.createBundle(PCollection.createPrimitiveOutputInternal(p, WindowingStrategy.globalDefault(), PCollection.IsBounded.BOUNDED)).commit(Instant.now()), http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java index ee51e9a..d5d0aff 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java @@ -26,7 +26,6 @@ import static org.hamcrest.Matchers.containsInAnyOrder; import static org.junit.Assert.assertThat; import com.google.common.collect.ImmutableList; -import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; import org.apache.beam.sdk.metrics.DistributionData; import org.apache.beam.sdk.metrics.DistributionResult; import org.apache.beam.sdk.metrics.GaugeData; http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java index bfbcd79..40582d9 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java @@ -36,9 +36,6 @@ import org.apache.beam.runners.core.StateTag; import org.apache.beam.runners.core.StateTags; import org.apache.beam.runners.core.TimerInternals.TimerData; import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext; -import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; -import org.apache.beam.runners.direct.DirectRunner.PCollectionViewWriter; -import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; import org.apache.beam.runners.direct.WatermarkManager.FiredTimers; import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate; import org.apache.beam.sdk.coders.ByteArrayCoder; http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java index e07c9f9..7dc01e6 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java @@ -24,8 +24,6 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import com.google.common.collect.Iterables; -import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; -import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.AppliedPTransform; http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java index fefafd0..6dcd5e2 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java @@ -28,8 +28,6 @@ import com.google.common.collect.Multiset; import org.apache.beam.runners.core.KeyedWorkItem; import org.apache.beam.runners.core.KeyedWorkItems; import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupByKeyOnly; -import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; -import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactoryTest.java index 94514ad..1373219 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactoryTest.java @@ -28,8 +28,6 @@ import com.google.common.collect.Multiset; import org.apache.beam.runners.core.KeyedWorkItem; import org.apache.beam.runners.core.KeyedWorkItems; import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupByKeyOnly; -import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; -import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java index 838e0bd..95c0ad1 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java @@ -20,8 +20,6 @@ package org.apache.beam.runners.direct; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.junit.Assert.assertThat; -import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; -import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; import org.apache.beam.sdk.coders.ByteArrayCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.testing.TestPipeline; http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java index cd3e9b4..1cd5786 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java @@ -19,7 +19,6 @@ package org.apache.beam.runners.direct; import java.io.Serializable; import java.util.Collections; -import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.Count; http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutableListBundleFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutableListBundleFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutableListBundleFactoryTest.java index c5ad0cd..4a392db 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutableListBundleFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutableListBundleFactoryTest.java @@ -26,8 +26,6 @@ import com.google.common.collect.ImmutableList; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; -import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; import org.apache.beam.sdk.coders.ByteArrayCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.StringUtf8Coder; http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java index 69dbc22..ef8add9 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java @@ -31,7 +31,6 @@ import java.util.Collection; import java.util.List; import javax.annotation.Nullable; import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext; -import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.AppliedPTransform;