Repository: incubator-beam Updated Branches: refs/heads/apex-runner fa3a6aa8d -> 968eb32b8
Encode bundle elements in the DirectRunner This ensures that any changes that are caused when an element is encoded and decoded are caught within the pipeline. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2ceaa3ef Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2ceaa3ef Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2ceaa3ef Branch: refs/heads/apex-runner Commit: 2ceaa3effa8a6d9de3753a05db9d1648e8eed576 Parents: c03e3e9 Author: Thomas Groh <tg...@google.com> Authored: Tue Sep 20 11:43:40 2016 -0700 Committer: Thomas Groh <tg...@google.com> Committed: Tue Oct 25 11:03:43 2016 -0700 ---------------------------------------------------------------------- .../runners/direct/CloningBundleFactory.java | 98 ++++++++++ .../beam/runners/direct/DirectRunner.java | 5 +- .../direct/ImmutableListBundleFactory.java | 4 +- .../direct/CloningBundleFactoryTest.java | 177 +++++++++++++++++++ .../EncodabilityEnforcementFactoryTest.java | 6 +- 5 files changed, 285 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2ceaa3ef/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CloningBundleFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CloningBundleFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CloningBundleFactory.java new file mode 100644 index 0000000..33241e3 --- /dev/null +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CloningBundleFactory.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.direct; + +import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; +import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.util.CoderUtils; +import org.apache.beam.sdk.util.UserCodeException; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.PCollection; +import org.joda.time.Instant; + +/** + * A {@link BundleFactory} where a created {@link UncommittedBundle} clones all elements added to it + * using the coder of the {@link PCollection}. + */ +class CloningBundleFactory implements BundleFactory { + private static final CloningBundleFactory INSTANCE = new CloningBundleFactory(); + + public static CloningBundleFactory create() { + return INSTANCE; + } + + private final ImmutableListBundleFactory underlying; + private CloningBundleFactory() { + this.underlying = ImmutableListBundleFactory.create(); + } + + @Override + public <T> UncommittedBundle<T> createRootBundle() { + // The DirectRunner is responsible for these elements, but they need not be encodable. 
+ return underlying.createRootBundle(); + } + + @Override + public <T> UncommittedBundle<T> createBundle( + PCollection<T> output) { + return new CloningBundle<>(underlying.createBundle(output)); + } + + @Override + public <K, T> UncommittedBundle<T> createKeyedBundle( + StructuralKey<K> key, PCollection<T> output) { + return new CloningBundle<>(underlying.createKeyedBundle(key, output)); + } + + private static class CloningBundle<T> implements UncommittedBundle<T> { + private final UncommittedBundle<T> underlying; + private final Coder<T> coder; + + private CloningBundle(UncommittedBundle<T> underlying) { + this.underlying = underlying; + this.coder = underlying.getPCollection().getCoder(); + } + + @Override + public PCollection<T> getPCollection() { + return underlying.getPCollection(); + } + + @Override + public UncommittedBundle<T> add(WindowedValue<T> element) { + try { + // Use the cloned value to ensure that if the coder behaves poorly (e.g. a NoOpCoder that + // does not expect to be used) that is reflected in the values given to downstream + // transforms + WindowedValue<T> clone = element.withValue(CoderUtils.clone(coder, element.getValue())); + underlying.add(clone); + } catch (CoderException e) { + throw UserCodeException.wrap(e); + } + return this; + } + + @Override + public CommittedBundle<T> commit(Instant synchronizedProcessingTime) { + return underlying.commit(synchronizedProcessingTime); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2ceaa3ef/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java index b79a42f..e02c8a6 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java +++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java @@ -319,7 +319,10 @@ public class DirectRunner } private BundleFactory createBundleFactory(DirectOptions pipelineOptions) { - BundleFactory bundleFactory = ImmutableListBundleFactory.create(); + BundleFactory bundleFactory = + pipelineOptions.isEnforceEncodability() + ? CloningBundleFactory.create() + : ImmutableListBundleFactory.create(); if (pipelineOptions.isEnforceImmutability()) { bundleFactory = ImmutabilityCheckingBundleFactory.create(bundleFactory); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2ceaa3ef/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java index db92542..abc6dd8 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java @@ -32,8 +32,10 @@ import org.joda.time.Instant; * A factory that produces bundles that perform no additional validation. 
*/ class ImmutableListBundleFactory implements BundleFactory { + private static final ImmutableListBundleFactory FACTORY = new ImmutableListBundleFactory(); + public static ImmutableListBundleFactory create() { - return new ImmutableListBundleFactory(); + return FACTORY; } private ImmutableListBundleFactory() {} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2ceaa3ef/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java new file mode 100644 index 0000000..03846d9 --- /dev/null +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.runners.direct; + +import static org.hamcrest.Matchers.anyOf; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.isA; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.theInstance; +import static org.junit.Assert.assertThat; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; +import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; +import org.apache.beam.runners.direct.EncodabilityEnforcementFactoryTest.Record; +import org.apache.beam.runners.direct.EncodabilityEnforcementFactoryTest.RecordNoDecodeCoder; +import org.apache.beam.runners.direct.EncodabilityEnforcementFactoryTest.RecordNoEncodeCoder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.WithKeys; +import org.apache.beam.sdk.util.UserCodeException; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.hamcrest.Matchers; +import org.joda.time.Instant; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link CloningBundleFactory}. 
+ */ +@RunWith(JUnit4.class) +public class CloningBundleFactoryTest { + @Rule public ExpectedException thrown = ExpectedException.none(); + private CloningBundleFactory factory = CloningBundleFactory.create(); + + @Test + public void rootBundleSucceedsIgnoresCoder() { + WindowedValue<Record> one = WindowedValue.valueInGlobalWindow(new Record()); + WindowedValue<Record> two = WindowedValue.valueInGlobalWindow(new Record()); + CommittedBundle<Record> root = + factory.<Record>createRootBundle().add(one).add(two).commit(Instant.now()); + + assertThat(root.getElements(), containsInAnyOrder(one, two)); + } + + @Test + public void bundleWorkingCoderSucceedsClonesOutput() { + TestPipeline p = TestPipeline.create(); + PCollection<Integer> created = p.apply(Create.of(1, 3).withCoder(VarIntCoder.of())); + PCollection<KV<String, Integer>> kvs = + created + .apply(WithKeys.<String, Integer>of("foo")) + .setCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of())); + WindowedValue<KV<String, Integer>> fooOne = WindowedValue.valueInGlobalWindow(KV.of("foo", 1)); + WindowedValue<KV<String, Integer>> fooThree = + WindowedValue.valueInGlobalWindow(KV.of("foo", 3)); + CommittedBundle<KV<String, Integer>> bundle = + factory.createBundle(kvs).add(fooOne).add(fooThree).commit(Instant.now()); + + assertThat(bundle.getElements(), containsInAnyOrder(fooOne, fooThree)); + assertThat( + bundle.getElements(), not(containsInAnyOrder(theInstance(fooOne), theInstance(fooThree)))); + for (WindowedValue<KV<String, Integer>> foo : bundle.getElements()) { + assertThat( + foo.getValue(), + not(anyOf(theInstance(fooOne.getValue()), theInstance(fooThree.getValue())))); + } + assertThat(bundle.getPCollection(), equalTo(kvs)); + } + + @Test + public void keyedBundleWorkingCoderSucceedsClonesOutput() { + TestPipeline p = TestPipeline.create(); + PCollection<Integer> created = p.apply(Create.of(1, 3).withCoder(VarIntCoder.of())); + + PCollection<KV<String, Iterable<Integer>>> keyed = + created + 
.apply(WithKeys.<String, Integer>of("foo")) + .setCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of())) + .apply(GroupByKey.<String, Integer>create()); + WindowedValue<KV<String, Iterable<Integer>>> foos = + WindowedValue.valueInGlobalWindow( + KV.<String, Iterable<Integer>>of("foo", ImmutableList.of(1, 3))); + CommittedBundle<KV<String, Iterable<Integer>>> keyedBundle = + factory + .createKeyedBundle(StructuralKey.of("foo", StringUtf8Coder.of()), keyed) + .add(foos) + .commit(Instant.now()); + + assertThat(keyedBundle.getElements(), containsInAnyOrder(foos)); + assertThat( + Iterables.getOnlyElement(keyedBundle.getElements()).getValue(), + not(theInstance(foos.getValue()))); + assertThat(keyedBundle.getPCollection(), equalTo(keyed)); + assertThat( + keyedBundle.getKey(), + Matchers.<StructuralKey<?>>equalTo(StructuralKey.of("foo", StringUtf8Coder.of()))); + } + + @Test + public void bundleEncodeFailsAddFails() { + TestPipeline p = TestPipeline.create(); + PCollection<Record> pc = p.apply(Create.<Record>of().withCoder(new RecordNoEncodeCoder())); + UncommittedBundle<Record> bundle = factory.createBundle(pc); + + thrown.expect(UserCodeException.class); + thrown.expectCause(isA(CoderException.class)); + thrown.expectMessage("Encode not allowed"); + bundle.add(WindowedValue.valueInGlobalWindow(new Record())); + } + + @Test + public void bundleDecodeFailsAddFails() { + TestPipeline p = TestPipeline.create(); + PCollection<Record> pc = p.apply(Create.<Record>of().withCoder(new RecordNoDecodeCoder())); + UncommittedBundle<Record> bundle = factory.createBundle(pc); + + thrown.expect(UserCodeException.class); + thrown.expectCause(isA(CoderException.class)); + thrown.expectMessage("Decode not allowed"); + bundle.add(WindowedValue.valueInGlobalWindow(new Record())); + } + + @Test + public void keyedBundleEncodeFailsAddFails() { + TestPipeline p = TestPipeline.create(); + PCollection<Record> pc = p.apply(Create.<Record>of().withCoder(new RecordNoEncodeCoder())); + 
UncommittedBundle<Record> bundle = + factory.createKeyedBundle(StructuralKey.of("foo", StringUtf8Coder.of()), pc); + + thrown.expect(UserCodeException.class); + thrown.expectCause(isA(CoderException.class)); + thrown.expectMessage("Encode not allowed"); + bundle.add(WindowedValue.valueInGlobalWindow(new Record())); + } + + @Test + public void keyedBundleDecodeFailsAddFails() { + TestPipeline p = TestPipeline.create(); + PCollection<Record> pc = p.apply(Create.<Record>of().withCoder(new RecordNoDecodeCoder())); + UncommittedBundle<Record> bundle = + factory.createKeyedBundle(StructuralKey.of("foo", StringUtf8Coder.of()), pc); + + thrown.expect(UserCodeException.class); + thrown.expectCause(isA(CoderException.class)); + thrown.expectMessage("Decode not allowed"); + bundle.add(WindowedValue.valueInGlobalWindow(new Record())); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2ceaa3ef/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EncodabilityEnforcementFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EncodabilityEnforcementFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EncodabilityEnforcementFactoryTest.java index e62bf01..e6bdbd0 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EncodabilityEnforcementFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EncodabilityEnforcementFactoryTest.java @@ -208,8 +208,8 @@ public class EncodabilityEnforcementFactoryTest { Collections.<CommittedBundle<?>>emptyList()); } - private static class Record {} - private static class RecordNoEncodeCoder extends AtomicCoder<Record> { + static class Record {} + static class RecordNoEncodeCoder extends AtomicCoder<Record> { @Override public void encode( @@ -228,7 +228,7 @@ public class EncodabilityEnforcementFactoryTest { } } - private static 
class RecordNoDecodeCoder extends AtomicCoder<Record> { + static class RecordNoDecodeCoder extends AtomicCoder<Record> { @Override public void encode( Record value,