Remove KeyedResourcePool
This interface is no longer used. Instead, the runner ensures that
bundles will be provided containing the appropriate input to the
TestStreamEvaluatorFactory.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/41fb16f0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/41fb16f0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/41fb16f0
Branch: refs/heads/master
Commit: 41fb16f014a79d2b9c149c5b369db12b61c4c774
Parents: 7306e16
Author: Thomas Groh <tg...@google.com>
Authored: Wed Oct 5 13:12:48 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Thu Oct 6 15:14:38 2016 -0700
----------------------------------------------------------------------
.../direct/BoundedReadEvaluatorFactory.java | 40 +++--
.../beam/runners/direct/DirectRunner.java | 2 +
.../beam/runners/direct/EmptyInputProvider.java | 49 ++++++
.../direct/ExecutorServiceParallelExecutor.java | 27 ++-
.../runners/direct/FlattenEvaluatorFactory.java | 18 +-
.../beam/runners/direct/KeyedResourcePool.java | 47 ------
.../runners/direct/LockedKeyedResourcePool.java | 95 -----------
.../beam/runners/direct/RootInputProvider.java | 41 +++++
.../runners/direct/RootProviderRegistry.java | 65 ++++++++
.../direct/RootTransformEvaluatorFactory.java | 42 -----
.../direct/TestStreamEvaluatorFactory.java | 39 +++--
.../direct/TransformEvaluatorRegistry.java | 17 +-
.../direct/UnboundedReadEvaluatorFactory.java | 56 ++++---
.../direct/BoundedReadEvaluatorFactoryTest.java | 3 +-
.../direct/FlattenEvaluatorFactoryTest.java | 3 +-
.../direct/LockedKeyedResourcePoolTest.java | 163 -------------------
.../direct/TestStreamEvaluatorFactoryTest.java | 3 +-
.../UnboundedReadEvaluatorFactoryTest.java | 8 +-
18 files changed, 269 insertions(+), 449 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41fb16f0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
----------------------------------------------------------------------
diff --git
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
index 4936ad9..326a535 100644
---
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
+++
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
@@ -39,28 +39,13 @@ import org.apache.beam.sdk.values.PCollection;
* A {@link TransformEvaluatorFactory} that produces {@link TransformEvaluator
TransformEvaluators}
* for the {@link Bounded Read.Bounded} primitive {@link PTransform}.
*/
-final class BoundedReadEvaluatorFactory implements
RootTransformEvaluatorFactory {
+final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory {
private final EvaluationContext evaluationContext;
BoundedReadEvaluatorFactory(EvaluationContext evaluationContext) {
this.evaluationContext = evaluationContext;
}
- @Override
- public Collection<CommittedBundle<?>> getInitialInputs(AppliedPTransform<?, ?,
?> transform) {
- return createInitialSplits((AppliedPTransform) transform);
- }
-
- private <OutputT> Collection<CommittedBundle<?>> createInitialSplits(
- AppliedPTransform<?, ?, Read.Bounded<OutputT>> transform) {
- BoundedSource<OutputT> source = transform.getTransform().getSource();
- return Collections.<CommittedBundle<?>>singleton(
- evaluationContext
- .<BoundedSourceShard<OutputT>>createRootBundle()
-
.add(WindowedValue.valueInGlobalWindow(BoundedSourceShard.of(source)))
- .commit(BoundedWindow.TIMESTAMP_MAX_VALUE));
- }
-
@SuppressWarnings({"unchecked", "rawtypes"})
@Override
@Nullable
@@ -132,4 +117,27 @@ final class BoundedReadEvaluatorFactory implements
RootTransformEvaluatorFactory
abstract BoundedSource<T> getSource();
}
+
+ static class InputProvider implements RootInputProvider {
+ private final EvaluationContext evaluationContext;
+
+ InputProvider(EvaluationContext evaluationContext) {
+ this.evaluationContext = evaluationContext;
+ }
+
+ @Override
+ public Collection<CommittedBundle<?>> getInitialInputs(AppliedPTransform<?,
?, ?> transform) {
+ return createInitialSplits((AppliedPTransform) transform);
+ }
+
+ private <OutputT> Collection<CommittedBundle<?>> createInitialSplits(
+ AppliedPTransform<?, ?, Read.Bounded<OutputT>> transform) {
+ BoundedSource<OutputT> source = transform.getTransform().getSource();
+ return Collections.<CommittedBundle<?>>singleton(
+ evaluationContext
+ .<BoundedSourceShard<OutputT>>createRootBundle()
+
.add(WindowedValue.valueInGlobalWindow(BoundedSourceShard.of(source)))
+ .commit(BoundedWindow.TIMESTAMP_MAX_VALUE));
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41fb16f0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index 2ec4f08..67ec3e6 100644
---
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -248,12 +248,14 @@ public class DirectRunner
// independent executor service for each run
ExecutorService executorService = executorServiceSupplier.get();
+ RootInputProvider rootInputProvider =
RootProviderRegistry.defaultRegistry(context);
TransformEvaluatorRegistry registry =
TransformEvaluatorRegistry.defaultRegistry(context);
PipelineExecutor executor =
ExecutorServiceParallelExecutor.create(
executorService,
consumerTrackingVisitor.getValueToConsumers(),
keyedPValueVisitor.getKeyedPValues(),
+ rootInputProvider,
registry,
defaultModelEnforcements(options),
context);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41fb16f0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyInputProvider.java
----------------------------------------------------------------------
diff --git
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyInputProvider.java
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyInputProvider.java
new file mode 100644
index 0000000..10d63e9
--- /dev/null
+++
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyInputProvider.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import java.util.Collection;
+import java.util.Collections;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+
+/**
+ * A {@link RootInputProvider} that provides a singleton empty bundle.
+ */
+class EmptyInputProvider implements RootInputProvider {
+ private final EvaluationContext evaluationContext;
+
+ EmptyInputProvider(EvaluationContext evaluationContext) {
+ this.evaluationContext = evaluationContext;
+ }
+
+ /**
+ * {@inheritDoc}.
+ *
+ * <p>Returns a single empty bundle. This bundle ensures that any {@link
PTransform PTransforms}
+ * that consume from the output of the provided {@link AppliedPTransform}
have watermarks updated
+ * as appropriate.
+ */
+ @Override
+ public Collection<CommittedBundle<?>> getInitialInputs(AppliedPTransform<?, ?,
?> transform) {
+ return Collections.<CommittedBundle<?>>singleton(
+
evaluationContext.createRootBundle().commit(BoundedWindow.TIMESTAMP_MAX_VALUE));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41fb16f0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
----------------------------------------------------------------------
diff --git
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
index bb89699..52c45c3 100644
---
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
+++
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
@@ -17,6 +17,8 @@
*/
package org.apache.beam.runners.direct;
+import static com.google.common.base.Preconditions.checkState;
+
import com.google.auto.value.AutoValue;
import com.google.common.base.MoreObjects;
import com.google.common.base.Optional;
@@ -67,6 +69,7 @@ final class ExecutorServiceParallelExecutor implements
PipelineExecutor {
private final Map<PValue, Collection<AppliedPTransform<?, ?, ?>>>
valueToConsumers;
private final Set<PValue> keyedPValues;
+ private final RootInputProvider rootInputProvider;
private final TransformEvaluatorRegistry registry;
@SuppressWarnings("rawtypes")
private final Map<Class<? extends PTransform>,
Collection<ModelEnforcementFactory>>
@@ -101,18 +104,27 @@ final class ExecutorServiceParallelExecutor implements
PipelineExecutor {
ExecutorService executorService,
Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers,
Set<PValue> keyedPValues,
+ RootInputProvider rootInputProvider,
TransformEvaluatorRegistry registry,
@SuppressWarnings("rawtypes")
- Map<Class<? extends PTransform>, Collection<ModelEnforcementFactory>>
transformEnforcements,
+ Map<Class<? extends PTransform>, Collection<ModelEnforcementFactory>>
+ transformEnforcements,
EvaluationContext context) {
return new ExecutorServiceParallelExecutor(
- executorService, valueToConsumers, keyedPValues, registry,
transformEnforcements, context);
+ executorService,
+ valueToConsumers,
+ keyedPValues,
+ rootInputProvider,
+ registry,
+ transformEnforcements,
+ context);
}
private ExecutorServiceParallelExecutor(
ExecutorService executorService,
Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers,
Set<PValue> keyedPValues,
+ RootInputProvider rootInputProvider,
TransformEvaluatorRegistry registry,
@SuppressWarnings("rawtypes")
Map<Class<? extends PTransform>, Collection<ModelEnforcementFactory>>
transformEnforcements,
@@ -120,6 +132,7 @@ final class ExecutorServiceParallelExecutor implements
PipelineExecutor {
this.executorService = executorService;
this.valueToConsumers = valueToConsumers;
this.keyedPValues = keyedPValues;
+ this.rootInputProvider = rootInputProvider;
this.registry = registry;
this.transformEnforcements = transformEnforcements;
this.evaluationContext = context;
@@ -153,7 +166,12 @@ final class ExecutorServiceParallelExecutor implements
PipelineExecutor {
public void start(Collection<AppliedPTransform<?, ?, ?>> roots) {
for (AppliedPTransform<?, ?, ?> root : roots) {
ConcurrentLinkedQueue<CommittedBundle<?>> pending = new
ConcurrentLinkedQueue<>();
- pending.addAll(registry.getInitialInputs(root));
+ Collection<CommittedBundle<?>> initialInputs =
rootInputProvider.getInitialInputs(root);
+ checkState(
+ !initialInputs.isEmpty(),
+ "All root transforms must have initial inputs. Got 0 for %s",
+ root.getFullName());
+ pending.addAll(initialInputs);
pendingRootBundles.put(root, pending);
}
evaluationContext.initialize(pendingRootBundles);
@@ -385,7 +403,8 @@ final class ExecutorServiceParallelExecutor implements
PipelineExecutor {
LOG.debug("Executor Update: {}", update);
if (update.getBundle().isPresent()) {
if (ExecutorState.ACTIVE == startingState
- || (ExecutorState.PROCESSING == startingState &&
noWorkOutstanding)) {
+ || (ExecutorState.PROCESSING == startingState
+ && noWorkOutstanding)) {
scheduleConsumers(update);
} else {
allUpdates.offer(update);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41fb16f0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
----------------------------------------------------------------------
diff --git
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
index 90db040..57d5628 100644
---
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
+++
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
@@ -17,15 +17,12 @@
*/
package org.apache.beam.runners.direct;
-import java.util.Collection;
-import java.util.Collections;
import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.Flatten.FlattenPCollectionList;
import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
@@ -34,26 +31,13 @@ import org.apache.beam.sdk.values.PCollectionList;
* The {@link DirectRunner} {@link TransformEvaluatorFactory} for the {@link
Flatten}
* {@link PTransform}.
*/
-class FlattenEvaluatorFactory implements RootTransformEvaluatorFactory {
+class FlattenEvaluatorFactory implements TransformEvaluatorFactory {
priva