[ https://issues.apache.org/jira/browse/BEAM-2899?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16267752#comment-16267752 ]
ASF GitHub Bot commented on BEAM-2899:
--------------------------------------

tgroh closed pull request #4151: [BEAM-2899] Decompose Direct Execution Components
URL: https://github.com/apache/beam/pull/4151

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BundleProcessor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BundleProcessor.java new file mode 100644 index 00000000000..59d3043450a --- /dev/null +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BundleProcessor.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.direct; + +import org.apache.beam.runners.local.Bundle; + +/** + * An executor that is capable of processing some bundle of input over some executable stage or + * step. + */ +interface BundleProcessor<BundleT extends Bundle<?>, ExecutableT> { + /** + * Execute the provided bundle using the provided Executable, calling back to the {@link + * CompletionCallback} when execution completes.
+ */ + void process(BundleT bundle, ExecutableT consumer, CompletionCallback onComplete); +} diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java index d041a5a84ae..89f75a5e12e 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java @@ -188,16 +188,14 @@ public DirectPipelineResult run(Pipeline originalPipeline) { graph, keyedPValueVisitor.getKeyedPValues()); - RootProviderRegistry rootInputProvider = RootProviderRegistry.defaultRegistry(context); TransformEvaluatorRegistry registry = TransformEvaluatorRegistry.defaultRegistry(context); PipelineExecutor executor = ExecutorServiceParallelExecutor.create( - options.getTargetParallelism(), graph, - rootInputProvider, + options.getTargetParallelism(), registry, Enforcement.defaultModelEnforcements(enabledEnforcements), context); - executor.start(graph.getRootTransforms()); + executor.start(graph, RootProviderRegistry.defaultRegistry(context)); DirectPipelineResult result = new DirectPipelineResult(executor, context); if (options.isBlockOnRun()) { diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java index fe3765b8d2b..1650bb67ecd 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java @@ -17,45 +17,32 @@ */ package org.apache.beam.runners.direct; -import static com.google.common.base.Preconditions.checkState; - -import com.google.auto.value.AutoValue; import com.google.common.base.Optional; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; import com.google.common.cache.RemovalListener; import com.google.common.cache.RemovalNotification; -import com.google.common.collect.Iterables; +import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.ThreadFactoryBuilder; -import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.Map; -import java.util.Queue; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import javax.annotation.Nullable; -import org.apache.beam.runners.core.KeyedWorkItem; -import org.apache.beam.runners.core.KeyedWorkItems; -import org.apache.beam.runners.core.TimerInternals.TimerData; -import org.apache.beam.runners.direct.WatermarkManager.FiredTimers; +import org.apache.beam.runners.local.ExecutionDriver; +import org.apache.beam.runners.local.ExecutionDriver.DriverState; +import org.apache.beam.runners.local.PipelineMessageReceiver; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult.State; import 
org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.util.UserCodeException; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.PValue; import org.joda.time.Duration; import org.joda.time.Instant; @@ -63,17 +50,16 @@ import org.slf4j.LoggerFactory; /** - * An {@link PipelineExecutor} that uses an underlying {@link ExecutorService} and - * {@link EvaluationContext} to execute a {@link Pipeline}. + * An {@link PipelineExecutor} that uses an underlying {@link ExecutorService} and {@link + * EvaluationContext} to execute a {@link Pipeline}. */ -final class ExecutorServiceParallelExecutor implements PipelineExecutor { +final class ExecutorServiceParallelExecutor + implements PipelineExecutor, BundleProcessor<CommittedBundle<?>, AppliedPTransform<?, ?, ?>> { private static final Logger LOG = LoggerFactory.getLogger(ExecutorServiceParallelExecutor.class); private final int targetParallelism; private final ExecutorService executorService; - private final DirectGraph graph; - private final RootProviderRegistry rootProviderRegistry; private final TransformEvaluatorRegistry registry; private final EvaluationContext evaluationContext; @@ -82,48 +68,21 @@ private final TransformExecutorService parallelExecutorService; private final LoadingCache<StepAndKey, TransformExecutorService> serialExecutorServices; - private final Queue<ExecutorUpdate> allUpdates; - private final BlockingQueue<VisibleExecutorUpdate> visibleUpdates; - - private final CompletionCallback defaultCompletionCallback; - - private final ConcurrentMap<AppliedPTransform<?, ?, ?>, ConcurrentLinkedQueue<CommittedBundle<?>>> - pendingRootBundles; - - private final AtomicReference<ExecutorState> state = - new AtomicReference<>(ExecutorState.QUIESCENT); - - /** - * Measures the number of {@link TransformExecutor TransformExecutors} that have been - * scheduled but not yet completed. - * - * <p>Before a {@link TransformExecutor} is scheduled, this value is incremented. All - * methods in {@link CompletionCallback} decrement this value. 
- */ - private final AtomicLong outstandingWork = new AtomicLong(); + private final QueueMessageReceiver visibleUpdates; private AtomicReference<State> pipelineState = new AtomicReference<>(State.RUNNING); public static ExecutorServiceParallelExecutor create( int targetParallelism, - DirectGraph graph, - RootProviderRegistry rootProviderRegistry, TransformEvaluatorRegistry registry, Map<String, Collection<ModelEnforcementFactory>> transformEnforcements, EvaluationContext context) { return new ExecutorServiceParallelExecutor( - targetParallelism, - graph, - rootProviderRegistry, - registry, - transformEnforcements, - context); + targetParallelism, registry, transformEnforcements, context); } private ExecutorServiceParallelExecutor( int targetParallelism, - DirectGraph graph, - RootProviderRegistry rootProviderRegistry, TransformEvaluatorRegistry registry, Map<String, Collection<ModelEnforcementFactory>> transformEnforcements, EvaluationContext context) { @@ -137,8 +96,6 @@ private ExecutorServiceParallelExecutor( .setThreadFactory(MoreExecutors.platformThreadFactory()) .setNameFormat("direct-runner-worker") .build()); - this.graph = graph; - this.rootProviderRegistry = rootProviderRegistry; this.registry = registry; this.evaluationContext = context; @@ -151,13 +108,9 @@ private ExecutorServiceParallelExecutor( .removalListener(shutdownExecutorServiceListener()) .build(serialTransformExecutorServiceCacheLoader()); - this.allUpdates = new ConcurrentLinkedQueue<>(); - this.visibleUpdates = new LinkedBlockingQueue<>(); + this.visibleUpdates = new QueueMessageReceiver(); parallelExecutorService = TransformExecutorServices.parallel(executorService); - defaultCompletionCallback = - new TimerIterableCompletionCallback(Collections.<TimerData>emptyList()); - this.pendingRootBundles = new ConcurrentHashMap<>(); executorFactory = new DirectTransformExecutor.Factory(context, registry, transformEnforcements); } @@ -185,9 +138,11 @@ public void onRemoval( } @Override - public void start(Collection<AppliedPTransform<?, ?, ?>> roots) { + public void start(DirectGraph graph, RootProviderRegistry rootProviderRegistry) { int numTargetSplits = Math.max(3, targetParallelism); - for (AppliedPTransform<?, ?, ?> root : roots) { + ImmutableMap.Builder<AppliedPTransform<?, ?, ?>, ConcurrentLinkedQueue<CommittedBundle<?>>> + pendingRootBundles = ImmutableMap.builder(); + for (AppliedPTransform<?, ?, ?> root : graph.getRootTransforms()) { ConcurrentLinkedQueue<CommittedBundle<?>> pending = new ConcurrentLinkedQueue<>(); try { Collection<CommittedBundle<?>> initialInputs = @@ -198,15 +153,44 @@ public void start(Collection<AppliedPTransform<?, ?, ?>> roots) { } pendingRootBundles.put(root, pending); } - evaluationContext.initialize(pendingRootBundles); - Runnable monitorRunnable = new MonitorRunnable(); - executorService.submit(monitorRunnable); + evaluationContext.initialize(pendingRootBundles.build()); + final ExecutionDriver executionDriver = + QuiescenceDriver.create( + evaluationContext, graph, this, visibleUpdates, pendingRootBundles.build()); + executorService.submit( + new Runnable() { + @Override + public void run() { + DriverState drive = executionDriver.drive(); + if (drive.isTermainal()) { + State newPipelineState = State.UNKNOWN; + switch (drive) { + case FAILED: + newPipelineState = State.FAILED; + break; + case SHUTDOWN: + newPipelineState = State.DONE; + break; + case CONTINUE: + throw new IllegalStateException( + String.format("%s should not be a terminal state", DriverState.CONTINUE)); + default: + 
throw new IllegalArgumentException( + String.format("Unknown %s %s", DriverState.class.getSimpleName(), drive)); + } + shutdownIfNecessary(newPipelineState); + } else { + executorService.submit(this); + } + } + }); } @SuppressWarnings("unchecked") - private void scheduleConsumption( - AppliedPTransform<?, ?, ?> consumer, + @Override + public void process( CommittedBundle<?> bundle, + AppliedPTransform<?, ?, ?> consumer, CompletionCallback onComplete) { evaluateBundle(consumer, bundle, onComplete); } @@ -231,7 +215,6 @@ private void scheduleConsumption( TransformExecutor callable = executorFactory.create(bundle, transform, onComplete, transformExecutor); - outstandingWork.incrementAndGet(); if (!pipelineState.get().isTerminal()) { transformExecutor.schedule(callable); } @@ -241,13 +224,6 @@ private boolean isKeyed(PValue pvalue) { return evaluationContext.isKeyed(pvalue); } - private void scheduleConsumers(ExecutorUpdate update) { - CommittedBundle<?> bundle = update.getBundle().get(); - for (AppliedPTransform<?, ?, ?> consumer : update.getConsumers()) { - scheduleConsumption(consumer, bundle, defaultCompletionCallback); - } - } - @Override public State waitUntilFinish(Duration duration) throws Exception { Instant completionTime; @@ -262,7 +238,7 @@ public State waitUntilFinish(Duration duration) throws Exception { && (update == null || isTerminalStateUpdate(update))) { // Get an update; don't block forever if another thread has handled it. The call to poll will // wait the entire timeout; this call primarily exists to relinquish any core. - update = visibleUpdates.poll(25L, TimeUnit.MILLISECONDS); + update = visibleUpdates.tryNext(Duration.millis(25L)); if (update == null && pipelineState.get().isTerminal()) { // there are no updates to process and no updates will ever be published because the // executor is shutdown @@ -293,10 +269,7 @@ private boolean isTerminalStateUpdate(VisibleExecutorUpdate update) { @Override public void stop() { shutdownIfNecessary(State.CANCELLED); - while (!visibleUpdates.offer(VisibleExecutorUpdate.cancelled())) { - // Make sure "This Pipeline was Cancelled" notification arrives. - visibleUpdates.poll(); - } + visibleUpdates.cancelled(); } private void shutdownIfNecessary(State newState) { @@ -314,114 +287,17 @@ private void shutdownIfNecessary(State newState) { try { registry.cleanup(); } catch (Exception e) { - visibleUpdates.add(VisibleExecutorUpdate.fromException(e)); + visibleUpdates.failed(e); } } /** - * The base implementation of {@link CompletionCallback} that provides implementations for - * {@link #handleResult(CommittedBundle, TransformResult)} and - * {@link #handleException(CommittedBundle, Exception)}. - */ - private class TimerIterableCompletionCallback implements CompletionCallback { - private final Iterable<TimerData> timers; - - protected TimerIterableCompletionCallback(Iterable<TimerData> timers) { - this.timers = timers; - } - - @Override - public final CommittedResult handleResult( - CommittedBundle<?> inputBundle, TransformResult<?> result) { - CommittedResult committedResult = evaluationContext.handleResult(inputBundle, timers, result); - for (CommittedBundle<?> outputBundle : committedResult.getOutputs()) { - allUpdates.offer( - ExecutorUpdate.fromBundle( - outputBundle, graph.getPerElementConsumers(outputBundle.getPCollection()))); - } - Optional<? 
extends CommittedBundle<?>> unprocessedInputs = - committedResult.getUnprocessedInputs(); - if (unprocessedInputs.isPresent()) { - if (inputBundle.getPCollection() == null) { - // TODO: Split this logic out of an if statement - pendingRootBundles.get(result.getTransform()).offer(unprocessedInputs.get()); - } else { - allUpdates.offer( - ExecutorUpdate.fromBundle( - unprocessedInputs.get(), - Collections.<AppliedPTransform<?, ?, ?>>singleton( - committedResult.getTransform()))); - } - } - if (!committedResult.getProducedOutputTypes().isEmpty()) { - state.set(ExecutorState.ACTIVE); - } - outstandingWork.decrementAndGet(); - return committedResult; - } - - @Override - public void handleEmpty(AppliedPTransform<?, ?, ?> transform) { - outstandingWork.decrementAndGet(); - } - - @Override - public final void handleException(CommittedBundle<?> inputBundle, Exception e) { - allUpdates.offer(ExecutorUpdate.fromException(e)); - outstandingWork.decrementAndGet(); - } - - @Override - public void handleError(Error err) { - visibleUpdates.add(VisibleExecutorUpdate.fromError(err)); - } - } - - /** - * An internal status update on the state of the executor. - * - * <p>Used to signal when the executor should be shut down (due to an exception). - */ - @AutoValue - abstract static class ExecutorUpdate { - public static ExecutorUpdate fromBundle( - CommittedBundle<?> bundle, - Collection<AppliedPTransform<?, ?, ?>> consumers) { - return new AutoValue_ExecutorServiceParallelExecutor_ExecutorUpdate( - Optional.of(bundle), - consumers, - Optional.<Exception>absent()); - } - - public static ExecutorUpdate fromException(Exception e) { - return new AutoValue_ExecutorServiceParallelExecutor_ExecutorUpdate( - Optional.<CommittedBundle<?>>absent(), - Collections.<AppliedPTransform<?, ?, ?>>emptyList(), - Optional.of(e)); - } - - /** - * Returns the bundle that produced this update. - */ - public abstract Optional<? extends CommittedBundle<?>> getBundle(); - - /** - * Returns the transforms to process the bundle. If nonempty, {@link #getBundle()} will return - * a present {@link Optional}. - */ - public abstract Collection<AppliedPTransform<?, ?, ?>> getConsumers(); - - public abstract Optional<? extends Exception> getException(); - } - - /** - * An update of interest to the user. Used in {@link #waitUntilFinish} to decide whether to - * return normally or throw an exception. + * An update of interest to the user. Used in {@link #waitUntilFinish} to decide whether to return + * normally or throw an exception. */ private static class VisibleExecutorUpdate { private final Optional<? 
extends Throwable> thrown; - @Nullable - private final State newState; + @Nullable private final State newState; public static VisibleExecutorUpdate fromException(Exception e) { return new VisibleExecutorUpdate(null, e); @@ -444,193 +320,41 @@ private VisibleExecutorUpdate(State newState, @Nullable Throwable exception) { this.newState = newState; } - public State getNewState() { + State getNewState() { return newState; } } - private class MonitorRunnable implements Runnable { - private final String runnableName = String.format("%s$%s-monitor", - evaluationContext.getPipelineOptions().getAppName(), - ExecutorServiceParallelExecutor.class.getSimpleName()); - - private boolean exceptionThrown = false; + private static class QueueMessageReceiver implements PipelineMessageReceiver { + // If the type of BlockingQueue changes, ensure the findbugs filter is updated appropriately + private final BlockingQueue<VisibleExecutorUpdate> updates = new LinkedBlockingQueue<>(); @Override - public void run() { - String oldName = Thread.currentThread().getName(); - Thread.currentThread().setName(runnableName); - try { - boolean noWorkOutstanding = outstandingWork.get() == 0L; - ExecutorState startingState = state.get(); - if (startingState == ExecutorState.ACTIVE) { - // The remainder of this call will add all available work to the Executor, and there will - // be no new work available - state.compareAndSet(ExecutorState.ACTIVE, ExecutorState.PROCESSING); - } else if (startingState == ExecutorState.PROCESSING && noWorkOutstanding) { - // The executor has consumed all new work and no new work was added - state.compareAndSet(ExecutorState.PROCESSING, ExecutorState.QUIESCING); - } else if (startingState == ExecutorState.QUIESCING && noWorkOutstanding) { - // The executor re-ran all blocked work and nothing could make progress. - state.compareAndSet(ExecutorState.QUIESCING, ExecutorState.QUIESCENT); - } - fireTimers(); - Collection<ExecutorUpdate> updates = new ArrayList<>(); - // Pull all available updates off of the queue before adding additional work. This ensures - // both loops terminate. 
- ExecutorUpdate pendingUpdate = allUpdates.poll(); - while (pendingUpdate != null) { - updates.add(pendingUpdate); - pendingUpdate = allUpdates.poll(); - } - for (ExecutorUpdate update : updates) { - applyUpdate(noWorkOutstanding, startingState, update); - } - addWorkIfNecessary(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - LOG.error("Monitor died due to being interrupted"); - while (!visibleUpdates.offer(VisibleExecutorUpdate.fromException(e))) { - visibleUpdates.poll(); - } - } catch (Exception t) { - LOG.error("Monitor thread died due to exception", t); - while (!visibleUpdates.offer(VisibleExecutorUpdate.fromException(t))) { - visibleUpdates.poll(); - } - } finally { - if (!shouldShutdown()) { - // The monitor thread should always be scheduled; but we only need to be scheduled once - executorService.submit(this); - } - Thread.currentThread().setName(oldName); - } + public void failed(Exception e) { + updates.offer(VisibleExecutorUpdate.fromException(e)); } - private void applyUpdate( - boolean noWorkOutstanding, ExecutorState startingState, ExecutorUpdate update) { - LOG.debug("Executor Update: {}", update); - if (update.getBundle().isPresent()) { - if (ExecutorState.ACTIVE == startingState - || (ExecutorState.PROCESSING == startingState - && noWorkOutstanding)) { - scheduleConsumers(update); - } else { - allUpdates.offer(update); - } - } else if (update.getException().isPresent()) { - checkState( - visibleUpdates.offer(VisibleExecutorUpdate.fromException(update.getException().get())), - "VisibleUpdates should always be able to receive an offered update"); - exceptionThrown = true; - } + @Override + public void failed(Error e) { + updates.offer(VisibleExecutorUpdate.fromError(e)); } - /** - * Fires any available timers. - */ - private void fireTimers() throws Exception { - try { - for (FiredTimers transformTimers : evaluationContext.extractFiredTimers()) { - Collection<TimerData> delivery = transformTimers.getTimers(); - KeyedWorkItem<?, Object> work = - KeyedWorkItems.timersWorkItem(transformTimers.getKey().getKey(), delivery); - @SuppressWarnings({"unchecked", "rawtypes"}) - CommittedBundle<?> bundle = - evaluationContext - .createKeyedBundle( - transformTimers.getKey(), - (PCollection) - Iterables.getOnlyElement( - transformTimers.getTransform().getInputs().values())) - .add(WindowedValue.valueInGlobalWindow(work)) - .commit(evaluationContext.now()); - scheduleConsumption( - transformTimers.getTransform(), - bundle, - new TimerIterableCompletionCallback(delivery)); - state.set(ExecutorState.ACTIVE); - } - } catch (Exception e) { - LOG.error("Internal Error while delivering timers", e); - throw e; - } + @Override + public void cancelled() { + updates.offer(VisibleExecutorUpdate.cancelled()); } - private boolean shouldShutdown() { - State nextState = State.UNKNOWN; - if (exceptionThrown) { - nextState = State.FAILED; - } else if (evaluationContext.isDone()) { - visibleUpdates.offer(VisibleExecutorUpdate.finished()); - nextState = State.DONE; - } - shutdownIfNecessary(nextState); - return pipelineState.get().isTerminal(); + @Override + public void completed() { + updates.offer(VisibleExecutorUpdate.finished()); } /** - * If all active {@link DirectTransformExecutor TransformExecutors} are in a blocked state, - * add more work from root nodes that may have additional work. This ensures that if a pipeline - * has elements available from the root nodes it will add those elements when necessary. 
+ * Try to get the next unconsumed message in this {@link QueueMessageReceiver}. */ - private void addWorkIfNecessary() { - // If any timers have fired, they will add more work; We don't need to add more - if (state.get() == ExecutorState.QUIESCENT) { - // All current TransformExecutors are blocked; add more work from the roots. - for (Map.Entry<AppliedPTransform<?, ?, ?>, ConcurrentLinkedQueue<CommittedBundle<?>>> - pendingRootEntry : pendingRootBundles.entrySet()) { - Collection<CommittedBundle<?>> bundles = new ArrayList<>(); - // Pull all available work off of the queue, then schedule it all, so this loop - // terminates - while (!pendingRootEntry.getValue().isEmpty()) { - CommittedBundle<?> bundle = pendingRootEntry.getValue().poll(); - bundles.add(bundle); - } - for (CommittedBundle<?> bundle : bundles) { - scheduleConsumption(pendingRootEntry.getKey(), bundle, defaultCompletionCallback); - state.set(ExecutorState.ACTIVE); - } - } - } + @Nullable + private VisibleExecutorUpdate tryNext(Duration timeout) throws InterruptedException { + return updates.poll(timeout.getMillis(), TimeUnit.MILLISECONDS); } } - - - /** - * The state of the executor. The state of the executor determines the behavior of the - * {@link MonitorRunnable} when it runs. - */ - private enum ExecutorState { - /** - * Output has been produced since the last time the monitor ran. Work exists that has not yet - * been evaluated, and all pending, including potentially blocked work, should be evaluated. - * - * <p>The executor becomes active whenever a timer fires, a {@link PCollectionView} is updated, - * or output is produced by the evaluation of a {@link DirectTransformExecutor}. - */ - ACTIVE, - /** - * The Executor does not have any unevaluated work available to it, but work is in progress. - * Work should not be added until the Executor becomes active or no work is outstanding. - * - * <p>If all outstanding work completes without the executor becoming {@code ACTIVE}, the - * Executor enters state {@code QUIESCING}. Previously evaluated work must be reevaluated, in - * case a side input has made progress. - */ - PROCESSING, - /** - * All outstanding work is work that may be blocked on a side input. When there is no - * outstanding work, the executor becomes {@code QUIESCENT}. - */ - QUIESCING, - /** - * All elements are either buffered in state or are blocked on a side input. There are no - * timers that are permitted to fire but have not. There is no outstanding work. - * - * <p>The pipeline will not make progress without the progression of watermarks, the progression - * of processing time, or the addition of elements. - */ - QUIESCENT - } } diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PipelineExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PipelineExecutor.java index d954fa206eb..5932ef59c93 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PipelineExecutor.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PipelineExecutor.java @@ -17,7 +17,6 @@ */ package org.apache.beam.runners.direct; -import java.util.Collection; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult.State; import org.apache.beam.sdk.runners.AppliedPTransform; @@ -30,12 +29,11 @@ */ interface PipelineExecutor { /** - * Starts this executor. The provided collection is the collection of root transforms to - * initially schedule. 
- * - * @param rootTransforms + * Starts this executor on the provided graph. The {@link RootProviderRegistry} will be used to + * create initial inputs for the provide {@link DirectGraph graph}. */ - void start(Collection<AppliedPTransform<?, ?, ?>> rootTransforms); + void start( + DirectGraph graph, RootProviderRegistry rootProviderRegistry); /** * Blocks until the job being executed enters a terminal state. A job is completed after all diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/QuiescenceDriver.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/QuiescenceDriver.java new file mode 100644 index 00000000000..7c0888c76d2 --- /dev/null +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/QuiescenceDriver.java @@ -0,0 +1,333 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.direct; + +import com.google.auto.value.AutoValue; +import com.google.common.base.Optional; +import com.google.common.collect.Iterables; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.beam.runners.core.KeyedWorkItem; +import org.apache.beam.runners.core.KeyedWorkItems; +import org.apache.beam.runners.core.TimerInternals.TimerData; +import org.apache.beam.runners.direct.WatermarkManager.FiredTimers; +import org.apache.beam.runners.local.ExecutionDriver; +import org.apache.beam.runners.local.PipelineMessageReceiver; +import org.apache.beam.sdk.runners.AppliedPTransform; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionView; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Pushes additional work onto a {@link BundleProcessor} based on the fact that a pipeline has + * quiesced. 
+ */ +class QuiescenceDriver implements ExecutionDriver { + private static final Logger LOG = LoggerFactory.getLogger(QuiescenceDriver.class); + + public static ExecutionDriver create( + EvaluationContext context, + DirectGraph graph, + BundleProcessor<CommittedBundle<?>, AppliedPTransform<?, ?, ?>> bundleProcessor, + PipelineMessageReceiver messageReceiver, + Map<AppliedPTransform<?, ?, ?>, ConcurrentLinkedQueue<CommittedBundle<?>>> initialBundles) { + return new QuiescenceDriver(context, graph, bundleProcessor, messageReceiver, initialBundles); + } + + private final EvaluationContext evaluationContext; + private final DirectGraph graph; + private final BundleProcessor<CommittedBundle<?>, AppliedPTransform<?, ?, ?>> bundleProcessor; + private final PipelineMessageReceiver pipelineMessageReceiver; + + private final CompletionCallback defaultCompletionCallback = + new TimerIterableCompletionCallback(Collections.<TimerData>emptyList()); + + private final Map<AppliedPTransform<?, ?, ?>, ConcurrentLinkedQueue<CommittedBundle<?>>> + pendingRootBundles; + private final Queue<WorkUpdate> pendingWork = new ConcurrentLinkedQueue<>(); + + private final AtomicReference<ExecutorState> state = + new AtomicReference<>(ExecutorState.QUIESCENT); + private final AtomicLong outstandingWork = new AtomicLong(0L); + private boolean exceptionThrown = false; + + private QuiescenceDriver( + EvaluationContext evaluationContext, + DirectGraph graph, + BundleProcessor<CommittedBundle<?>, AppliedPTransform<?, ?, ?>> bundleProcessor, + PipelineMessageReceiver pipelineMessageReceiver, + Map<AppliedPTransform<?, ?, ?>, ConcurrentLinkedQueue<CommittedBundle<?>>> + pendingRootBundles) { + this.evaluationContext = evaluationContext; + this.graph = graph; + this.bundleProcessor = bundleProcessor; + this.pipelineMessageReceiver = pipelineMessageReceiver; + this.pendingRootBundles = pendingRootBundles; + } + + @Override + public DriverState drive() { + boolean noWorkOutstanding = outstandingWork.get() == 0L; + ExecutorState startingState = state.get(); + if (startingState == ExecutorState.ACTIVE) { + // The remainder of this call will add all available work to the Executor, and there will + // be no new work available + state.compareAndSet(ExecutorState.ACTIVE, ExecutorState.PROCESSING); + } else if (startingState == ExecutorState.PROCESSING && noWorkOutstanding) { + // The executor has consumed all new work and no new work was added + state.compareAndSet(ExecutorState.PROCESSING, ExecutorState.QUIESCING); + } else if (startingState == ExecutorState.QUIESCING && noWorkOutstanding) { + // The executor re-ran all blocked work and nothing could make progress. + state.compareAndSet(ExecutorState.QUIESCING, ExecutorState.QUIESCENT); + } + fireTimers(); + Collection<WorkUpdate> updates = new ArrayList<>(); + // Pull all available updates off of the queue before adding additional work. This ensures + // both loops terminate. 
+ WorkUpdate pendingUpdate = pendingWork.poll(); + while (pendingUpdate != null) { + updates.add(pendingUpdate); + pendingUpdate = pendingWork.poll(); + } + for (WorkUpdate update : updates) { + applyUpdate(noWorkOutstanding, startingState, update); + } + addWorkIfNecessary(); + + if (exceptionThrown) { + return DriverState.FAILED; + } else if (evaluationContext.isDone()) { + return DriverState.SHUTDOWN; + } else { + return DriverState.CONTINUE; + } + } + + private void applyUpdate( + boolean noWorkOutstanding, ExecutorState startingState, WorkUpdate update) { + LOG.debug("Executor Update: {}", update); + if (update.getBundle().isPresent()) { + if (ExecutorState.ACTIVE == startingState + || (ExecutorState.PROCESSING == startingState && noWorkOutstanding)) { + CommittedBundle<?> bundle = update.getBundle().get(); + for (AppliedPTransform<?, ?, ?> consumer : update.getConsumers()) { + outstandingWork.incrementAndGet(); + bundleProcessor.process(bundle, consumer, defaultCompletionCallback); + } + } else { + pendingWork.offer(update); + } + } else if (update.getException().isPresent()) { + pipelineMessageReceiver.failed(update.getException().get()); + exceptionThrown = true; + } + } + + /** Fires any available timers. */ + private void fireTimers() { + try { + for (FiredTimers transformTimers : evaluationContext.extractFiredTimers()) { + Collection<TimerData> delivery = transformTimers.getTimers(); + KeyedWorkItem<?, Object> work = + KeyedWorkItems.timersWorkItem(transformTimers.getKey().getKey(), delivery); + @SuppressWarnings({"unchecked", "rawtypes"}) + CommittedBundle<?> bundle = + evaluationContext + .createKeyedBundle( + transformTimers.getKey(), + (PCollection) + Iterables.getOnlyElement( + transformTimers.getTransform().getInputs().values())) + .add(WindowedValue.valueInGlobalWindow(work)) + .commit(evaluationContext.now()); + outstandingWork.incrementAndGet(); + bundleProcessor.process( + bundle, transformTimers.getTransform(), new TimerIterableCompletionCallback(delivery)); + state.set(ExecutorState.ACTIVE); + } + } catch (Exception e) { + LOG.error("Internal Error while delivering timers", e); + pipelineMessageReceiver.failed(e); + exceptionThrown = true; + } + } + + /** + * If all active {@link DirectTransformExecutor TransformExecutors} are in a blocked state, add + * more work from root nodes that may have additional work. This ensures that if a pipeline has + * elements available from the root nodes it will add those elements when necessary. + */ + private void addWorkIfNecessary() { + // If any timers have fired, they will add more work; We don't need to add more + if (state.get() == ExecutorState.QUIESCENT) { + // All current TransformExecutors are blocked; add more work from the roots. + for (Map.Entry<AppliedPTransform<?, ?, ?>, ConcurrentLinkedQueue<CommittedBundle<?>>> + pendingRootEntry : pendingRootBundles.entrySet()) { + Collection<CommittedBundle<?>> bundles = new ArrayList<>(); + // Pull all available work off of the queue, then schedule it all, so this loop + // terminates + while (!pendingRootEntry.getValue().isEmpty()) { + CommittedBundle<?> bundle = pendingRootEntry.getValue().poll(); + bundles.add(bundle); + } + for (CommittedBundle<?> bundle : bundles) { + outstandingWork.incrementAndGet(); + bundleProcessor.process(bundle, pendingRootEntry.getKey(), defaultCompletionCallback); + state.set(ExecutorState.ACTIVE); + } + } + } + } + + /** + * The state of the executor. 
The state of the executor determines the behavior of the {@link + * QuiescenceDriver} when it runs. + */ + private enum ExecutorState { + /** + * Output has been produced since the last time the monitor ran. Work exists that has not yet + * been evaluated, and all pending, including potentially blocked work, should be evaluated. + * + * <p>The executor becomes active whenever a timer fires, a {@link PCollectionView} is updated, + * or output is produced by the evaluation of a {@link DirectTransformExecutor}. + */ + ACTIVE, + /** + * The Executor does not have any unevaluated work available to it, but work is in progress. + * Work should not be added until the Executor becomes active or no work is outstanding. + * + * <p>If all outstanding work completes without the executor becoming {@code ACTIVE}, the + * Executor enters state {@code QUIESCING}. Previously evaluated work must be reevaluated, in + * case a side input has made progress. + */ + PROCESSING, + /** + * All outstanding work is work that may be blocked on a side input. When there is no + * outstanding work, the executor becomes {@code QUIESCENT}. + */ + QUIESCING, + /** + * All elements are either buffered in state or are blocked on a side input. There are no timers + * that are permitted to fire but have not. There is no outstanding work. + * + * <p>The pipeline will not make progress without the progression of watermarks, the progression + * of processing time, or the addition of elements. + */ + QUIESCENT + } + + /** + * The base implementation of {@link CompletionCallback} that provides implementations for {@link + * #handleResult(CommittedBundle, TransformResult)} and {@link #handleException(CommittedBundle, + * Exception)}. + */ + private class TimerIterableCompletionCallback implements CompletionCallback { + private final Iterable<TimerData> timers; + + TimerIterableCompletionCallback(Iterable<TimerData> timers) { + this.timers = timers; + } + + @Override + public final CommittedResult handleResult( + CommittedBundle<?> inputBundle, TransformResult<?> result) { + CommittedResult committedResult = evaluationContext.handleResult(inputBundle, timers, result); + for (CommittedBundle<?> outputBundle : committedResult.getOutputs()) { + pendingWork.offer( + WorkUpdate.fromBundle( + outputBundle, graph.getPerElementConsumers(outputBundle.getPCollection()))); + } + Optional<? extends CommittedBundle<?>> unprocessedInputs = + committedResult.getUnprocessedInputs(); + if (unprocessedInputs.isPresent()) { + if (inputBundle.getPCollection() == null) { + // TODO: Split this logic out of an if statement + pendingRootBundles.get(result.getTransform()).offer(unprocessedInputs.get()); + } else { + pendingWork.offer( + WorkUpdate.fromBundle( + unprocessedInputs.get(), + Collections.<AppliedPTransform<?, ?, ?>>singleton( + committedResult.getTransform()))); + } + } + if (!committedResult.getProducedOutputTypes().isEmpty()) { + state.set(ExecutorState.ACTIVE); + } + outstandingWork.decrementAndGet(); + return committedResult; + } + + @Override + public void handleEmpty(AppliedPTransform<?, ?, ?> transform) { + outstandingWork.decrementAndGet(); + } + + @Override + public final void handleException(CommittedBundle<?> inputBundle, Exception e) { + pendingWork.offer(WorkUpdate.fromException(e)); + outstandingWork.decrementAndGet(); + } + + @Override + public void handleError(Error err) { + pipelineMessageReceiver.failed(err); + } + } + + /** + * An internal status update on the state of the executor. 
+ * + * <p>Used to signal when the executor should be shut down (due to an exception). + */ + @AutoValue + abstract static class WorkUpdate { + private static WorkUpdate fromBundle( + CommittedBundle<?> bundle, Collection<AppliedPTransform<?, ?, ?>> consumers) { + return new AutoValue_QuiescenceDriver_WorkUpdate( + Optional.of(bundle), consumers, Optional.<Exception>absent()); + } + + private static WorkUpdate fromException(Exception e) { + return new AutoValue_QuiescenceDriver_WorkUpdate( + Optional.<CommittedBundle<?>>absent(), + Collections.<AppliedPTransform<?, ?, ?>>emptyList(), + Optional.of(e)); + } + + /** Returns the bundle that produced this update. */ + public abstract Optional<? extends CommittedBundle<?>> getBundle(); + + /** + * Returns the transforms to process the bundle. If nonempty, {@link #getBundle()} will return a + * present {@link Optional}. + */ + public abstract Collection<AppliedPTransform<?, ?, ?>> getConsumers(); + + public abstract Optional<? extends Exception> getException(); + } +} diff --git a/runners/local-java/src/main/java/org/apache/beam/runners/local/ExecutionDriver.java b/runners/local-java/src/main/java/org/apache/beam/runners/local/ExecutionDriver.java new file mode 100644 index 00000000000..0b65e4df92f --- /dev/null +++ b/runners/local-java/src/main/java/org/apache/beam/runners/local/ExecutionDriver.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.local; + +/** + * Drives the execution of a {@code Pipeline} by scheduling work. + */ +public interface ExecutionDriver { + DriverState drive(); + + /** + * The state of the driver. If the state is terminal, the driver can no longer make progress. + */ + enum DriverState { + CONTINUE(false), + FAILED(true), + SHUTDOWN(true); + + private final boolean terminal; + + DriverState(boolean terminal) { + this.terminal = terminal; + } + + public boolean isTermainal() { + return terminal; + } + } +} diff --git a/runners/local-java/src/main/java/org/apache/beam/runners/local/PipelineMessageReceiver.java b/runners/local-java/src/main/java/org/apache/beam/runners/local/PipelineMessageReceiver.java new file mode 100644 index 00000000000..c40a4ff26b5 --- /dev/null +++ b/runners/local-java/src/main/java/org/apache/beam/runners/local/PipelineMessageReceiver.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.local; + +/** + * Handles failures in the form of exceptions. + */ +public interface PipelineMessageReceiver { + /** + * Report that a failure has occurred. + */ + void failed(Exception e); + + /** + * Report that a failure has occurred. + */ + void failed(Error e); + + /** + * Report that the pipeline has been cancelled. + */ + void cancelled(); + + /** + * Report that the pipeline has successfully completed. + */ + void completed(); +} diff --git a/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml b/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml index 04d4baf1ba0..2035f854203 100644 --- a/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml +++ b/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml @@ -163,10 +163,14 @@ </Match> <Match> - <Class name="org.apache.beam.runners.direct.ExecutorServiceParallelExecutor$MonitorRunnable" /> - <Method name="shouldShutdown" /> + <Class name="org.apache.beam.runners.direct.ExecutorServiceParallelExecutor$QueueMessageReceiver" /> + <Or> + <Method name="failed" /> + <Method name="cancelled" /> + <Method name="completed" /> + </Or> <Bug pattern="RV_RETURN_VALUE_IGNORED_BAD_PRACTICE" /> - <!-- visibleUpdates is a non-capacity-limited LinkedBlockingQueue, which + <!-- updates is a non-capacity-limited LinkedBlockingQueue, which can never refuse an offered update --> </Match>
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

> Universal Local Runner
> ----------------------
>
> Key: BEAM-2899
> URL: https://issues.apache.org/jira/browse/BEAM-2899
> Project: Beam
> Issue Type: Improvement
> Components: runner-core
> Reporter: Henning Rohde
> Assignee: Thomas Groh
> Labels: portability
>
> To make the portability effort tractable, we should implement a Universal Local Runner (ULR) in Java that runs in a single server process plus Docker containers for the SDK harnesses. It would serve multiple purposes:
> (1) A reference implementation for other runners. Ideally, any new feature should be implemented in the ULR first.
> (2) A fully-featured test runner for SDKs that participate in the portability framework. It thus complements the direct runners.
> (3) A test runner for user code that depends on or customizes the runtime environment. For example, a DoFn that shells out has a dependency that may be satisfied on the user's desktop (and thus works fine on the direct runner), but perhaps not by the container harness image. The ULR allows for an easy way to find out.
> The Java direct runner presumably has lots of pieces that can be reused.

--
This message was sent by Atlassian JIRA (v6.4.14#64029)
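To make the shape of the new runners/local-java interfaces in the diff above easier to see, here is a minimal, illustrative sketch (not part of the PR) of an ExecutionDriver being driven to a terminal DriverState, in the same spirit as the self-resubmitting Runnable in ExecutorServiceParallelExecutor#start and the QuiescenceDriver it drives. Only ExecutionDriver, DriverState, and PipelineMessageReceiver come from the PR; CountdownDriver and the println-based receiver are hypothetical stand-ins, and the terminal check is spelled isTermainal() exactly as in the PR's DriverState.

import org.apache.beam.runners.local.ExecutionDriver;
import org.apache.beam.runners.local.ExecutionDriver.DriverState;
import org.apache.beam.runners.local.PipelineMessageReceiver;

/** Illustrative only: a toy driver plus a single-threaded loop that drives it to completion. */
public class CountdownDriverExample {

  /** Reports CONTINUE until its budget of rounds is spent, then reports completion and SHUTDOWN. */
  static class CountdownDriver implements ExecutionDriver {
    private final PipelineMessageReceiver receiver;
    private int remainingRounds;

    CountdownDriver(int rounds, PipelineMessageReceiver receiver) {
      this.remainingRounds = rounds;
      this.receiver = receiver;
    }

    @Override
    public DriverState drive() {
      try {
        // A real driver (e.g. QuiescenceDriver) would schedule pending bundles onto a
        // BundleProcessor here and decide CONTINUE/SHUTDOWN from the pipeline's progress.
        remainingRounds--;
        if (remainingRounds > 0) {
          return DriverState.CONTINUE;
        }
        receiver.completed();
        return DriverState.SHUTDOWN;
      } catch (RuntimeException e) {
        receiver.failed(e);
        return DriverState.FAILED;
      }
    }
  }

  public static void main(String[] args) {
    // A trivial receiver that just logs pipeline-level messages.
    PipelineMessageReceiver logReceiver =
        new PipelineMessageReceiver() {
          @Override
          public void failed(Exception e) {
            System.out.println("failed: " + e);
          }

          @Override
          public void failed(Error e) {
            System.out.println("failed: " + e);
          }

          @Override
          public void cancelled() {
            System.out.println("cancelled");
          }

          @Override
          public void completed() {
            System.out.println("completed");
          }
        };

    ExecutionDriver driver = new CountdownDriver(3, logReceiver);
    DriverState state;
    do {
      // The DirectRunner resubmits an equivalent step to an ExecutorService instead of looping.
      state = driver.drive();
    } while (!state.isTermainal());
  }
}

The point of the decomposition, as reflected above, is that the scheduling policy now lives behind ExecutionDriver while bundle execution stays behind BundleProcessor, so either side can be replaced independently of the other.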