Repository: incubator-beam Updated Branches: refs/heads/master 57d9bbd79 -> 7984fe3fc
Add Parameters to finishSpecifying Remove the need to use getProducingTransformInternal in TypedPValue. Ensure that all nodes are finished specifying before a call to TransformHierarchy#visit. This ensures that all nodes are fully specified without requiring the Pipeline or Runner to do so explicitly. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/038950df Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/038950df Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/038950df Branch: refs/heads/master Commit: 038950df02fa553cbb91f57978efe125a9ebc80f Parents: b053be4 Author: Thomas Groh <tg...@google.com> Authored: Thu Dec 8 14:33:36 2016 -0800 Committer: Thomas Groh <tg...@google.com> Committed: Wed Dec 21 15:26:17 2016 -0800 ---------------------------------------------------------------------- .../translation/ParDoBoundTranslatorTest.java | 32 ++++---- .../beam/runners/direct/DirectGraphVisitor.java | 21 ------ .../beam/runners/direct/DirectRunner.java | 1 - .../runners/direct/DirectGraphVisitorTest.java | 32 +------- .../direct/FlattenEvaluatorFactoryTest.java | 2 + .../direct/KeyedPValueTrackingVisitorTest.java | 17 ++++- .../beam/runners/spark/ForceStreamingTest.java | 2 - .../main/java/org/apache/beam/sdk/Pipeline.java | 3 + .../beam/sdk/runners/TransformHierarchy.java | 45 ++++++----- .../transforms/join/KeyedPCollectionTuple.java | 32 ++++---- .../java/org/apache/beam/sdk/values/PBegin.java | 5 -- .../apache/beam/sdk/values/PCollectionList.java | 13 +--- .../beam/sdk/values/PCollectionTuple.java | 13 +--- .../java/org/apache/beam/sdk/values/PInput.java | 9 --- .../org/apache/beam/sdk/values/POutput.java | 20 ++--- .../beam/sdk/values/POutputValueBase.java | 4 +- .../java/org/apache/beam/sdk/values/PValue.java | 15 ++++ .../org/apache/beam/sdk/values/PValueBase.java | 3 +- .../org/apache/beam/sdk/values/TypedPValue.java | 78 +++++++++++--------- .../sdk/runners/TransformHierarchyTest.java | 34 +++++---- .../apache/beam/sdk/transforms/ParDoTest.java | 7 +- .../apache/beam/sdk/values/TypedPValueTest.java | 7 +- 22 files changed, 185 insertions(+), 210 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/038950df/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslatorTest.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslatorTest.java index fa94b2a..f88a94d 100644 --- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslatorTest.java +++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslatorTest.java @@ -45,6 +45,7 @@ import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.SerializableCoder; import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.transforms.Create; @@ -286,21 +287,22 @@ public class ParDoBoundTranslatorTest { Arrays.asList(sideInput1, sideInput2), Arrays.<TupleTag<String>>asList()))); - outputs.get(mainOutputTag).apply(ParDo.of(new EmbeddedCollector())); - ApexRunnerResult result = (ApexRunnerResult) pipeline.run(); - - HashSet<String> expected = Sets.newHashSet("processing: 3: [11, 222]", - "processing: -42: [11, 222]", "processing: 666: [11, 222]"); - long timeout = System.currentTimeMillis() + TIMEOUT_MILLIS; - while (System.currentTimeMillis() < timeout) { - if (EmbeddedCollector.RESULTS.containsAll(expected)) { - break; - } - LOG.info("Waiting for expected results."); - Thread.sleep(SLEEP_MILLIS); - } - result.cancel(); - Assert.assertEquals(Sets.newHashSet(expected), EmbeddedCollector.RESULTS); + outputs.get(mainOutputTag).apply(ParDo.of(new EmbeddedCollector())); + outputs.get(sideOutputTag).setCoder(VoidCoder.of()); + ApexRunnerResult result = (ApexRunnerResult) pipeline.run(); + + HashSet<String> expected = Sets.newHashSet("processing: 3: [11, 222]", + "processing: -42: [11, 222]", "processing: 666: [11, 222]"); + long timeout = System.currentTimeMillis() + TIMEOUT_MILLIS; + while (System.currentTimeMillis() < timeout) { + if (EmbeddedCollector.RESULTS.containsAll(expected)) { + break; + } + LOG.info("Waiting for expected results."); + Thread.sleep(SLEEP_MILLIS); + } + result.cancel(); + Assert.assertEquals(Sets.newHashSet(expected), EmbeddedCollector.RESULTS); } private static class TestMultiOutputWithSideInputsFn extends DoFn<Integer, String> { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/038950df/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java index 425bbf1..7e6845d 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java @@ -27,7 +27,6 @@ import java.util.Map; import java.util.Set; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.Pipeline.PipelineVisitor; -import org.apache.beam.sdk.runners.PipelineRunner; import org.apache.beam.sdk.runners.TransformHierarchy; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.PTransform; @@ -51,7 +50,6 @@ class DirectGraphVisitor extends PipelineVisitor.Defaults { private Set<PCollectionView<?>> views = new HashSet<>(); private Set<AppliedPTransform<?, ?, ?>> rootTransforms = new HashSet<>(); private Map<AppliedPTransform<?, ?, ?>, String> stepNames = new HashMap<>(); - private Set<PValue> toFinalize = new HashSet<>(); private int numTransforms = 0; private boolean finalized = false; @@ -80,9 +78,6 @@ class DirectGraphVisitor extends PipelineVisitor.Defaults { @Override public void visitPrimitiveTransform(TransformHierarchy.Node node) { - for (TaggedPValue consumed : node.getInputs()) { - toFinalize.remove(consumed.getValue()); - } AppliedPTransform<?, ?, ?> appliedTransform = getAppliedTransform(node); stepNames.put(appliedTransform, genStepName()); if (node.getInputs().isEmpty()) { @@ -96,8 +91,6 @@ class DirectGraphVisitor extends PipelineVisitor.Defaults { @Override public void visitValue(PValue value, TransformHierarchy.Node producer) { - toFinalize.add(value); - AppliedPTransform<?, ?, ?> appliedTransform = getAppliedTransform(producer); if (value instanceof PCollectionView) { views.add((PCollectionView<?>) value); @@ -118,20 +111,6 @@ class DirectGraphVisitor extends PipelineVisitor.Defaults { } /** - * Returns all of the {@link PValue PValues} that have been produced but not consumed. These - * {@link PValue PValues} should be finalized by the {@link PipelineRunner} before the - * {@link Pipeline} is executed. - */ - public void finishSpecifyingRemainder() { - checkState( - finalized, - "Can't call finishSpecifyingRemainder before the Pipeline has been completely traversed"); - for (PValue unfinalized : toFinalize) { - unfinalized.finishSpecifying(); - } - } - - /** * Get the graph constructed by this {@link DirectGraphVisitor}, which provides * lookups for producers and consumers of {@link PValue PValues}. */ http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/038950df/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java index 7e6ea15..5793b00 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java @@ -301,7 +301,6 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> { MetricsEnvironment.setMetricsSupported(true); DirectGraphVisitor graphVisitor = new DirectGraphVisitor(); pipeline.traverseTopologically(graphVisitor); - graphVisitor.finishSpecifyingRemainder(); @SuppressWarnings("rawtypes") KeyedPValueTrackingVisitor keyedPValueVisitor = KeyedPValueTrackingVisitor.create(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/038950df/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java index c3bbe2d..01d11a3 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java @@ -20,7 +20,6 @@ package org.apache.beam.runners.direct; import static org.hamcrest.Matchers.emptyIterable; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasSize; -import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; @@ -28,6 +27,7 @@ import com.google.common.collect.Iterables; import java.io.Serializable; import java.util.ArrayList; import java.util.List; +import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.CountingInput; import org.apache.beam.sdk.io.CountingSource; import org.apache.beam.sdk.io.Read; @@ -111,6 +111,7 @@ public class DirectGraphVisitorTest implements Serializable { FlattenPCollectionList<String> flatten = Flatten.pCollections(); PCollectionList<String> emptyList = PCollectionList.empty(p); PCollection<String> empty = emptyList.apply(flatten); + empty.setCoder(StringUtf8Coder.of()); p.traverseTopologically(visitor); DirectGraph graph = visitor.getGraph(); assertThat( @@ -177,27 +178,6 @@ public class DirectGraphVisitorTest implements Serializable { } @Test - public void getUnfinalizedPValuesContainsDanglingOutputs() { - PCollection<String> created = p.apply(Create.of("1", "2", "3")); - PCollection<String> transformed = - created.apply( - ParDo.of( - new DoFn<String, String>() { - @ProcessElement - public void processElement(DoFn<String, String>.ProcessContext c) - throws Exception { - c.output(Integer.toString(c.element().length())); - } - })); - - assertThat(transformed.isFinishedSpecifyingInternal(), is(false)); - - p.traverseTopologically(visitor); - visitor.finishSpecifyingRemainder(); - assertThat(transformed.isFinishedSpecifyingInternal(), is(true)); - } - - @Test public void getStepNamesContainsAllTransforms() { PCollection<String> created = p.apply(Create.of("1", "2", "3")); PCollection<String> transformed = @@ -254,12 +234,4 @@ public class DirectGraphVisitorTest implements Serializable { thrown.expectMessage("get a graph"); visitor.getGraph(); } - - @Test - public void finishSpecifyingRemainderWithoutVisitingThrows() { - thrown.expect(IllegalStateException.class); - thrown.expectMessage("completely traversed"); - thrown.expectMessage("finishSpecifyingRemainder"); - visitor.finishSpecifyingRemainder(); - } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/038950df/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java index cda68f0..e07c9f9 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java @@ -26,6 +26,7 @@ import static org.mockito.Mockito.when; import com.google.common.collect.Iterables; import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; +import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.Create; @@ -124,6 +125,7 @@ public class FlattenEvaluatorFactoryTest { PCollectionList<Integer> list = PCollectionList.empty(p); PCollection<Integer> flattened = list.apply(Flatten.<Integer>pCollections()); + flattened.setCoder(VarIntCoder.of()); EvaluationContext evaluationContext = mock(EvaluationContext.class); when(evaluationContext.createBundle(flattened)) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/038950df/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java index a1fb81b..8fac534 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java @@ -23,6 +23,7 @@ import static org.junit.Assert.assertThat; import java.util.Collections; import org.apache.beam.runners.core.KeyedWorkItem; +import org.apache.beam.runners.core.KeyedWorkItemCoder; import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; @@ -34,6 +35,7 @@ import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.Keys; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.WindowedValue; @@ -111,7 +113,13 @@ public class KeyedPValueTrackingVisitorTest { KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of())))))); PCollection<KeyedWorkItem<String, KV<String, Integer>>> unkeyed = - input.apply(ParDo.of(new ParDoMultiOverrideFactory.ToKeyedWorkItem<String, Integer>())); + input + .apply(ParDo.of(new ParDoMultiOverrideFactory.ToKeyedWorkItem<String, Integer>())) + .setCoder( + KeyedWorkItemCoder.of( + StringUtf8Coder.of(), + KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()), + GlobalWindow.Coder.INSTANCE)); p.traverseTopologically(visitor); assertThat(visitor.getKeyedPValues(), not(hasItem(unkeyed))); @@ -139,7 +147,12 @@ public class KeyedPValueTrackingVisitorTest { PCollection<KeyedWorkItem<String, KV<String, Integer>>> keyed = input .apply(GroupByKey.<String, WindowedValue<KV<String, Integer>>>create()) - .apply(ParDo.of(new ParDoMultiOverrideFactory.ToKeyedWorkItem<String, Integer>())); + .apply(ParDo.of(new ParDoMultiOverrideFactory.ToKeyedWorkItem<String, Integer>())) + .setCoder( + KeyedWorkItemCoder.of( + StringUtf8Coder.of(), + KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()), + GlobalWindow.Coder.INSTANCE)); p.traverseTopologically(visitor); assertThat(visitor.getKeyedPValues(), hasItem(keyed)); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/038950df/runners/spark/src/test/java/org/apache/beam/runners/spark/ForceStreamingTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/ForceStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/ForceStreamingTest.java index 1b2ff08..b7b59d1 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/ForceStreamingTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/ForceStreamingTest.java @@ -21,12 +21,10 @@ package org.apache.beam.runners.spark; import static org.hamcrest.MatcherAssert.assertThat; import java.io.IOException; -import java.util.List; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.BoundedReadFromUnboundedSource; import org.apache.beam.sdk.io.CountingSource; import org.apache.beam.sdk.io.Read; -import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.runners.TransformHierarchy; import org.apache.beam.sdk.transforms.PTransform; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/038950df/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java index 7a16f9d..eb0b199 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java @@ -171,6 +171,8 @@ public class Pipeline { * Runs the {@link Pipeline} using its {@link PipelineRunner}. */ public PipelineResult run() { + // Ensure all of the nodes are fully specified before a PipelineRunner gets access to the + // pipeline. LOG.debug("Running {} via {}", this, runner); try { return runner.run(this); @@ -281,6 +283,7 @@ public class Pipeline { * <p>Typically invoked by {@link PipelineRunner} subclasses. */ public void traverseTopologically(PipelineVisitor visitor) { + // Ensure all nodes are fully specified before visiting the pipeline Set<PValue> visitedValues = // Visit all the transforms, which should implicitly visit all the values. transforms.visit(visitor); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/038950df/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java index 29e7fcb..3676e1a 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java @@ -28,6 +28,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; import javax.annotation.Nullable; import org.apache.beam.sdk.Pipeline.PipelineVisitor; @@ -46,6 +47,8 @@ import org.apache.beam.sdk.values.TaggedPValue; public class TransformHierarchy { private final Node root; private final Map<POutput, Node> producers; + // A map of PValue to the PInput the producing PTransform is applied to + private final Map<PValue, PInput> producerInput; // Maintain a stack based on the enclosing nodes private Node current; @@ -53,6 +56,7 @@ public class TransformHierarchy { root = new Node(null, null, "", null); current = root; producers = new HashMap<>(); + producerInput = new HashMap<>(); } /** @@ -86,7 +90,13 @@ public class TransformHierarchy { public void finishSpecifyingInput() { // Inputs must be completely specified before they are consumed by a transform. for (TaggedPValue inputValue : current.getInputs()) { - inputValue.getValue().finishSpecifying(); + Node producerNode = getProducer(inputValue.getValue()); + PInput input = producerInput.remove(inputValue.getValue()); + inputValue.getValue().finishSpecifying(input, producerNode.getTransform()); + checkState( + producers.get(inputValue.getValue()) != null, + "Producer unknown for input %s", + inputValue); checkState( producers.get(inputValue.getValue()) != null, "Producer unknown for input %s", @@ -105,12 +115,14 @@ public class TransformHierarchy { * nodes. */ public void setOutput(POutput output) { - output.finishSpecifyingOutput(); for (TaggedPValue value : output.expand()) { if (!producers.containsKey(value.getValue())) { producers.put(value.getValue(), current); } + value.getValue().finishSpecifyingOutput(current.input, current.transform); + producerInput.put(value.getValue(), current.input); } + output.finishSpecifyingOutput(current.input, current.transform); current.setOutput(output); // TODO: Replace with a "generateDefaultNames" method. output.recordAsOutput(current.toAppliedPTransform()); @@ -130,27 +142,26 @@ public class TransformHierarchy { return producers.get(produced); } - /** - * Returns all producing transforms for the {@link PValue PValues} contained - * in {@code output}. - */ - List<Node> getProducingTransforms(POutput output) { - List<Node> producingTransforms = new ArrayList<>(); - for (TaggedPValue value : output.expand()) { - Node producer = getProducer(value.getValue()); - if (producer != null) { - producingTransforms.add(producer); - } - } - return producingTransforms; - } - public Set<PValue> visit(PipelineVisitor visitor) { + finishSpecifying(); Set<PValue> visitedValues = new HashSet<>(); root.visit(visitor, visitedValues); return visitedValues; } + /** + * Finish specifying any remaining nodes within the {@link TransformHierarchy}. These are {@link + * PValue PValues} that are produced as output of some {@link PTransform} but are never consumed + * as input. These values must still be finished specifying. + */ + private void finishSpecifying() { + for (Entry<PValue, PInput> producerInputEntry : producerInput.entrySet()) { + PValue value = producerInputEntry.getKey(); + value.finishSpecifying(producerInputEntry.getValue(), getProducer(value).getTransform()); + } + producerInput.clear(); + } + public Node getCurrent() { return current; } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/038950df/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/KeyedPCollectionTuple.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/KeyedPCollectionTuple.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/KeyedPCollectionTuple.java index 13d4ee1..b373909 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/KeyedPCollectionTuple.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/KeyedPCollectionTuple.java @@ -152,13 +152,21 @@ public class KeyedPCollectionTuple<K> implements PInput { return pipeline; } - @Override - public void finishSpecifying() { - for (TaggedKeyedPCollection<K, ?> taggedPCollection : keyedCollections) { - taggedPCollection.pCollection.finishSpecifying(); + private static <K, V> Coder<K> getKeyCoder(PCollection<KV<K, V>> pc) { + // TODO: This should already have run coder inference for output, but may not have been consumed + // as input yet (and won't be fully specified); This is fine + + // Assumes that the PCollection uses a KvCoder. + Coder<?> entryCoder = pc.getCoder(); + if (!(entryCoder instanceof KvCoder<?, ?>)) { + throw new IllegalArgumentException("PCollection does not use a KvCoder"); } + @SuppressWarnings("unchecked") + KvCoder<K, V> coder = (KvCoder<K, V>) entryCoder; + return coder.getKeyCoder(); } + ///////////////////////////////////////////////////////////////////////////// /** @@ -197,7 +205,7 @@ public class KeyedPCollectionTuple<K> implements PInput { */ private final List<TaggedKeyedPCollection<K, ?>> keyedCollections; - private final Coder<K> keyCoder; + private Coder<K> keyCoder; private final CoGbkResultSchema schema; @@ -221,20 +229,6 @@ public class KeyedPCollectionTuple<K> implements PInput { this.keyCoder = keyCoder; } - private static <K, V> Coder<K> getKeyCoder(PCollection<KV<K, V>> pc) { - // Need to run coder inference on this PCollection before inspecting it. - pc.finishSpecifying(); - - // Assumes that the PCollection uses a KvCoder. - Coder<?> entryCoder = pc.getCoder(); - if (!(entryCoder instanceof KvCoder<?, ?>)) { - throw new IllegalArgumentException("PCollection does not use a KvCoder"); - } - @SuppressWarnings("unchecked") - KvCoder<K, V> coder = (KvCoder<K, V>) entryCoder; - return coder.getKeyCoder(); - } - private static <K> List<TaggedKeyedPCollection<K, ?>> copyAddLast( List<TaggedKeyedPCollection<K, ?>> keyedCollections, TaggedKeyedPCollection<K, ?> taggedCollection) { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/038950df/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PBegin.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PBegin.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PBegin.java index 9aa4615..2ba0f1c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PBegin.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PBegin.java @@ -69,11 +69,6 @@ public class PBegin implements PInput { return Collections.emptyList(); } - @Override - public void finishSpecifying() { - // Nothing more to be done. - } - ///////////////////////////////////////////////////////////////////////////// /** http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/038950df/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionList.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionList.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionList.java index e4bb7c5..dcb64a8 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionList.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionList.java @@ -234,17 +234,8 @@ public class PCollectionList<T> implements PInput, POutput { } @Override - public void finishSpecifying() { - for (TaggedPValue pc : pcollections) { - pc.getValue().finishSpecifying(); - } - } - - @Override - public void finishSpecifyingOutput() { - for (TaggedPValue pc : pcollections) { - pc.getValue().finishSpecifyingOutput(); - } + public void finishSpecifyingOutput(PInput input, PTransform<?, ?> transform) { + // All component PCollections will have already been finished. } @Override http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/038950df/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionTuple.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionTuple.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionTuple.java index 6afe59e..d61db51 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionTuple.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionTuple.java @@ -255,17 +255,8 @@ public class PCollectionTuple implements PInput, POutput { } @Override - public void finishSpecifying() { - for (PCollection<?> pc : pcollectionMap.values()) { - pc.finishSpecifying(); - } - } - - @Override - public void finishSpecifyingOutput() { - for (PCollection<?> pc : pcollectionMap.values()) { - pc.finishSpecifyingOutput(); - } + public void finishSpecifyingOutput(PInput input, PTransform<?, ?> transform) { + // All component PCollections will already have been finished } @Override http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/038950df/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PInput.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PInput.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PInput.java index a27b939..30d4297 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PInput.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PInput.java @@ -44,13 +44,4 @@ public interface PInput { * <p>Not intended to be invoked directly by user code. */ List<TaggedPValue> expand(); - - /** - * After building, finalizes this {@code PInput} to make it ready for - * being used as an input to a {@link org.apache.beam.sdk.transforms.PTransform}. - * - * <p>Automatically invoked whenever {@code apply()} is invoked on - * this {@code PInput}, so users do not normally call this explicitly. - */ - void finishSpecifying(); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/038950df/sdks/java/core/src/main/java/org/apache/beam/sdk/values/POutput.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/POutput.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/POutput.java index e5d4504..062f565 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/POutput.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/POutput.java @@ -61,16 +61,18 @@ public interface POutput { void recordAsOutput(AppliedPTransform<?, ?, ?> transform); /** - * As part of applying the producing {@link PTransform}, finalizes this - * output to make it ready for being used as an input and for running. + * As part of applying the producing {@link PTransform}, finalizes this output to make it ready + * for being used as an input and for running. * - * <p>This includes ensuring that all {@link PCollection PCollections} - * have {@link org.apache.beam.sdk.coders.Coder Coders} specified or defaulted. + * <p>This includes ensuring that all {@link PCollection PCollections} have {@link + * org.apache.beam.sdk.coders.Coder Coders} specified or defaulted. * - * <p>Automatically invoked whenever this {@link POutput} is used - * as a {@link PInput} to another {@link PTransform}, or if never - * used as a {@link PInput}, when {@link Pipeline#run} - * is called, so users do not normally call this explicitly. + * <p>Automatically invoked whenever this {@link POutput} is output, after {@link + * PValue#finishSpecifyingOutput(PInput, PTransform)} has been called on each component {@link + * PValue} returned by {@link #expand()}. + * + * @deprecated see BEAM-1199 */ - void finishSpecifyingOutput(); + @Deprecated + void finishSpecifyingOutput(PInput input, PTransform<?, ?> transform); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/038950df/sdks/java/core/src/main/java/org/apache/beam/sdk/values/POutputValueBase.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/POutputValueBase.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/POutputValueBase.java index 4772c47..cdef58c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/POutputValueBase.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/POutputValueBase.java @@ -89,12 +89,12 @@ public abstract class POutputValueBase implements POutput { } /** - * Default behavior for {@link #finishSpecifyingOutput()} is + * Default behavior for {@link #finishSpecifyingOutput(PInput, PTransform)}} is * to do nothing. Override if your {@link PValue} requires * finalization. */ @Override - public void finishSpecifyingOutput() { } + public void finishSpecifyingOutput(PInput input, PTransform<?, ?> transform) { } /** * The {@link PTransform} that produces this {@link POutputValueBase}. http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/038950df/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValue.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValue.java index e6dbaf7..052a1f3 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValue.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValue.java @@ -36,6 +36,7 @@ public interface PValue extends POutput, PInput { * * <p>For internal use only. */ + @Deprecated AppliedPTransform<?, ?, ?> getProducingTransformInternal(); /** @@ -46,4 +47,18 @@ public interface PValue extends POutput, PInput { */ @Deprecated List<TaggedPValue> expand(); + + /** + * After building, finalizes this {@code PValue} to make it ready for being used as an input to a + * {@link org.apache.beam.sdk.transforms.PTransform}. + * + * <p>Automatically invoked whenever {@code apply()} is invoked on this {@code PValue}, after + * {@link PValue#finishSpecifying(PInput, PTransform)} has been called on each component {@link + * PValue}, so users do not normally call this explicitly. + * + * @param upstreamInput the {@link PInput} the {@link PTransform} was applied to to produce this + * output + * @param upstreamTransform the {@link PTransform} that produced this {@link PValue} + */ + void finishSpecifying(PInput upstreamInput, PTransform<?, ?> upstreamTransform); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/038950df/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValueBase.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValueBase.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValueBase.java index 3a10d5d..7b44737 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValueBase.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValueBase.java @@ -138,8 +138,7 @@ public abstract class PValueBase extends POutputValueBase implements PValue { } @Override - public void finishSpecifying() { - finishSpecifyingOutput(); + public void finishSpecifying(PInput input, PTransform<?, ?> transform) { finishedSpecifying = true; } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/038950df/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypedPValue.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypedPValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypedPValue.java index 7afd0a1..de1b99c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypedPValue.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypedPValue.java @@ -17,12 +17,15 @@ */ package org.apache.beam.sdk.values; +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; + +import javax.annotation.Nullable; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.CannotProvideCoderException; import org.apache.beam.sdk.coders.CannotProvideCoderException.ReasonCode; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderRegistry; -import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; @@ -45,10 +48,8 @@ public abstract class TypedPValue<T> extends PValueBase implements PValue { * couldn't be inferred. */ public Coder<T> getCoder() { - if (coder == null) { - coder = inferCoderOrFail(); - } - return coder; + checkState(coderOrFailure.coder != null, coderOrFailure.failure); + return coderOrFailure.coder; } /** @@ -60,18 +61,18 @@ public abstract class TypedPValue<T> extends PValueBase implements PValue { * {@code apply()} called on it */ public TypedPValue<T> setCoder(Coder<T> coder) { - if (isFinishedSpecifyingInternal()) { - throw new IllegalStateException( - "cannot change the Coder of " + this + " once it's been used"); - } - if (coder == null) { - throw new IllegalArgumentException( - "Cannot setCoder(null)"); - } - this.coder = coder; + checkState( + !isFinishedSpecifyingInternal(), "cannot change the Coder of %s once it's been used", this); + checkArgument(coder != null, "Cannot setCoder(null)"); + this.coderOrFailure = new CoderOrFailure<>(coder, null); return this; } + @Override + public void finishSpecifyingOutput(PInput input, PTransform<?, ?> transform) { + this.coderOrFailure = inferCoderOrFail(input, transform, getPipeline().getCoderRegistry()); + } + /** * After building, finalizes this {@link PValue} to make it ready for * running. Automatically invoked whenever the {@link PValue} is "used" @@ -79,24 +80,26 @@ public abstract class TypedPValue<T> extends PValueBase implements PValue { * run (useful if this is a {@link PValue} with no consumers). */ @Override - public void finishSpecifying() { + public void finishSpecifying(PInput input, PTransform<?, ?> transform) { if (isFinishedSpecifyingInternal()) { return; } - super.finishSpecifying(); + this.coderOrFailure = inferCoderOrFail(input, transform, getPipeline().getCoderRegistry()); // Ensure that this TypedPValue has a coder by inferring the coder if none exists; If not, // this will throw an exception. getCoder(); + super.finishSpecifying(input, transform); } ///////////////////////////////////////////////////////////////////////////// // Internal details below here. /** - * The {@link Coder} used by this {@link TypedPValue} to encode and decode the - * values stored in it, or null if not specified nor inferred yet. + * The {@link Coder} used by this {@link TypedPValue} to encode and decode the values stored in + * it, or null if not specified nor inferred yet. */ - private Coder<T> coder; + private CoderOrFailure<T> coderOrFailure = + new CoderOrFailure<>(null, "No Coder was specified, and Coder Inference did not occur"); protected TypedPValue(Pipeline p) { super(p); @@ -125,34 +128,31 @@ public abstract class TypedPValue<T> extends PValueBase implements PValue { } /** - * If the coder is not explicitly set, this sets the coder for - * this {@link TypedPValue} to the best coder that can be inferred - * based upon the known {@link TypeDescriptor}. By default, this is null, - * but can and should be improved by subclasses. + * If the coder is not explicitly set, this sets the coder for this {@link TypedPValue} to the + * best coder that can be inferred based upon the known {@link TypeDescriptor}. By default, this + * is null, but can and should be improved by subclasses. */ @SuppressWarnings({"unchecked", "rawtypes"}) - private Coder<T> inferCoderOrFail() { + private CoderOrFailure<T> inferCoderOrFail( + PInput input, PTransform<?, ?> transform, CoderRegistry registry) { // First option for a coder: use the Coder set on this PValue. - if (coder != null) { - return coder; + if (coderOrFailure.coder != null) { + return coderOrFailure; } - AppliedPTransform<?, ?, ?> application = getProducingTransformInternal(); - // Second option for a coder: Look in the coder registry. - CoderRegistry registry = getPipeline().getCoderRegistry(); TypeDescriptor<T> token = getTypeDescriptor(); CannotProvideCoderException inferFromTokenException = null; if (token != null) { try { - return registry.getDefaultCoder(token); + return new CoderOrFailure<>(registry.getDefaultCoder(token), null); } catch (CannotProvideCoderException exc) { inferFromTokenException = exc; // Attempt to detect when the token came from a TupleTag used for a ParDo side output, // and provide a better error message if so. Unfortunately, this information is not // directly available from the TypeDescriptor, so infer based on the type of the PTransform // and the error message itself. - if (application.getTransform() instanceof ParDo.BoundMulti + if (transform instanceof ParDo.BoundMulti && exc.getReason() == ReasonCode.TYPE_ERASURE) { inferFromTokenException = new CannotProvideCoderException(exc.getMessage() + " If this error occurs for a side output of the producing ParDo, verify that the " @@ -165,8 +165,8 @@ public abstract class TypedPValue<T> extends PValueBase implements PValue { // Third option for a coder: use the default Coder from the producing PTransform. CannotProvideCoderException inputCoderException; try { - return ((PTransform) application.getTransform()).getDefaultOutputCoder( - application.getInput(), this); + return new CoderOrFailure<>( + ((PTransform) transform).getDefaultOutputCoder(input, this), null); } catch (CannotProvideCoderException exc) { inputCoderException = exc; } @@ -193,6 +193,16 @@ public abstract class TypedPValue<T> extends PValueBase implements PValue { } // Build and throw the exception. - throw new IllegalStateException(messageBuilder.toString()); + return new CoderOrFailure<>(null, messageBuilder.toString()); + } + + private static class CoderOrFailure<T> { + @Nullable private final Coder<T> coder; + @Nullable private final String failure; + + public CoderOrFailure(@Nullable Coder<T> coder, @Nullable String failure) { + this.coder = coder; + this.failure = failure; + } } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/038950df/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java index d790d39..d373caf 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java @@ -24,6 +24,7 @@ import static org.junit.Assert.assertThat; import com.google.common.base.Function; import com.google.common.collect.Lists; +import java.io.Serializable; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -32,9 +33,9 @@ import org.apache.beam.sdk.io.CountingSource; import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.SimpleFunction; +import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; @@ -54,13 +55,11 @@ import org.junit.runners.JUnit4; * Tests for {@link TransformHierarchy}. */ @RunWith(JUnit4.class) -public class TransformHierarchyTest { - - @Rule public final TestPipeline pipeline = TestPipeline.create(); - @Rule public ExpectedException thrown = ExpectedException.none(); - - private TransformHierarchy hierarchy; +public class TransformHierarchyTest implements Serializable { + @Rule public final transient TestPipeline pipeline = TestPipeline.create(); + @Rule public transient ExpectedException thrown = ExpectedException.none(); + private transient TransformHierarchy hierarchy; @Before public void setup() { @@ -162,18 +161,21 @@ public class TransformHierarchyTest { PCollection.createPrimitiveOutputInternal( pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED); - MapElements<Long, Long> map = MapElements.via(new SimpleFunction<Long, Long>() { - @Override - public Long apply(Long input) { - return input; - } - }); + ParDo.Bound<Long, Long> pardo = + ParDo.of( + new DoFn<Long, Long>() { + @ProcessElement + public void processElement(ProcessContext ctxt) { + ctxt.output(ctxt.element()); + } + }); PCollection<Long> mapped = PCollection.createPrimitiveOutputInternal( pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED); TransformHierarchy.Node compositeNode = hierarchy.pushNode("Create", begin, create); + hierarchy.finishSpecifyingInput(); assertThat(hierarchy.getCurrent(), equalTo(compositeNode)); assertThat(compositeNode.getInputs(), Matchers.emptyIterable()); assertThat(compositeNode.getTransform(), Matchers.<PTransform<?, ?>>equalTo(create)); @@ -183,6 +185,7 @@ public class TransformHierarchyTest { TransformHierarchy.Node primitiveNode = hierarchy.pushNode("Create/Read", begin, read); assertThat(hierarchy.getCurrent(), equalTo(primitiveNode)); + hierarchy.finishSpecifyingInput(); hierarchy.setOutput(created); hierarchy.popNode(); assertThat( @@ -199,7 +202,8 @@ public class TransformHierarchyTest { assertThat(hierarchy.getProducer(created), equalTo(primitiveNode)); hierarchy.popNode(); - TransformHierarchy.Node otherPrimitive = hierarchy.pushNode("ParDo", created, map); + TransformHierarchy.Node otherPrimitive = hierarchy.pushNode("ParDo", created, pardo); + hierarchy.finishSpecifyingInput(); hierarchy.setOutput(mapped); hierarchy.popNode(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/038950df/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java index fa8874c..d95b2d0 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java @@ -1147,15 +1147,16 @@ public class ParDoTest implements Serializable { final TupleTag<Integer> mainOutputTag = new TupleTag<Integer>("main"); final TupleTag<TestDummy> sideOutputTag = new TupleTag<TestDummy>("unregisteredSide"); - PCollectionTuple outputTuple = input.apply(ParDo.of(new SideOutputDummyFn(sideOutputTag)) - .withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag))); + ParDo.BoundMulti<Integer, Integer> pardo = ParDo.of(new SideOutputDummyFn(sideOutputTag)) + .withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag)); + PCollectionTuple outputTuple = input.apply(pardo); outputTuple.get(sideOutputTag).setCoder(new TestDummyCoder()); outputTuple.get(sideOutputTag).apply(View.<TestDummy>asSingleton()); assertEquals(new TestDummyCoder(), outputTuple.get(sideOutputTag).getCoder()); - outputTuple.get(sideOutputTag).finishSpecifyingOutput(); // Check for crashes + outputTuple.get(sideOutputTag).finishSpecifyingOutput(input, pardo); // Check for crashes assertEquals(new TestDummyCoder(), outputTuple.get(sideOutputTag).getCoder()); // Check for corruption } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/038950df/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java index 8381f12..5e7cc7d 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java @@ -159,13 +159,16 @@ public class TypedPValueTest { @Test public void testFinishSpecifyingShouldFailIfNoCoderInferrable() { + p.enableAbandonedNodeEnforcement(false); + PCollection<Integer> created = p.apply(Create.of(1, 2, 3)); + ParDo.Bound<Integer, EmptyClass> uninferrableParDo = ParDo.of(new EmptyClassDoFn()); PCollection<EmptyClass> unencodable = - p.apply(Create.of(1, 2, 3)).apply(ParDo.of(new EmptyClassDoFn())); + created.apply(uninferrableParDo); thrown.expect(IllegalStateException.class); thrown.expectMessage("Unable to return a default Coder"); thrown.expectMessage("Inferring a Coder from the CoderRegistry failed"); - unencodable.finishSpecifying(); + unencodable.finishSpecifying(created, uninferrableParDo); } }