http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/BoundedReadEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/BoundedReadEvaluatorFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/BoundedReadEvaluatorFactory.java deleted file mode 100644 index a394090..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/BoundedReadEvaluatorFactory.java +++ /dev/null @@ -1,155 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.sdk.runners.inprocess; - -import org.apache.beam.sdk.io.BoundedSource; -import org.apache.beam.sdk.io.BoundedSource.BoundedReader; -import org.apache.beam.sdk.io.Read.Bounded; -import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; -import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle; -import org.apache.beam.sdk.transforms.AppliedPTransform; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.PCollection; - -import java.io.IOException; -import java.util.Queue; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ConcurrentMap; - -import javax.annotation.Nullable; - -/** - * A {@link TransformEvaluatorFactory} that produces {@link TransformEvaluator TransformEvaluators} - * for the {@link Bounded Read.Bounded} primitive {@link PTransform}. - * - * <p>A single stateful evaluator is created for each (transform application, evaluation context) - * pair; once it has been handed out, later requests for the same pair receive an - * {@link EmptyTransformEvaluator}. - */ -final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory { - /* - * An evaluator for a Source is stateful, to ensure data is not read multiple times. - * Evaluators are cached here to ensure that the reader is not restarted if the evaluator is - * retriggered. - */ - private final ConcurrentMap<EvaluatorKey, Queue<? 
extends BoundedReadEvaluator<?>>> - sourceEvaluators = new ConcurrentHashMap<>(); - - @SuppressWarnings({"unchecked", "rawtypes"}) - @Override - public <InputT> TransformEvaluator<InputT> forApplication( - AppliedPTransform<?, ?, ?> application, - @Nullable CommittedBundle<?> inputBundle, - InProcessEvaluationContext evaluationContext) - throws IOException { - return getTransformEvaluator((AppliedPTransform) application, evaluationContext); - } - - private <OutputT> TransformEvaluator<?> getTransformEvaluator( - final AppliedPTransform<?, PCollection<OutputT>, Bounded<OutputT>> transform, - final InProcessEvaluationContext evaluationContext) { - BoundedReadEvaluator<?> evaluator = - getTransformEvaluatorQueue(transform, evaluationContext).poll(); - if (evaluator == null) { - return EmptyTransformEvaluator.create(transform); - } - return evaluator; - } - - /** - * Get the queue of {@link TransformEvaluator TransformEvaluators} that produce elements for the - * provided application of {@link Bounded Read.Bounded}, initializing it if required. - * - * <p>This method is thread-safe, and will only produce new evaluators if no other invocation has - * already done so. - * - * <p>A new queue is published with {@code putIfAbsent} before its evaluator is offered, so a - * concurrent caller may observe the queue while it is still empty. - */ - @SuppressWarnings("unchecked") - private <OutputT> Queue<BoundedReadEvaluator<OutputT>> getTransformEvaluatorQueue( - final AppliedPTransform<?, PCollection<OutputT>, Bounded<OutputT>> transform, - final InProcessEvaluationContext evaluationContext) { - // Key by the application and the context the evaluation is occurring in (which call to - // Pipeline#run). 
- EvaluatorKey key = new EvaluatorKey(transform, evaluationContext); - Queue<BoundedReadEvaluator<OutputT>> evaluatorQueue = - (Queue<BoundedReadEvaluator<OutputT>>) sourceEvaluators.get(key); - if (evaluatorQueue == null) { - evaluatorQueue = new ConcurrentLinkedQueue<>(); - if (sourceEvaluators.putIfAbsent(key, evaluatorQueue) == null) { - // If no queue existed in the evaluators, add an evaluator to initialize the evaluator - // factory for this transform - BoundedSource<OutputT> source = transform.getTransform().getSource(); - BoundedReadEvaluator<OutputT> evaluator = - new BoundedReadEvaluator<OutputT>(transform, evaluationContext, source); - evaluatorQueue.offer(evaluator); - } else { - // otherwise return the existing Queue that arrived before us - evaluatorQueue = (Queue<BoundedReadEvaluator<OutputT>>) sourceEvaluators.get(key); - } - } - return evaluatorQueue; - } - - /** - * A {@link BoundedReadEvaluator} produces elements from an underlying {@link BoundedSource}, - * discarding all input elements. Within the call to {@link #finishBundle()}, the evaluator - * creates the {@link BoundedReader} and reads every element it produces into a single root - * bundle, each in the global window with the timestamp reported by the reader. - * - * <p>A {@link BoundedReadEvaluator} should only be created once per {@link BoundedSource}, and - * each evaluator should only be called once per evaluation of the pipeline. Otherwise, the source - * may produce duplicate elements. - */ - private static class BoundedReadEvaluator<OutputT> implements TransformEvaluator<Object> { - private final AppliedPTransform<?, PCollection<OutputT>, Bounded<OutputT>> transform; - private final InProcessEvaluationContext evaluationContext; - /** - * The source being read from by this {@link BoundedReadEvaluator}. This may not be the same - * as the source derived from {@link #transform} due to splitting. 
- */ - private BoundedSource<OutputT> source; - - public BoundedReadEvaluator( - AppliedPTransform<?, PCollection<OutputT>, Bounded<OutputT>> transform, - InProcessEvaluationContext evaluationContext, - BoundedSource<OutputT> source) { - this.transform = transform; - this.evaluationContext = evaluationContext; - this.source = source; - } - - @Override - public void processElement(WindowedValue<Object> element) {} - - @Override - public InProcessTransformResult finishBundle() throws IOException { - try (final BoundedReader<OutputT> reader = - source.createReader(evaluationContext.getPipelineOptions());) { - boolean contentsRemaining = reader.start(); - UncommittedBundle<OutputT> output = - evaluationContext.createRootBundle(transform.getOutput()); - while (contentsRemaining) { - output.add( - WindowedValue.timestampedValueInGlobalWindow( - reader.getCurrent(), reader.getCurrentTimestamp())); - contentsRemaining = reader.advance(); - } - return StepTransformResult.withHold(transform, BoundedWindow.TIMESTAMP_MAX_VALUE) - .addOutput(output) - .build(); - } - } - } -}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/BundleFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/BundleFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/BundleFactory.java deleted file mode 100644 index 5479b00..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/BundleFactory.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.runners.inprocess; - -import org.apache.beam.sdk.runners.inprocess.GroupByKeyEvaluatorFactory.InProcessGroupByKeyOnly; -import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; -import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.values.PCollection; - -/** - * A factory that creates {@link UncommittedBundle UncommittedBundles}. 
- */ -public interface BundleFactory { - /** - * Create an {@link UncommittedBundle} from an empty input. Elements added to the bundle belong to - * the {@code output} {@link PCollection}. - */ - public <T> UncommittedBundle<T> createRootBundle(PCollection<T> output); - - /** - * Create an {@link UncommittedBundle} from the specified input. Elements added to the bundle - * belong to the {@code output} {@link PCollection}. - */ - public <T> UncommittedBundle<T> createBundle(CommittedBundle<?> input, PCollection<T> output); - - /** - * Create an {@link UncommittedBundle} from the specified input, keyed by the specified key. For - * use by {@link InProcessGroupByKeyOnly} {@link PTransform PTransforms}. Elements added to the - * bundle belong to the {@code output} {@link PCollection}. - */ - public <T> UncommittedBundle<T> createKeyedBundle( - CommittedBundle<?> input, Object key, PCollection<T> output); -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/CachedThreadPoolExecutorServiceFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/CachedThreadPoolExecutorServiceFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/CachedThreadPoolExecutorServiceFactory.java deleted file mode 100644 index 12427d9..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/CachedThreadPoolExecutorServiceFactory.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.runners.inprocess; - -import org.apache.beam.sdk.options.DefaultValueFactory; -import org.apache.beam.sdk.options.PipelineOptions; - -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; - -/** - * An {@link ExecutorServiceFactory} that produces cached thread pools via - * {@link Executors#newCachedThreadPool()}. - * - * <p>As a {@link DefaultValueFactory}, it always supplies the same shared instance. - */ -class CachedThreadPoolExecutorServiceFactory - implements DefaultValueFactory<ExecutorServiceFactory>, ExecutorServiceFactory { - private static final CachedThreadPoolExecutorServiceFactory INSTANCE = - new CachedThreadPoolExecutorServiceFactory(); - - @Override - public ExecutorServiceFactory create(PipelineOptions options) { - return INSTANCE; - } - - @Override - public ExecutorService create() { - return Executors.newCachedThreadPool(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/Clock.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/Clock.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/Clock.java deleted file mode 100644 index 7a51251..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/Clock.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.runners.inprocess; - -import org.joda.time.Instant; - -/** - * Provides access to the current time. - */ -public interface Clock { - /** - * Returns the current time as an {@link Instant}. - */ - Instant now(); -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/CommittedResult.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/CommittedResult.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/CommittedResult.java deleted file mode 100644 index 10e9697..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/CommittedResult.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.sdk.runners.inprocess; - -import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; -import org.apache.beam.sdk.transforms.AppliedPTransform; - -import com.google.auto.value.AutoValue; - -/** - * An {@link InProcessTransformResult} that has been committed. It pairs the transform of the - * original result with the bundles that were committed as its outputs. - */ -@AutoValue -abstract class CommittedResult { - /** - * Returns the {@link AppliedPTransform} that produced this result. - */ - public abstract AppliedPTransform<?, ?, ?> getTransform(); - - /** - * Returns the outputs produced by the transform. - */ - public abstract Iterable<? extends CommittedBundle<?>> getOutputs(); - - public static CommittedResult create( - InProcessTransformResult original, Iterable<? extends CommittedBundle<?>> outputs) { - return new AutoValue_CommittedResult(original.getTransform(), - outputs); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/CompletionCallback.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/CompletionCallback.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/CompletionCallback.java deleted file mode 100644 index 30a2b92..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/CompletionCallback.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.runners.inprocess; - -import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; - -/** - * A callback for completing a bundle of input, either successfully with a result or abnormally - * with a {@link Throwable}. - */ -interface CompletionCallback { - /** - * Handle a successful result, returning the committed outputs of the result. - */ - CommittedResult handleResult( - CommittedBundle<?> inputBundle, InProcessTransformResult result); - - /** - * Handle a result that terminated abnormally due to the provided {@link Throwable}. 
- */ - void handleThrowable(CommittedBundle<?> inputBundle, Throwable t); -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ConsumerTrackingPipelineVisitor.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ConsumerTrackingPipelineVisitor.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ConsumerTrackingPipelineVisitor.java deleted file mode 100644 index f6ea4af..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ConsumerTrackingPipelineVisitor.java +++ /dev/null @@ -1,173 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.sdk.runners.inprocess; - -import static com.google.common.base.Preconditions.checkState; - -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.Pipeline.PipelineVisitor; -import org.apache.beam.sdk.runners.PipelineRunner; -import org.apache.beam.sdk.runners.TransformTreeNode; -import org.apache.beam.sdk.transforms.AppliedPTransform; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.PInput; -import org.apache.beam.sdk.values.PValue; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; - -/** - * Tracks the {@link AppliedPTransform AppliedPTransforms} that consume each {@link PValue} in the - * {@link Pipeline}. This is used to schedule consuming {@link PTransform PTransforms} to consume - * input after the upstream transform has produced and committed output. - * - * <p>A visitor may traverse only a single {@link Pipeline}: it becomes finalized when the root - * node is left, composite nodes may not be entered or left after that point, and the accessor - * methods may only be called once the visitor is finalized. 
- */ -public class ConsumerTrackingPipelineVisitor implements PipelineVisitor { - private Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers = new HashMap<>(); - private Collection<AppliedPTransform<?, ?, ?>> rootTransforms = new ArrayList<>(); - private Collection<PCollectionView<?>> views = new ArrayList<>(); - private Map<AppliedPTransform<?, ?, ?>, String> stepNames = new HashMap<>(); - private Set<PValue> toFinalize = new HashSet<>(); - private int numTransforms = 0; - private boolean finalized = false; - - @Override - public void enterCompositeTransform(TransformTreeNode node) { - checkState( - !finalized, - "Attempting to traverse a pipeline (node %s) with a %s " - + "which has already visited a Pipeline and is finalized", - node.getFullName(), - ConsumerTrackingPipelineVisitor.class.getSimpleName()); - } - - @Override - public void leaveCompositeTransform(TransformTreeNode node) { - checkState( - !finalized, - "Attempting to traverse a pipeline (node %s) with a %s which is already finalized", - node.getFullName(), - ConsumerTrackingPipelineVisitor.class.getSimpleName()); - if (node.isRootNode()) { - finalized = true; - } - } - - @Override - public void visitTransform(TransformTreeNode node) { - toFinalize.removeAll(node.getInput().expand()); - AppliedPTransform<?, ?, ?> appliedTransform = getAppliedTransform(node); - stepNames.put(appliedTransform, genStepName()); - if (node.getInput().expand().isEmpty()) { - rootTransforms.add(appliedTransform); - } else { - for (PValue value : node.getInput().expand()) { - valueToConsumers.get(value).add(appliedTransform); - } - } - } - - private AppliedPTransform<?, ?, ?> getAppliedTransform(TransformTreeNode node) { - @SuppressWarnings({"rawtypes", "unchecked"}) - AppliedPTransform<?, ?, ?> application = AppliedPTransform.of( - node.getFullName(), node.getInput(), node.getOutput(), (PTransform) node.getTransform()); - return application; - } - - @Override - public void visitValue(PValue value, 
TransformTreeNode producer) { - toFinalize.add(value); - for (PValue expandedValue : value.expand()) { - valueToConsumers.put(expandedValue, new ArrayList<AppliedPTransform<?, ?, ?>>()); - if (expandedValue instanceof PCollectionView) { - views.add((PCollectionView<?>) expandedValue); - } - expandedValue.recordAsOutput(getAppliedTransform(producer)); - } - value.recordAsOutput(getAppliedTransform(producer)); - } - - private String genStepName() { - return String.format("s%s", numTransforms++); - } - - - /** - * Returns a mapping of each fully-expanded {@link PValue} to each - * {@link AppliedPTransform} that consumes it. For each AppliedPTransform in the collection - * returned from {@code getValueToConsumers().get(PValue)}, - * {@code AppliedPTransform#getInput().expand()} will contain the argument {@link PValue}. - */ - public Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> getValueToConsumers() { - checkState( - finalized, - "Can't call getValueToConsumers before the Pipeline has been completely traversed"); - - return valueToConsumers; - } - - /** - * Returns the mapping for each {@link AppliedPTransform} in the {@link Pipeline} to a unique step - * name. - */ - public Map<AppliedPTransform<?, ?, ?>, String> getStepNames() { - checkState( - finalized, "Can't call getStepNames before the Pipeline has been completely traversed"); - - return stepNames; - } - - /** - * Returns the root transforms of the {@link Pipeline}. A root {@link AppliedPTransform} consumes - * a {@link PInput} where the {@link PInput#expand()} returns an empty collection. - */ - public Collection<AppliedPTransform<?, ?, ?>> getRootTransforms() { - checkState( - finalized, - "Can't call getRootTransforms before the Pipeline has been completely traversed"); - - return rootTransforms; - } - - /** - * Returns all of the {@link PCollectionView PCollectionViews} contained in the visited - * {@link Pipeline}. 
- */ - public Collection<PCollectionView<?>> getViews() { - checkState(finalized, "Can't call getViews before the Pipeline has been completely traversed"); - - return views; - } - - /** - * Returns all of the {@link PValue PValues} that have been produced but not consumed. These - * {@link PValue PValues} should be finalized by the {@link PipelineRunner} before the - * {@link Pipeline} is executed. - */ - public Set<PValue> getUnfinalizedPValues() { - checkState( - finalized, - "Can't call getUnfinalizedPValues before the Pipeline has been completely traversed"); - - return toFinalize; - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/EmptyTransformEvaluator.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/EmptyTransformEvaluator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/EmptyTransformEvaluator.java deleted file mode 100644 index d198903..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/EmptyTransformEvaluator.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.runners.inprocess; - -import org.apache.beam.sdk.transforms.AppliedPTransform; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.WindowedValue; - -/** - * A {@link TransformEvaluator} that ignores all input and produces no output. The result of - * invoking {@link #finishBundle()} on this evaluator is to return an - * {@link InProcessTransformResult} with no elements and a timestamp hold equal to - * {@link BoundedWindow#TIMESTAMP_MIN_VALUE}. Because the result contains no elements, this hold - * will not affect the watermark. - * - * <p>Instances are obtained from {@link #create(AppliedPTransform)}. - */ -final class EmptyTransformEvaluator<T> implements TransformEvaluator<T> { - public static <T> TransformEvaluator<T> create(AppliedPTransform<?, ?, ?> transform) { - return new EmptyTransformEvaluator<T>(transform); - } - - private final AppliedPTransform<?, ?, ?> transform; - - private EmptyTransformEvaluator(AppliedPTransform<?, ?, ?> transform) { - this.transform = transform; - } - - @Override - public void processElement(WindowedValue<T> element) throws Exception {} - - @Override - public InProcessTransformResult finishBundle() throws Exception { - return StepTransformResult.withHold(transform, BoundedWindow.TIMESTAMP_MIN_VALUE) - .build(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/EncodabilityEnforcementFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/EncodabilityEnforcementFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/EncodabilityEnforcementFactory.java deleted file mode 100644 index d234d4f..0000000 --- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/EncodabilityEnforcementFactory.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.runners.inprocess; - -import static com.google.common.base.Preconditions.checkArgument; - -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; -import org.apache.beam.sdk.transforms.AppliedPTransform; -import org.apache.beam.sdk.util.CoderUtils; -import org.apache.beam.sdk.util.UserCodeException; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.PCollection; - -/** - * Enforces that all elements in a {@link PCollection} can be encoded using that - * {@link PCollection PCollection's} {@link Coder}. - * - * <p>In {@code beforeElement}, each element is cloned with the {@link Coder}; if the coder is - * consistent with equals, the structural values of the element and of its clone must be equal. - * Any failure is rethrown wrapped in a {@link UserCodeException}. 
- */ -class EncodabilityEnforcementFactory implements ModelEnforcementFactory { - public static EncodabilityEnforcementFactory create() { - return new EncodabilityEnforcementFactory(); - } - - @Override - public <T> ModelEnforcement<T> forBundle( - CommittedBundle<T> input, AppliedPTransform<?, ?, ?> consumer) { - return new EncodabilityEnforcement<>(input); - } - - private static class EncodabilityEnforcement<T> extends AbstractModelEnforcement<T> { - private Coder<T> coder; - - public EncodabilityEnforcement(CommittedBundle<T> input) { - coder = input.getPCollection().getCoder(); - } - - @Override - public void beforeElement(WindowedValue<T> element) { - try { - T clone = CoderUtils.clone(coder, element.getValue()); - if (coder.consistentWithEquals()) { - checkArgument( - coder.structuralValue(element.getValue()).equals(coder.structuralValue(clone)), - "Coder %s of class %s does not maintain structural value equality" - + " on input element %s", - coder, - coder.getClass().getSimpleName(), - element.getValue()); - } - } catch (Exception e) { - throw UserCodeException.wrap(e); - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/EvaluatorKey.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/EvaluatorKey.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/EvaluatorKey.java deleted file mode 100644 index 9d8fc43..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/EvaluatorKey.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.runners.inprocess; - -import org.apache.beam.sdk.transforms.AppliedPTransform; - -import java.util.Objects; - -/** - * A (Transform, Pipeline Execution) key for stateful evaluators. - * - * Source evaluators are stateful to ensure data is not read multiple times. Evaluators are cached - * to ensure that the reader is not restarted if the evaluator is retriggered. An - * {@link EvaluatorKey} is used to ensure that multiple Pipelines can be executed without sharing - * the same evaluators. 
- */ -final class EvaluatorKey { - private final AppliedPTransform<?, ?, ?> transform; - private final InProcessEvaluationContext context; - - public EvaluatorKey(AppliedPTransform<?, ?, ?> transform, InProcessEvaluationContext context) { - this.transform = transform; - this.context = context; - } - - @Override - public int hashCode() { - return Objects.hash(transform, context); - } - - @Override - public boolean equals(Object other) { - if (other == null || !(other instanceof EvaluatorKey)) { - return false; - } - EvaluatorKey that = (EvaluatorKey) other; - return Objects.equals(this.transform, that.transform) - && Objects.equals(this.context, that.context); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ExecutorServiceFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ExecutorServiceFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ExecutorServiceFactory.java deleted file mode 100644 index cfbf7b4..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ExecutorServiceFactory.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.runners.inprocess; - -import java.util.concurrent.ExecutorService; - -/** - * A factory that creates {@link ExecutorService ExecutorServices}. - * {@link ExecutorService ExecutorServices} created by this factory should be independent of one - * another (e.g., if any executor is shut down the remaining executors should continue to process - * work). - */ -public interface ExecutorServiceFactory { - /** - * Create a new {@link ExecutorService}. - */ - ExecutorService create(); -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java deleted file mode 100644 index 19bf35d..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java +++ /dev/null @@ -1,478 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.runners.inprocess; - -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.runners.inprocess.InMemoryWatermarkManager.FiredTimers; -import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; -import org.apache.beam.sdk.transforms.AppliedPTransform; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.util.KeyedWorkItem; -import org.apache.beam.sdk.util.KeyedWorkItems; -import org.apache.beam.sdk.util.TimeDomain; -import org.apache.beam.sdk.util.TimerInternals.TimerData; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PValue; - -import com.google.common.base.MoreObjects; -import com.google.common.base.Optional; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; -import com.google.common.collect.ImmutableList; - -import org.joda.time.Instant; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Collection; -import java.util.Collections; -import java.util.Map; -import java.util.Queue; -import java.util.Set; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutorService; - -import javax.annotation.Nullable; - -/** - * An {@link InProcessExecutor} that uses an underlying {@link ExecutorService} and - * {@link InProcessEvaluationContext} to execute a {@link Pipeline}. 
- */ -final class ExecutorServiceParallelExecutor implements InProcessExecutor { - private static final Logger LOG = LoggerFactory.getLogger(ExecutorServiceParallelExecutor.class); - - private final ExecutorService executorService; - - private final Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers; - private final Set<PValue> keyedPValues; - private final TransformEvaluatorRegistry registry; - @SuppressWarnings("rawtypes") - private final Map<Class<? extends PTransform>, Collection<ModelEnforcementFactory>> - transformEnforcements; - - private final InProcessEvaluationContext evaluationContext; - - private final LoadingCache<StepAndKey, TransformExecutorService> executorServices; - private final ConcurrentMap<TransformExecutor<?>, Boolean> scheduledExecutors; - - private final Queue<ExecutorUpdate> allUpdates; - private final BlockingQueue<VisibleExecutorUpdate> visibleUpdates; - - private final TransformExecutorService parallelExecutorService; - private final CompletionCallback defaultCompletionCallback; - - private Collection<AppliedPTransform<?, ?, ?>> rootNodes; - - public static ExecutorServiceParallelExecutor create( - ExecutorService executorService, - Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers, - Set<PValue> keyedPValues, - TransformEvaluatorRegistry registry, - @SuppressWarnings("rawtypes") - Map<Class<? extends PTransform>, Collection<ModelEnforcementFactory>> transformEnforcements, - InProcessEvaluationContext context) { - return new ExecutorServiceParallelExecutor( - executorService, valueToConsumers, keyedPValues, registry, transformEnforcements, context); - } - - private ExecutorServiceParallelExecutor( - ExecutorService executorService, - Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers, - Set<PValue> keyedPValues, - TransformEvaluatorRegistry registry, - @SuppressWarnings("rawtypes") - Map<Class<? 
extends PTransform>, Collection<ModelEnforcementFactory>> transformEnforcements, - InProcessEvaluationContext context) { - this.executorService = executorService; - this.valueToConsumers = valueToConsumers; - this.keyedPValues = keyedPValues; - this.registry = registry; - this.transformEnforcements = transformEnforcements; - this.evaluationContext = context; - - scheduledExecutors = new ConcurrentHashMap<>(); - // Weak Values allows TransformExecutorServices that are no longer in use to be reclaimed. - // Executing TransformExecutorServices have a strong reference to their TransformExecutorService - // which stops the TransformExecutorServices from being prematurely garbage collected - executorServices = - CacheBuilder.newBuilder().weakValues().build(serialTransformExecutorServiceCacheLoader()); - - this.allUpdates = new ConcurrentLinkedQueue<>(); - this.visibleUpdates = new ArrayBlockingQueue<>(20); - - parallelExecutorService = - TransformExecutorServices.parallel(executorService, scheduledExecutors); - defaultCompletionCallback = new DefaultCompletionCallback(); - } - - private CacheLoader<StepAndKey, TransformExecutorService> - serialTransformExecutorServiceCacheLoader() { - return new CacheLoader<StepAndKey, TransformExecutorService>() { - @Override - public TransformExecutorService load(StepAndKey stepAndKey) throws Exception { - return TransformExecutorServices.serial(executorService, scheduledExecutors); - } - }; - } - - @Override - public void start(Collection<AppliedPTransform<?, ?, ?>> roots) { - rootNodes = ImmutableList.copyOf(roots); - Runnable monitorRunnable = new MonitorRunnable(); - executorService.submit(monitorRunnable); - } - - @SuppressWarnings("unchecked") - public void scheduleConsumption( - AppliedPTransform<?, ?, ?> consumer, - @Nullable CommittedBundle<?> bundle, - CompletionCallback onComplete) { - evaluateBundle(consumer, bundle, onComplete); - } - - private <T> void evaluateBundle( - final AppliedPTransform<?, ?, ?> transform, - 
@Nullable final CommittedBundle<T> bundle, - final CompletionCallback onComplete) { - TransformExecutorService transformExecutor; - - if (bundle != null && isKeyed(bundle.getPCollection())) { - final StepAndKey stepAndKey = - StepAndKey.of(transform, bundle == null ? null : bundle.getKey()); - // This executor will remain reachable until it has executed all scheduled transforms. - // The TransformExecutors keep a strong reference to the Executor, the ExecutorService keeps - // a reference to the scheduled TransformExecutor callable. Follow-up TransformExecutors - // (scheduled due to the completion of another TransformExecutor) are provided to the - // ExecutorService before the Earlier TransformExecutor callable completes. - transformExecutor = executorServices.getUnchecked(stepAndKey); - } else { - transformExecutor = parallelExecutorService; - } - - Collection<ModelEnforcementFactory> enforcements = - MoreObjects.firstNonNull( - transformEnforcements.get(transform.getTransform().getClass()), - Collections.<ModelEnforcementFactory>emptyList()); - - TransformExecutor<T> callable = - TransformExecutor.create( - registry, - enforcements, - evaluationContext, - bundle, - transform, - onComplete, - transformExecutor); - transformExecutor.schedule(callable); - } - - private boolean isKeyed(PValue pvalue) { - return keyedPValues.contains(pvalue); - } - - private void scheduleConsumers(CommittedBundle<?> bundle) { - for (AppliedPTransform<?, ?, ?> consumer : valueToConsumers.get(bundle.getPCollection())) { - scheduleConsumption(consumer, bundle, defaultCompletionCallback); - } - } - - @Override - public void awaitCompletion() throws Throwable { - VisibleExecutorUpdate update; - do { - update = visibleUpdates.take(); - if (update.throwable.isPresent()) { - throw update.throwable.get(); - } - } while (!update.isDone()); - executorService.shutdown(); - } - - /** - * The default {@link CompletionCallback}. 
The default completion callback is used to complete - * transform evaluations that are triggered due to the arrival of elements from an upstream - * transform, or for a source transform. - */ - private class DefaultCompletionCallback implements CompletionCallback { - @Override - public CommittedResult handleResult( - CommittedBundle<?> inputBundle, InProcessTransformResult result) { - CommittedResult committedResult = - evaluationContext.handleResult(inputBundle, Collections.<TimerData>emptyList(), result); - for (CommittedBundle<?> outputBundle : committedResult.getOutputs()) { - allUpdates.offer(ExecutorUpdate.fromBundle(outputBundle)); - } - return committedResult; - } - - @Override - public void handleThrowable(CommittedBundle<?> inputBundle, Throwable t) { - allUpdates.offer(ExecutorUpdate.fromThrowable(t)); - } - } - - /** - * A {@link CompletionCallback} where the completed bundle was produced to deliver some collection - * of {@link TimerData timers}. When the evaluator completes successfully, reports all of the - * timers used to create the input to the {@link InProcessEvaluationContext evaluation context} - * as part of the result. - */ - private class TimerCompletionCallback implements CompletionCallback { - private final Iterable<TimerData> timers; - - private TimerCompletionCallback(Iterable<TimerData> timers) { - this.timers = timers; - } - - @Override - public CommittedResult handleResult( - CommittedBundle<?> inputBundle, InProcessTransformResult result) { - CommittedResult committedResult = - evaluationContext.handleResult(inputBundle, timers, result); - for (CommittedBundle<?> outputBundle : committedResult.getOutputs()) { - allUpdates.offer(ExecutorUpdate.fromBundle(outputBundle)); - } - return committedResult; - } - - @Override - public void handleThrowable(CommittedBundle<?> inputBundle, Throwable t) { - allUpdates.offer(ExecutorUpdate.fromThrowable(t)); - } - } - - /** - * An internal status update on the state of the executor. 
- * - * Used to signal when the executor should be shut down (due to an exception). - */ - private static class ExecutorUpdate { - private final Optional<? extends CommittedBundle<?>> bundle; - private final Optional<? extends Throwable> throwable; - - public static ExecutorUpdate fromBundle(CommittedBundle<?> bundle) { - return new ExecutorUpdate(bundle, null); - } - - public static ExecutorUpdate fromThrowable(Throwable t) { - return new ExecutorUpdate(null, t); - } - - private ExecutorUpdate(CommittedBundle<?> producedBundle, Throwable throwable) { - this.bundle = Optional.fromNullable(producedBundle); - this.throwable = Optional.fromNullable(throwable); - } - - public Optional<? extends CommittedBundle<?>> getBundle() { - return bundle; - } - - public Optional<? extends Throwable> getException() { - return throwable; - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(ExecutorUpdate.class) - .add("bundle", bundle) - .add("exception", throwable) - .toString(); - } - } - - /** - * An update of interest to the user. Used in {@link #awaitCompletion} to decide whether to - * return normally or throw an exception. - */ - private static class VisibleExecutorUpdate { - private final Optional<? 
extends Throwable> throwable; - private final boolean done; - - public static VisibleExecutorUpdate fromThrowable(Throwable e) { - return new VisibleExecutorUpdate(false, e); - } - - public static VisibleExecutorUpdate finished() { - return new VisibleExecutorUpdate(true, null); - } - - private VisibleExecutorUpdate(boolean done, @Nullable Throwable exception) { - this.throwable = Optional.fromNullable(exception); - this.done = done; - } - - public boolean isDone() { - return done; - } - } - - private class MonitorRunnable implements Runnable { - private final String runnableName = - String.format( - "%s$%s-monitor", - evaluationContext.getPipelineOptions().getAppName(), - ExecutorServiceParallelExecutor.class.getSimpleName()); - - @Override - public void run() { - String oldName = Thread.currentThread().getName(); - Thread.currentThread().setName(runnableName); - try { - ExecutorUpdate update = allUpdates.poll(); - // pull all of the pending work off of the queue - while (update != null) { - LOG.debug("Executor Update: {}", update); - if (update.getBundle().isPresent()) { - scheduleConsumers(update.getBundle().get()); - } else if (update.getException().isPresent()) { - visibleUpdates.offer(VisibleExecutorUpdate.fromThrowable(update.getException().get())); - } - update = allUpdates.poll(); - } - boolean timersFired = fireTimers(); - addWorkIfNecessary(timersFired); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - LOG.error("Monitor died due to being interrupted"); - while (!visibleUpdates.offer(VisibleExecutorUpdate.fromThrowable(e))) { - visibleUpdates.poll(); - } - } catch (Throwable t) { - LOG.error("Monitor thread died due to throwable", t); - while (!visibleUpdates.offer(VisibleExecutorUpdate.fromThrowable(t))) { - visibleUpdates.poll(); - } - } finally { - if (!shouldShutdown()) { - // The monitor thread should always be scheduled; but we only need to be scheduled once - executorService.submit(this); - } - 
Thread.currentThread().setName(oldName); - } - } - - /** - * Fires any available timers. Returns true if at least one timer was fired. - */ - private boolean fireTimers() throws Exception { - try { - boolean firedTimers = false; - for (Map.Entry<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> transformTimers : - evaluationContext.extractFiredTimers().entrySet()) { - AppliedPTransform<?, ?, ?> transform = transformTimers.getKey(); - for (Map.Entry<Object, FiredTimers> keyTimers : transformTimers.getValue().entrySet()) { - for (TimeDomain domain : TimeDomain.values()) { - Collection<TimerData> delivery = keyTimers.getValue().getTimers(domain); - if (delivery.isEmpty()) { - continue; - } - KeyedWorkItem<Object, Object> work = - KeyedWorkItems.timersWorkItem(keyTimers.getKey(), delivery); - @SuppressWarnings({"unchecked", "rawtypes"}) - CommittedBundle<?> bundle = - evaluationContext - .createKeyedBundle( - null, keyTimers.getKey(), (PCollection) transform.getInput()) - .add(WindowedValue.valueInEmptyWindows(work)) - .commit(Instant.now()); - scheduleConsumption(transform, bundle, new TimerCompletionCallback(delivery)); - firedTimers = true; - } - } - } - return firedTimers; - } catch (Exception e) { - LOG.error("Internal Error while delivering timers", e); - throw e; - } - } - - private boolean shouldShutdown() { - if (evaluationContext.isDone()) { - LOG.debug("Pipeline is finished. Shutting down. {}"); - while (!visibleUpdates.offer(VisibleExecutorUpdate.finished())) { - visibleUpdates.poll(); - } - executorService.shutdown(); - return true; - } - return false; - } - - /** - * If all active {@link TransformExecutor TransformExecutors} are in a blocked state, - * add more work from root nodes that may have additional work. This ensures that if a pipeline - * has elements available from the root nodes it will add those elements when necessary. 
- */ - private void addWorkIfNecessary(boolean firedTimers) { - // If any timers have fired, they will add more work; We don't need to add more - if (firedTimers) { - return; - } - for (TransformExecutor<?> executor : scheduledExecutors.keySet()) { - if (!isExecutorBlocked(executor)) { - // We have at least one executor that can proceed without adding additional work - return; - } - } - // All current TransformExecutors are blocked; add more work from the roots. - for (AppliedPTransform<?, ?, ?> root : rootNodes) { - if (!evaluationContext.isDone(root)) { - scheduleConsumption(root, null, defaultCompletionCallback); - } - } - } - - /** - * Return true if the provided executor might make more progress if no action is taken. - * - * <p>May return false even if all executor threads are currently blocked or cleaning up, as - * these can cause more work to be scheduled. If this does not occur, after these calls - * terminate, future calls will return true if all executors are waiting. - */ - private boolean isExecutorBlocked(TransformExecutor<?> executor) { - Thread thread = executor.getThread(); - if (thread == null) { - return false; - } - switch (thread.getState()) { - case TERMINATED: - throw new IllegalStateException(String.format( - "Unexpectedly encountered a Terminated TransformExecutor %s", executor)); - case WAITING: - case TIMED_WAITING: - // The thread is waiting for some external input. Adding more work may cause the thread - // to stop waiting (e.g. the thread is waiting on an unbounded side input) - return true; - case BLOCKED: - // The executor is blocked on acquisition of a java monitor. This usually means it is - // making a call to the EvaluationContext, but not a model-blocking call - and will - // eventually complete, at which point we may reevaluate. 
- default: - // NEW and RUNNABLE threads can make progress - return false; - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/FlattenEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/FlattenEvaluatorFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/FlattenEvaluatorFactory.java deleted file mode 100644 index 4e23dde..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/FlattenEvaluatorFactory.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.sdk.runners.inprocess; - -import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; -import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle; -import org.apache.beam.sdk.transforms.AppliedPTransform; -import org.apache.beam.sdk.transforms.Flatten; -import org.apache.beam.sdk.transforms.Flatten.FlattenPCollectionList; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionList; - -/** - * The {@link InProcessPipelineRunner} {@link TransformEvaluatorFactory} for the {@link Flatten} - * {@link PTransform}. - */ -class FlattenEvaluatorFactory implements TransformEvaluatorFactory { - @Override - public <InputT> TransformEvaluator<InputT> forApplication( - AppliedPTransform<?, ?, ?> application, - CommittedBundle<?> inputBundle, - InProcessEvaluationContext evaluationContext) { - @SuppressWarnings({"cast", "unchecked", "rawtypes"}) - TransformEvaluator<InputT> evaluator = (TransformEvaluator<InputT>) createInMemoryEvaluator( - (AppliedPTransform) application, inputBundle, evaluationContext); - return evaluator; - } - - private <InputT> TransformEvaluator<InputT> createInMemoryEvaluator( - final AppliedPTransform< - PCollectionList<InputT>, PCollection<InputT>, FlattenPCollectionList<InputT>> - application, - final CommittedBundle<InputT> inputBundle, - final InProcessEvaluationContext evaluationContext) { - if (inputBundle == null) { - // it is impossible to call processElement on a flatten with no input bundle. 
A Flatten with - // no input bundle occurs as an output of Flatten.pcollections(PCollectionList.empty()) - return new FlattenEvaluator<>( - null, StepTransformResult.withoutHold(application).build()); - } - final UncommittedBundle<InputT> outputBundle = - evaluationContext.createBundle(inputBundle, application.getOutput()); - final InProcessTransformResult result = - StepTransformResult.withoutHold(application).addOutput(outputBundle).build(); - return new FlattenEvaluator<>(outputBundle, result); - } - - private static class FlattenEvaluator<InputT> implements TransformEvaluator<InputT> { - private final UncommittedBundle<InputT> outputBundle; - private final InProcessTransformResult result; - - public FlattenEvaluator( - UncommittedBundle<InputT> outputBundle, InProcessTransformResult result) { - this.outputBundle = outputBundle; - this.result = result; - } - - @Override - public void processElement(WindowedValue<InputT> element) { - outputBundle.add(element); - } - - @Override - public InProcessTransformResult finishBundle() { - return result; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ForwardingPTransform.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ForwardingPTransform.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ForwardingPTransform.java deleted file mode 100644 index 85aa1c4..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ForwardingPTransform.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.runners.inprocess; - -import org.apache.beam.sdk.coders.CannotProvideCoderException; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.display.DisplayData; -import org.apache.beam.sdk.values.PInput; -import org.apache.beam.sdk.values.POutput; -import org.apache.beam.sdk.values.TypedPValue; - -/** - * A base class for implementing {@link PTransform} overrides, which behave identically to the - * delegate transform but with overridden methods. Implementors are required to implement - * {@link #delegate()}, which returns the object to forward calls to, and {@link #apply(PInput)}. 
- */ -public abstract class ForwardingPTransform<InputT extends PInput, OutputT extends POutput> - extends PTransform<InputT, OutputT> { - protected abstract PTransform<InputT, OutputT> delegate(); - - @Override - public OutputT apply(InputT input) { - return delegate().apply(input); - } - - @Override - public void validate(InputT input) { - delegate().validate(input); - } - - @Override - public String getName() { - return delegate().getName(); - } - - @Override - public <T> Coder<T> getDefaultOutputCoder(InputT input, @SuppressWarnings("unused") - TypedPValue<T> output) throws CannotProvideCoderException { - return delegate().getDefaultOutputCoder(input, output); - } - - @Override - public void populateDisplayData(DisplayData.Builder builder) { - delegate().populateDisplayData(builder); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/GroupByKeyEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/GroupByKeyEvaluatorFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/GroupByKeyEvaluatorFactory.java deleted file mode 100644 index 4cec841..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/GroupByKeyEvaluatorFactory.java +++ /dev/null @@ -1,274 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.runners.inprocess; - -import static org.apache.beam.sdk.util.CoderUtils.encodeToByteArray; - -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.coders.IterableCoder; -import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; -import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle; -import org.apache.beam.sdk.runners.inprocess.StepTransformResult.Builder; -import org.apache.beam.sdk.transforms.AppliedPTransform; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.GroupByKey; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.GroupAlsoByWindowViaWindowSetDoFn; -import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly.ReifyTimestampsAndWindows; -import org.apache.beam.sdk.util.KeyedWorkItem; -import org.apache.beam.sdk.util.KeyedWorkItemCoder; -import org.apache.beam.sdk.util.KeyedWorkItems; -import org.apache.beam.sdk.util.SystemReduceFn; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.util.WindowingStrategy; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PInput; -import org.apache.beam.sdk.values.POutput; - -import com.google.common.annotations.VisibleForTesting; - -import 
java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -/** - * The {@link InProcessPipelineRunner} {@link TransformEvaluatorFactory} for the {@link GroupByKey} - * {@link PTransform}. - */ -class GroupByKeyEvaluatorFactory implements TransformEvaluatorFactory { - @Override - public <InputT> TransformEvaluator<InputT> forApplication( - AppliedPTransform<?, ?, ?> application, - CommittedBundle<?> inputBundle, - InProcessEvaluationContext evaluationContext) { - @SuppressWarnings({"cast", "unchecked", "rawtypes"}) - TransformEvaluator<InputT> evaluator = createEvaluator( - (AppliedPTransform) application, (CommittedBundle) inputBundle, evaluationContext); - return evaluator; - } - - private <K, V> TransformEvaluator<KV<K, WindowedValue<V>>> createEvaluator( - final AppliedPTransform< - PCollection<KV<K, WindowedValue<V>>>, PCollection<KeyedWorkItem<K, V>>, - InProcessGroupByKeyOnly<K, V>> - application, - final CommittedBundle<KV<K, V>> inputBundle, - final InProcessEvaluationContext evaluationContext) { - return new GroupByKeyEvaluator<K, V>(evaluationContext, inputBundle, application); - } - - private static class GroupByKeyEvaluator<K, V> - implements TransformEvaluator<KV<K, WindowedValue<V>>> { - private final InProcessEvaluationContext evaluationContext; - - private final CommittedBundle<KV<K, V>> inputBundle; - private final AppliedPTransform< - PCollection<KV<K, WindowedValue<V>>>, PCollection<KeyedWorkItem<K, V>>, - InProcessGroupByKeyOnly<K, V>> - application; - private final Coder<K> keyCoder; - private Map<GroupingKey<K>, List<WindowedValue<V>>> groupingMap; - - public GroupByKeyEvaluator( - InProcessEvaluationContext evaluationContext, - CommittedBundle<KV<K, V>> inputBundle, - AppliedPTransform< - PCollection<KV<K, WindowedValue<V>>>, PCollection<KeyedWorkItem<K, V>>, - InProcessGroupByKeyOnly<K, V>> - application) { - this.evaluationContext = evaluationContext; - this.inputBundle = 
inputBundle; - this.application = application; - - PCollection<KV<K, WindowedValue<V>>> input = application.getInput(); - keyCoder = getKeyCoder(input.getCoder()); - groupingMap = new HashMap<>(); - } - - private Coder<K> getKeyCoder(Coder<KV<K, WindowedValue<V>>> coder) { - if (!(coder instanceof KvCoder)) { - throw new IllegalStateException(); - } - @SuppressWarnings("unchecked") - Coder<K> keyCoder = ((KvCoder<K, WindowedValue<V>>) coder).getKeyCoder(); - return keyCoder; - } - - @Override - public void processElement(WindowedValue<KV<K, WindowedValue<V>>> element) { - KV<K, WindowedValue<V>> kv = element.getValue(); - K key = kv.getKey(); - byte[] encodedKey; - try { - encodedKey = encodeToByteArray(keyCoder, key); - } catch (CoderException exn) { - // TODO: Put in better element printing: - // truncate if too long. - throw new IllegalArgumentException( - String.format("unable to encode key %s of input to %s using %s", key, this, keyCoder), - exn); - } - GroupingKey<K> groupingKey = new GroupingKey<>(key, encodedKey); - List<WindowedValue<V>> values = groupingMap.get(groupingKey); - if (values == null) { - values = new ArrayList<WindowedValue<V>>(); - groupingMap.put(groupingKey, values); - } - values.add(kv.getValue()); - } - - @Override - public InProcessTransformResult finishBundle() { - Builder resultBuilder = StepTransformResult.withoutHold(application); - for (Map.Entry<GroupingKey<K>, List<WindowedValue<V>>> groupedEntry : - groupingMap.entrySet()) { - K key = groupedEntry.getKey().key; - KeyedWorkItem<K, V> groupedKv = - KeyedWorkItems.elementsWorkItem(key, groupedEntry.getValue()); - UncommittedBundle<KeyedWorkItem<K, V>> bundle = - evaluationContext.createKeyedBundle(inputBundle, key, application.getOutput()); - bundle.add(WindowedValue.valueInGlobalWindow(groupedKv)); - resultBuilder.addOutput(bundle); - } - return resultBuilder.build(); - } - - private static class GroupingKey<K> { - private K key; - private byte[] encodedKey; - - public 
GroupingKey(K key, byte[] encodedKey) { - this.key = key; - this.encodedKey = encodedKey; - } - - @Override - public boolean equals(Object o) { - if (o instanceof GroupingKey) { - GroupingKey<?> that = (GroupingKey<?>) o; - return Arrays.equals(this.encodedKey, that.encodedKey); - } else { - return false; - } - } - - @Override - public int hashCode() { - return Arrays.hashCode(encodedKey); - } - } - } - - /** - * A {@link PTransformOverrideFactory} for {@link GroupByKey} PTransforms. - */ - public static final class InProcessGroupByKeyOverrideFactory - implements PTransformOverrideFactory { - @Override - public <InputT extends PInput, OutputT extends POutput> PTransform<InputT, OutputT> override( - PTransform<InputT, OutputT> transform) { - if (transform instanceof GroupByKey) { - @SuppressWarnings({"rawtypes", "unchecked"}) - PTransform<InputT, OutputT> override = new InProcessGroupByKey((GroupByKey) transform); - return override; - } - return transform; - } - } - - /** - * An in-memory implementation of the {@link GroupByKey} primitive as a composite - * {@link PTransform}. - */ - private static final class InProcessGroupByKey<K, V> - extends ForwardingPTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> { - private final GroupByKey<K, V> original; - - private InProcessGroupByKey(GroupByKey<K, V> from) { - this.original = from; - } - - @Override - public PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> delegate() { - return original; - } - - @Override - public PCollection<KV<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) { - KvCoder<K, V> inputCoder = (KvCoder<K, V>) input.getCoder(); - - // This operation groups by the combination of key and window, - // merging windows as needed, using the windows assigned to the - // key/value input elements and the window merge operation of the - // window function associated with the input PCollection. 
- WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy(); - - // Use the default GroupAlsoByWindow implementation - DoFn<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> groupAlsoByWindow = - groupAlsoByWindow(windowingStrategy, inputCoder.getValueCoder()); - - // By default, implement GroupByKey via a series of lower-level operations. - return input - // Make each input element's timestamp and assigned windows - // explicit, in the value part. - .apply(new ReifyTimestampsAndWindows<K, V>()) - - .apply(new InProcessGroupByKeyOnly<K, V>()) - .setCoder(KeyedWorkItemCoder.of(inputCoder.getKeyCoder(), - inputCoder.getValueCoder(), input.getWindowingStrategy().getWindowFn().windowCoder())) - - // Group each key's values by window, merging windows as needed. - .apply("GroupAlsoByWindow", ParDo.of(groupAlsoByWindow)) - - // And update the windowing strategy as appropriate. - .setWindowingStrategyInternal(original.updateWindowingStrategy(windowingStrategy)) - .setCoder( - KvCoder.of(inputCoder.getKeyCoder(), IterableCoder.of(inputCoder.getValueCoder()))); - } - - private <W extends BoundedWindow> - DoFn<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> groupAlsoByWindow( - final WindowingStrategy<?, W> windowingStrategy, final Coder<V> inputCoder) { - return GroupAlsoByWindowViaWindowSetDoFn.create( - windowingStrategy, SystemReduceFn.<K, V, W>buffering(inputCoder)); - } - } - - /** - * An implementation primitive to use in the evaluation of a {@link GroupByKey} - * {@link PTransform}. 
- */ - public static final class InProcessGroupByKeyOnly<K, V> - extends PTransform<PCollection<KV<K, WindowedValue<V>>>, PCollection<KeyedWorkItem<K, V>>> { - @Override - public PCollection<KeyedWorkItem<K, V>> apply(PCollection<KV<K, WindowedValue<V>>> input) { - return PCollection.<KeyedWorkItem<K, V>>createPrimitiveOutputInternal( - input.getPipeline(), input.getWindowingStrategy(), input.isBounded()); - } - - @VisibleForTesting - InProcessGroupByKeyOnly() {} - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ImmutabilityCheckingBundleFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ImmutabilityCheckingBundleFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ImmutabilityCheckingBundleFactory.java deleted file mode 100644 index 04ece1c..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ImmutabilityCheckingBundleFactory.java +++ /dev/null @@ -1,131 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.sdk.runners.inprocess; - -import static com.google.common.base.Preconditions.checkNotNull; - -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; -import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.util.IllegalMutationException; -import org.apache.beam.sdk.util.MutationDetector; -import org.apache.beam.sdk.util.MutationDetectors; -import org.apache.beam.sdk.util.UserCodeException; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.PCollection; - -import com.google.api.client.util.Throwables; -import com.google.common.collect.HashMultimap; -import com.google.common.collect.SetMultimap; - -import org.joda.time.Instant; - -/** - * A {@link BundleFactory} that ensures that elements added to it are not mutated after being - * output. Immutability checks are enforced at the time {@link UncommittedBundle#commit(Instant)} is - * called, checking the value at that time against the value at the time the element was added. All - * elements added to the bundle will be encoded by the {@link Coder} of the underlying - * {@link PCollection}. - * - * <p>This catches errors during the execution of a {@link DoFn} caused by modifying an element - * after it is added to an output {@link PCollection}. - */ -class ImmutabilityCheckingBundleFactory implements BundleFactory { - /** - * Create a new {@link ImmutabilityCheckingBundleFactory} that uses the underlying - * {@link BundleFactory} to create the output bundle. 
- */ - public static ImmutabilityCheckingBundleFactory create(BundleFactory underlying) { - return new ImmutabilityCheckingBundleFactory(underlying); - } - - private final BundleFactory underlying; - - private ImmutabilityCheckingBundleFactory(BundleFactory underlying) { - this.underlying = checkNotNull(underlying); - } - - @Override - public <T> UncommittedBundle<T> createRootBundle(PCollection<T> output) { - return new ImmutabilityEnforcingBundle<>(underlying.createRootBundle(output)); - } - - @Override - public <T> UncommittedBundle<T> createBundle(CommittedBundle<?> input, PCollection<T> output) { - return new ImmutabilityEnforcingBundle<>(underlying.createBundle(input, output)); - } - - @Override - public <T> UncommittedBundle<T> createKeyedBundle( - CommittedBundle<?> input, Object key, PCollection<T> output) { - return new ImmutabilityEnforcingBundle<>(underlying.createKeyedBundle(input, key, output)); - } - - private static class ImmutabilityEnforcingBundle<T> implements UncommittedBundle<T> { - private final UncommittedBundle<T> underlying; - private final SetMultimap<WindowedValue<T>, MutationDetector> mutationDetectors; - private Coder<T> coder; - - public ImmutabilityEnforcingBundle(UncommittedBundle<T> underlying) { - this.underlying = underlying; - mutationDetectors = HashMultimap.create(); - coder = getPCollection().getCoder(); - } - - @Override - public PCollection<T> getPCollection() { - return underlying.getPCollection(); - } - - @Override - public UncommittedBundle<T> add(WindowedValue<T> element) { - try { - mutationDetectors.put( - element, MutationDetectors.forValueWithCoder(element.getValue(), coder)); - } catch (CoderException e) { - throw Throwables.propagate(e); - } - underlying.add(element); - return this; - } - - @Override - public CommittedBundle<T> commit(Instant synchronizedProcessingTime) { - for (MutationDetector detector : mutationDetectors.values()) { - try { - detector.verifyUnmodified(); - } catch (IllegalMutationException exn) 
{ - throw UserCodeException.wrap( - new IllegalMutationException( - String.format( - "PTransform %s mutated value %s after it was output (new value was %s)." - + " Values must not be mutated in any way after being output.", - underlying.getPCollection().getProducingTransformInternal().getFullName(), - exn.getSavedValue(), - exn.getNewValue()), - exn.getSavedValue(), - exn.getNewValue(), - exn)); - } - } - return underlying.commit(synchronizedProcessingTime); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ImmutabilityEnforcementFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ImmutabilityEnforcementFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ImmutabilityEnforcementFactory.java deleted file mode 100644 index 2f21032..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ImmutabilityEnforcementFactory.java +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.sdk.runners.inprocess; - -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; -import org.apache.beam.sdk.transforms.AppliedPTransform; -import org.apache.beam.sdk.util.IllegalMutationException; -import org.apache.beam.sdk.util.MutationDetector; -import org.apache.beam.sdk.util.MutationDetectors; -import org.apache.beam.sdk.util.UserCodeException; -import org.apache.beam.sdk.util.WindowedValue; - -import java.util.IdentityHashMap; -import java.util.Map; - -/** - * {@link ModelEnforcement} that enforces elements are not modified over the course of processing - * an element. - * - * <p>Implies {@link EncodabilityEnforcment}. - */ -class ImmutabilityEnforcementFactory implements ModelEnforcementFactory { - public static ModelEnforcementFactory create() { - return new ImmutabilityEnforcementFactory(); - } - - @Override - public <T> ModelEnforcement<T> forBundle( - CommittedBundle<T> input, AppliedPTransform<?, ?, ?> consumer) { - return new ImmutabilityCheckingEnforcement<T>(input, consumer); - } - - private static class ImmutabilityCheckingEnforcement<T> extends AbstractModelEnforcement<T> { - private final AppliedPTransform<?, ?, ?> transform; - private final Map<WindowedValue<T>, MutationDetector> mutationElements; - private final Coder<T> coder; - - private ImmutabilityCheckingEnforcement( - CommittedBundle<T> input, AppliedPTransform<?, ?, ?> transform) { - this.transform = transform; - coder = input.getPCollection().getCoder(); - mutationElements = new IdentityHashMap<>(); - } - - @Override - public void beforeElement(WindowedValue<T> element) { - try { - mutationElements.put( - element, MutationDetectors.forValueWithCoder(element.getValue(), coder)); - } catch (CoderException e) { - throw UserCodeException.wrap(e); - } - } - - @Override - public void afterElement(WindowedValue<T> element) { - 
verifyUnmodified(mutationElements.get(element)); - } - - @Override - public void afterFinish( - CommittedBundle<T> input, - InProcessTransformResult result, - Iterable<? extends CommittedBundle<?>> outputs) { - for (MutationDetector detector : mutationElements.values()) { - verifyUnmodified(detector); - } - } - - private void verifyUnmodified(MutationDetector detector) { - try { - detector.verifyUnmodified(); - } catch (IllegalMutationException e) { - throw new IllegalMutationException( - String.format( - "PTransform %s illegaly mutated value %s of class %s." - + " Input values must not be mutated in any way.", - transform.getFullName(), - e.getSavedValue(), - e.getSavedValue().getClass()), - e.getSavedValue(), - e.getNewValue()); - } - } - } -}