Fix EmptyFlattenAsCreateFactory. The input types of Flatten and Create don't match up, so a composite must be provided instead of Create directly.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0f910688 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0f910688 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0f910688 Branch: refs/heads/master Commit: 0f910688cb1d6dcf1bc701fbc4e4124cef190f10 Parents: 2723d38 Author: Thomas Groh <tg...@google.com> Authored: Fri Mar 24 15:38:20 2017 -0700 Committer: Thomas Groh <tg...@google.com> Committed: Fri Mar 24 17:46:53 2017 -0700 ---------------------------------------------------------------------- .../EmptyFlattenAsCreateFactory.java | 15 ++- .../EmptyFlattenAsCreateFactoryTest.java | 96 ++++++++++++++++++++ 2 files changed, 109 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/0f910688/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/EmptyFlattenAsCreateFactory.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/EmptyFlattenAsCreateFactory.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/EmptyFlattenAsCreateFactory.java index 0168039..4328cf3 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/EmptyFlattenAsCreateFactory.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/EmptyFlattenAsCreateFactory.java @@ -52,14 +52,17 @@ public class EmptyFlattenAsCreateFactory<T> @Override public PTransform<PCollectionList<T>, PCollection<T>> getReplacementTransform( Flatten.PCollections<T> transform) { - return (PTransform) Create.empty(VoidCoder.of()); + return new CreateEmptyFromList<>(); } @Override public PCollectionList<T> getInput( List<TaggedPValue> inputs, Pipeline p) { checkArgument( - 
inputs.isEmpty(), "Must have an empty input to use %s", getClass().getSimpleName()); + inputs.isEmpty(), + "Unexpected nonempty input %s for %s", + inputs, + getClass().getSimpleName()); return PCollectionList.empty(p); } @@ -68,4 +71,12 @@ public class EmptyFlattenAsCreateFactory<T> List<TaggedPValue> outputs, PCollection<T> newOutput) { return ReplacementOutputs.singleton(outputs, newOutput); } + + private static class CreateEmptyFromList<T> + extends PTransform<PCollectionList<T>, PCollection<T>> { + @Override + public PCollection<T> expand(PCollectionList<T> input) { + return (PCollection) input.getPipeline().apply(Create.empty(VoidCoder.of())); + } + } } http://git-wip-us.apache.org/repos/asf/beam/blob/0f910688/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/EmptyFlattenAsCreateFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/EmptyFlattenAsCreateFactoryTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/EmptyFlattenAsCreateFactoryTest.java new file mode 100644 index 0000000..ad9d908 --- /dev/null +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/EmptyFlattenAsCreateFactoryTest.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.core.construction; + +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertThat; + +import com.google.common.collect.Iterables; +import java.util.Collections; +import java.util.Map; +import org.apache.beam.sdk.io.CountingInput; +import org.apache.beam.sdk.runners.PTransformOverrideFactory.ReplacementOutput; +import org.apache.beam.sdk.testing.NeedsRunner; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Flatten; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionList; +import org.apache.beam.sdk.values.PValue; +import org.apache.beam.sdk.values.TaggedPValue; +import org.hamcrest.Matchers; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link EmptyFlattenAsCreateFactory}. 
+ */ +@RunWith(JUnit4.class) +public class EmptyFlattenAsCreateFactoryTest { + @Rule public TestPipeline pipeline = TestPipeline.create(); + @Rule public ExpectedException thrown = ExpectedException.none(); + + private EmptyFlattenAsCreateFactory<Long> factory = EmptyFlattenAsCreateFactory.instance(); + + @Test + public void getInputEmptySucceeds() { + assertThat( + factory.getInput(Collections.<TaggedPValue>emptyList(), pipeline).size(), equalTo(0)); + } + + @Test + public void getInputNonEmptyThrows() { + PCollectionList<Long> nonEmpty = + PCollectionList.of(pipeline.apply(CountingInput.unbounded())) + .and(pipeline.apply(CountingInput.upTo(100L))); + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage(nonEmpty.expand().toString()); + thrown.expectMessage(EmptyFlattenAsCreateFactory.class.getSimpleName()); + factory.getInput(nonEmpty.expand(), pipeline); + } + + @Test + public void mapOutputsSucceeds() { + PCollection<Long> original = pipeline.apply("Original", CountingInput.unbounded()); + PCollection<Long> replacement = pipeline.apply("Replacement", CountingInput.unbounded()); + Map<PValue, ReplacementOutput> mapping = factory.mapOutputs(original.expand(), replacement); + + assertThat( + mapping, + Matchers.<PValue, ReplacementOutput>hasEntry( + replacement, + ReplacementOutput.of( + Iterables.getOnlyElement(original.expand()), + Iterables.getOnlyElement(replacement.expand())))); + } + + @Test + @Category(NeedsRunner.class) + public void testOverride() { + PCollectionList<Long> empty = PCollectionList.empty(pipeline); + PCollection<Long> emptyFlattened = + empty.apply(factory.getReplacementTransform(Flatten.<Long>pCollections())); + PAssert.that(emptyFlattened).empty(); + pipeline.run(); + } +}