Repository: incubator-beam
Updated Branches:
  refs/heads/apex-runner fa3a6aa8d -> 968eb32b8


Encode bundle elements in the DirectRunner

This ensures that any changes that are caused when an element is encoded
and decoded is caught within the pipeline.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2ceaa3ef
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2ceaa3ef
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2ceaa3ef

Branch: refs/heads/apex-runner
Commit: 2ceaa3effa8a6d9de3753a05db9d1648e8eed576
Parents: c03e3e9
Author: Thomas Groh <tg...@google.com>
Authored: Tue Sep 20 11:43:40 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Oct 25 11:03:43 2016 -0700

----------------------------------------------------------------------
 .../runners/direct/CloningBundleFactory.java    |  98 ++++++++++
 .../beam/runners/direct/DirectRunner.java       |   5 +-
 .../direct/ImmutableListBundleFactory.java      |   4 +-
 .../direct/CloningBundleFactoryTest.java        | 177 +++++++++++++++++++
 .../EncodabilityEnforcementFactoryTest.java     |   6 +-
 5 files changed, 285 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2ceaa3ef/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CloningBundleFactory.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CloningBundleFactory.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CloningBundleFactory.java
new file mode 100644
index 0000000..33241e3
--- /dev/null
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CloningBundleFactory.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.util.UserCodeException;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollection;
+import org.joda.time.Instant;
+
+/**
+ * A {@link BundleFactory} where a created {@link UncommittedBundle} clones 
all elements added to it
+ * using the coder of the {@link PCollection}.
+ */
+class CloningBundleFactory implements BundleFactory {
+  private static final CloningBundleFactory INSTANCE = new 
CloningBundleFactory();
+
+  public static CloningBundleFactory create() {
+    return INSTANCE;
+  }
+
+  private final ImmutableListBundleFactory underlying;
+  private CloningBundleFactory() {
+    this.underlying = ImmutableListBundleFactory.create();
+  }
+
+  @Override
+  public <T> UncommittedBundle<T> createRootBundle() {
+    // The DirectRunner is responsible for these elements, but they need not 
be encodable.
+    return underlying.createRootBundle();
+  }
+
+  @Override
+  public <T> UncommittedBundle<T> createBundle(
+      PCollection<T> output) {
+    return new CloningBundle<>(underlying.createBundle(output));
+  }
+
+  @Override
+  public <K, T> UncommittedBundle<T> createKeyedBundle(
+      StructuralKey<K> key, PCollection<T> output) {
+    return new CloningBundle<>(underlying.createKeyedBundle(key, output));
+  }
+
+  private static class CloningBundle<T> implements UncommittedBundle<T> {
+    private final UncommittedBundle<T> underlying;
+    private final Coder<T> coder;
+
+    private CloningBundle(UncommittedBundle<T> underlying) {
+      this.underlying = underlying;
+      this.coder = underlying.getPCollection().getCoder();
+    }
+
+    @Override
+    public PCollection<T> getPCollection() {
+      return underlying.getPCollection();
+    }
+
+    @Override
+    public UncommittedBundle<T> add(WindowedValue<T> element) {
+      try {
+        // Use the cloned value to ensure that if the coder behaves poorly 
(e.g. a NoOpCoder that
+        // does not expect to be used) that is reflected in the values given 
to downstream
+        // transforms
+        WindowedValue<T> clone = element.withValue(CoderUtils.clone(coder, 
element.getValue()));
+        underlying.add(clone);
+      } catch (CoderException e) {
+        throw UserCodeException.wrap(e);
+      }
+      return this;
+    }
+
+    @Override
+    public CommittedBundle<T> commit(Instant synchronizedProcessingTime) {
+      return underlying.commit(synchronizedProcessingTime);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2ceaa3ef/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index b79a42f..e02c8a6 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -319,7 +319,10 @@ public class DirectRunner
   }
 
   private BundleFactory createBundleFactory(DirectOptions pipelineOptions) {
-    BundleFactory bundleFactory = ImmutableListBundleFactory.create();
+    BundleFactory bundleFactory =
+        pipelineOptions.isEnforceEncodability()
+            ? CloningBundleFactory.create()
+            : ImmutableListBundleFactory.create();
     if (pipelineOptions.isEnforceImmutability()) {
       bundleFactory = ImmutabilityCheckingBundleFactory.create(bundleFactory);
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2ceaa3ef/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java
index db92542..abc6dd8 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java
@@ -32,8 +32,10 @@ import org.joda.time.Instant;
  * A factory that produces bundles that perform no additional validation.
  */
 class ImmutableListBundleFactory implements BundleFactory {
+  private static final ImmutableListBundleFactory FACTORY = new 
ImmutableListBundleFactory();
+
   public static ImmutableListBundleFactory create() {
-    return new ImmutableListBundleFactory();
+    return FACTORY;
   }
 
   private ImmutableListBundleFactory() {}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2ceaa3ef/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java
new file mode 100644
index 0000000..03846d9
--- /dev/null
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.direct;
+
+import static org.hamcrest.Matchers.anyOf;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.isA;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.theInstance;
+import static org.junit.Assert.assertThat;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
+import 
org.apache.beam.runners.direct.EncodabilityEnforcementFactoryTest.Record;
+import 
org.apache.beam.runners.direct.EncodabilityEnforcementFactoryTest.RecordNoDecodeCoder;
+import 
org.apache.beam.runners.direct.EncodabilityEnforcementFactoryTest.RecordNoEncodeCoder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.util.UserCodeException;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.hamcrest.Matchers;
+import org.joda.time.Instant;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link CloningBundleFactory}.
+ */
+@RunWith(JUnit4.class)
+public class CloningBundleFactoryTest {
+  @Rule public ExpectedException thrown = ExpectedException.none();
+  private CloningBundleFactory factory = CloningBundleFactory.create();
+
+  @Test
+  public void rootBundleSucceedsIgnoresCoder() {
+    WindowedValue<Record> one = WindowedValue.valueInGlobalWindow(new 
Record());
+    WindowedValue<Record> two = WindowedValue.valueInGlobalWindow(new 
Record());
+    CommittedBundle<Record> root =
+        
factory.<Record>createRootBundle().add(one).add(two).commit(Instant.now());
+
+    assertThat(root.getElements(), containsInAnyOrder(one, two));
+  }
+
+  @Test
+  public void bundleWorkingCoderSucceedsClonesOutput() {
+    TestPipeline p = TestPipeline.create();
+    PCollection<Integer> created = p.apply(Create.of(1, 
3).withCoder(VarIntCoder.of()));
+    PCollection<KV<String, Integer>> kvs =
+        created
+            .apply(WithKeys.<String, Integer>of("foo"))
+            .setCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()));
+    WindowedValue<KV<String, Integer>> fooOne = 
WindowedValue.valueInGlobalWindow(KV.of("foo", 1));
+    WindowedValue<KV<String, Integer>> fooThree =
+        WindowedValue.valueInGlobalWindow(KV.of("foo", 3));
+    CommittedBundle<KV<String, Integer>> bundle =
+        
factory.createBundle(kvs).add(fooOne).add(fooThree).commit(Instant.now());
+
+    assertThat(bundle.getElements(), containsInAnyOrder(fooOne, fooThree));
+    assertThat(
+        bundle.getElements(), not(containsInAnyOrder(theInstance(fooOne), 
theInstance(fooThree))));
+    for (WindowedValue<KV<String, Integer>> foo : bundle.getElements()) {
+      assertThat(
+          foo.getValue(),
+          not(anyOf(theInstance(fooOne.getValue()), 
theInstance(fooThree.getValue()))));
+    }
+    assertThat(bundle.getPCollection(), equalTo(kvs));
+  }
+
+  @Test
+  public void keyedBundleWorkingCoderSucceedsClonesOutput() {
+    TestPipeline p = TestPipeline.create();
+    PCollection<Integer> created = p.apply(Create.of(1, 
3).withCoder(VarIntCoder.of()));
+
+    PCollection<KV<String, Iterable<Integer>>> keyed =
+        created
+            .apply(WithKeys.<String, Integer>of("foo"))
+            .setCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))
+            .apply(GroupByKey.<String, Integer>create());
+    WindowedValue<KV<String, Iterable<Integer>>> foos =
+        WindowedValue.valueInGlobalWindow(
+            KV.<String, Iterable<Integer>>of("foo", ImmutableList.of(1, 3)));
+    CommittedBundle<KV<String, Iterable<Integer>>> keyedBundle =
+        factory
+            .createKeyedBundle(StructuralKey.of("foo", StringUtf8Coder.of()), 
keyed)
+            .add(foos)
+            .commit(Instant.now());
+
+    assertThat(keyedBundle.getElements(), containsInAnyOrder(foos));
+    assertThat(
+        Iterables.getOnlyElement(keyedBundle.getElements()).getValue(),
+        not(theInstance(foos.getValue())));
+    assertThat(keyedBundle.getPCollection(), equalTo(keyed));
+    assertThat(
+        keyedBundle.getKey(),
+        Matchers.<StructuralKey<?>>equalTo(StructuralKey.of("foo", 
StringUtf8Coder.of())));
+  }
+
+  @Test
+  public void bundleEncodeFailsAddFails() {
+    TestPipeline p = TestPipeline.create();
+    PCollection<Record> pc = p.apply(Create.<Record>of().withCoder(new 
RecordNoEncodeCoder()));
+    UncommittedBundle<Record> bundle = factory.createBundle(pc);
+
+    thrown.expect(UserCodeException.class);
+    thrown.expectCause(isA(CoderException.class));
+    thrown.expectMessage("Encode not allowed");
+    bundle.add(WindowedValue.valueInGlobalWindow(new Record()));
+  }
+
+  @Test
+  public void bundleDecodeFailsAddFails() {
+    TestPipeline p = TestPipeline.create();
+    PCollection<Record> pc = p.apply(Create.<Record>of().withCoder(new 
RecordNoDecodeCoder()));
+    UncommittedBundle<Record> bundle = factory.createBundle(pc);
+
+    thrown.expect(UserCodeException.class);
+    thrown.expectCause(isA(CoderException.class));
+    thrown.expectMessage("Decode not allowed");
+    bundle.add(WindowedValue.valueInGlobalWindow(new Record()));
+  }
+
+  @Test
+  public void keyedBundleEncodeFailsAddFails() {
+    TestPipeline p = TestPipeline.create();
+    PCollection<Record> pc = p.apply(Create.<Record>of().withCoder(new 
RecordNoEncodeCoder()));
+    UncommittedBundle<Record> bundle =
+        factory.createKeyedBundle(StructuralKey.of("foo", 
StringUtf8Coder.of()), pc);
+
+    thrown.expect(UserCodeException.class);
+    thrown.expectCause(isA(CoderException.class));
+    thrown.expectMessage("Encode not allowed");
+    bundle.add(WindowedValue.valueInGlobalWindow(new Record()));
+  }
+
+  @Test
+  public void keyedBundleDecodeFailsAddFails() {
+    TestPipeline p = TestPipeline.create();
+    PCollection<Record> pc = p.apply(Create.<Record>of().withCoder(new 
RecordNoDecodeCoder()));
+    UncommittedBundle<Record> bundle =
+        factory.createKeyedBundle(StructuralKey.of("foo", 
StringUtf8Coder.of()), pc);
+
+    thrown.expect(UserCodeException.class);
+    thrown.expectCause(isA(CoderException.class));
+    thrown.expectMessage("Decode not allowed");
+    bundle.add(WindowedValue.valueInGlobalWindow(new Record()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2ceaa3ef/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EncodabilityEnforcementFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EncodabilityEnforcementFactoryTest.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EncodabilityEnforcementFactoryTest.java
index e62bf01..e6bdbd0 100644
--- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EncodabilityEnforcementFactoryTest.java
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EncodabilityEnforcementFactoryTest.java
@@ -208,8 +208,8 @@ public class EncodabilityEnforcementFactoryTest {
         Collections.<CommittedBundle<?>>emptyList());
   }
 
-  private static class Record {}
-  private static class RecordNoEncodeCoder extends AtomicCoder<Record> {
+  static class Record {}
+  static class RecordNoEncodeCoder extends AtomicCoder<Record> {
 
     @Override
     public void encode(
@@ -228,7 +228,7 @@ public class EncodabilityEnforcementFactoryTest {
     }
   }
 
-  private static class RecordNoDecodeCoder extends AtomicCoder<Record> {
+  static class RecordNoDecodeCoder extends AtomicCoder<Record> {
     @Override
     public void encode(
         Record value,

Reply via email to