[ https://issues.apache.org/jira/browse/BEAM-5409?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

haden lee updated BEAM-5409:
----------------------------
    Description: 
I may be missing something obvious, but for some reason I can't make 
{{PAssert}} & {{TestPipeline}} work with {{CoGroupByKey}} -- without 
{{CoGroupByKey}}, everything works fine.
 Here is a reference test file that reproduces the issue I'm facing. I tested 
with both Beam SDK 2.4 and 2.5.

([For the record, this was posted on Stack Overflow before|https://stackoverflow.com/questions/51334429/beam-java-sdk-2-4-2-5-passert-with-cogroupbykey].)

For comparison, {{testWorking}} works as intended, and {{testBroken}} has an 
additional step like this:
{code:java}
// The following step causes the issue.
PCollectionTuple tuple =
    KeyedPCollectionTuple.of(inTag1, pc1.apply("String to KV<String, String>", ParDo.of(new String2KV())))
        .and(inTag2, pc2)
        .apply("CoGroupByKey", CoGroupByKey.<String>create())
        .apply("Some Merge DoFn",
            ParDo.of(new MergeDoFn(inTag1, inTag2, outTag2))
                .withOutputTags(outTag1, TupleTagList.of(outTag2)));
{code}
The error I get can be found after the code below.

Has anyone run into a similar issue with {{TestPipeline}} before? I haven't 
tested this extensively yet, but I couldn't find any relevant information on 
using {{CoGroupByKey}} and {{TestPipeline}} together. In production the same 
code works fine for my team; we only ran into this issue when we started adding 
unit tests with {{TestPipeline}} and {{PAssert}}.

Any help will be appreciated!

*NOTE: Resolved after adding 'implements Serializable' to the main test class, 
as shown below. Without it, the test throws the exception shown at the end of 
this report. I'll leave the original contents for reference.*
{code:java}
import java.io.Serializable;

import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.transforms.join.CoGroupByKey;
import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.junit.Rule;
import org.junit.Test;

public class ReferenceTest implements Serializable {
  @Rule
  public final transient TestPipeline pipe1 = TestPipeline.create();
  @Rule
  public final transient TestPipeline pipe2 = TestPipeline.create();

  // Splits "key1:value1" into KV("key1", "value1").
  public static class String2KV extends DoFn<String, KV<String, String>> {
    @ProcessElement
    public void processElement(ProcessContext c) {
      // "key1:value1" -> ["key1", "value1"]
      String[] tokens = c.element().split(":");
      c.output(KV.of(tokens[0], tokens[1]));
    }
  }

  // Joins the two co-grouped values for a key and emits "value1,value2" to the side output.
  public static class MergeDoFn extends DoFn<KV<String, CoGbkResult>, String> {
    final TupleTag<String> inTag1;
    final TupleTag<String> inTag2;
    final TupleTag<String> outTag2;

    public MergeDoFn(TupleTag<String> inTag1, TupleTag<String> inTag2, TupleTag<String> outTag2) {
      this.inTag1 = inTag1;
      this.inTag2 = inTag2;
      this.outTag2 = outTag2;
    }

    @ProcessElement
    public void processElement(ProcessContext c) {
      String val1 = c.element().getValue().getOnly(inTag1);
      String val2 = c.element().getValue().getOnly(inTag2);

      // outTag1 = main output
      // outTag2 = side output
      c.output(outTag2, val1 + "," + val2);
    }
  }

  @Test
  public void testWorking() {
    // Create two PCollections for the test.
    PCollection<String> pc1 =
        pipe1.apply("create pc1", Create.<String>of("key1:value1").withCoder(StringUtf8Coder.of()));
    PCollection<KV<String, String>> pc2 =
        pipe1.apply("create pc2", Create.<KV<String, String>>of(KV.of("key1", "key1:value2"))
            .withCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())));

    // Sanity check.
    PAssert.that(pc1).containsInAnyOrder("key1:value1");
    PAssert.that(pc2).containsInAnyOrder(KV.of("key1", "key1:value2"));

    pipe1.run();
  }

  // Disabled as of 2018-07-13.
  // https://stackoverflow.com/questions/51334429/beam-java-sdk-2-4-2-5-passert-with-cogroupbykey
  @Test
  public void testBroken() {
    // Create two PCollections for the test.
    PCollection<String> pc1 =
        pipe2.apply("create pc1", Create.<String>of("key1:value1").withCoder(StringUtf8Coder.of()));
    PCollection<KV<String, String>> pc2 =
        pipe2.apply("create pc2", Create.<KV<String, String>>of(KV.of("key1", "value2"))
            .withCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())));

    // Sanity check.
    PAssert.that(pc1).containsInAnyOrder("key1:value1");
    PAssert.that(pc2).containsInAnyOrder(KV.of("key1", "value2"));

    TupleTag<String> inTag1 = new TupleTag<String>() {
      private static final long serialVersionUID = 1L;
    };
    TupleTag<String> inTag2 = new TupleTag<String>() {
      private static final long serialVersionUID = 1L;
    };
    TupleTag<String> outTag1 = new TupleTag<String>() {
      private static final long serialVersionUID = 1L;
    };
    TupleTag<String> outTag2 = new TupleTag<String>() {
      private static final long serialVersionUID = 1L;
    };

    // The following step causes the issue.
    PCollectionTuple tuple =
        KeyedPCollectionTuple.of(inTag1, pc1.apply("String to KV<String, String>", ParDo.of(new String2KV())))
            .and(inTag2, pc2)
            .apply("CoGroupByKey", CoGroupByKey.<String>create())
            .apply("Some Merge DoFn",
                ParDo.of(new MergeDoFn(inTag1, inTag2, outTag2))
                    .withOutputTags(outTag1, TupleTagList.of(outTag2)));

    PAssert.that(tuple.get(outTag1)).empty();
    PAssert.that(tuple.get(outTag2)).containsInAnyOrder("value1,value2");

    pipe2.run();
  }
}
{code}
Here's the error:
{code:java}
java.lang.IllegalArgumentException: unable to serialize org.apache.beam.sdk.transforms.join.CoGbkResult$CoGbkResultCoder@217aa4f

    at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:57)
    at org.apache.beam.runners.direct.repackaged.runners.core.construction.CoderTranslation.toCustomCoder(CoderTranslation.java:121)
    at org.apache.beam.runners.direct.repackaged.runners.core.construction.CoderTranslation.toProto(CoderTranslation.java:85)
    at org.apache.beam.runners.direct.repackaged.runners.core.construction.SdkComponents.registerCoder(SdkComponents.java:183)
    at org.apache.beam.runners.direct.repackaged.runners.core.construction.CoderTranslation.registerComponents(CoderTranslation.java:107)
    at org.apache.beam.runners.direct.repackaged.runners.core.construction.CoderTranslation.toKnownCoder(CoderTranslation.java:91)
    at org.apache.beam.runners.direct.repackaged.runners.core.construction.CoderTranslation.toProto(CoderTranslation.java:83)
    at org.apache.beam.runners.direct.repackaged.runners.core.construction.SdkComponents.registerCoder(SdkComponents.java:183)
    at org.apache.beam.runners.direct.repackaged.runners.core.construction.PCollectionTranslation.toProto(PCollectionTranslation.java:36)
    at org.apache.beam.runners.direct.repackaged.runners.core.construction.SdkComponents.registerPCollection(SdkComponents.java:138)
    at org.apache.beam.runners.direct.repackaged.runners.core.construction.PTransformTranslation.toProto(PTransformTranslation.java:173)
    at org.apache.beam.runners.direct.repackaged.runners.core.construction.ParDoTranslation.getParDoPayload(ParDoTranslation.java:515)
    at org.apache.beam.runners.direct.repackaged.runners.core.construction.ParDoTranslation.isSplittable(ParDoTranslation.java:525)
    at org.apache.beam.runners.direct.repackaged.runners.core.construction.PTransformMatchers$4.matches(PTransformMatchers.java:194)
    at org.apache.beam.sdk.Pipeline$2.visitPrimitiveTransform(Pipeline.java:278)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:668)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:311)
    at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:245)
    at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:458)
    at org.apache.beam.sdk.Pipeline.replace(Pipeline.java:256)
    at org.apache.beam.sdk.Pipeline.replaceAll(Pipeline.java:209)
    at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:173)
    at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:62)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:311)
    at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:353)
    at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:335)
    at exp.moloco.dataflow2.ReferenceTest.testBroken(ReferenceTest.java:110)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.apache.beam.sdk.testing.TestPipeline$1.evaluate(TestPipeline.java:324)
    at org.apache.beam.sdk.testing.TestPipeline$1.evaluate(TestPipeline.java:324)
    at org.junit.rules.RunRules.evaluate(RunRules.java:20)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
    at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
    at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
    at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
Caused by: java.io.NotSerializableException: exp.moloco.dataflow2.ReferenceTest
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at java.util.HashMap.internalWriteEntries(HashMap.java:1789)
    at java.util.HashMap.writeObject(HashMap.java:1363)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1128)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:53)
    ... 54 more

Process finished with exit code 255
{code}
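For what it's worth, my reading of the trace (an assumption on my part; I haven't dug into the SDK source): {{CoGbkResultCoder}} serializes the {{TupleTag}}s it was built from, and anonymous {{TupleTag}} subclasses created inside an instance test method keep a hidden reference to the enclosing {{ReferenceTest}}, so the whole test class ends up on the Java-serialization path. A minimal, hypothetical sketch of just that capture behavior (the class and method names below are mine, not from the test above), using the same {{SerializableUtils}} helper that appears in the trace:
{code:java}
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.values.TupleTag;

public class TagCaptureSketch {

  // Created in a static context: the anonymous TupleTag subclass has no enclosing
  // instance, so only the (serializable) tag itself is written out.
  private static final TupleTag<String> STATIC_TAG = new TupleTag<String>() {};

  public static void main(String[] args) {
    SerializableUtils.serializeToByteArray(STATIC_TAG);
    System.out.println("static tag: serialized OK");

    // A tag created inside an instance method captures `this` (at least with the
    // javac behavior current when this was reported), so serializing it also needs
    // TagCaptureSketch -- the same shape as the NotSerializableException above.
    try {
      new TagCaptureSketch().serializeInstanceTag();
    } catch (IllegalArgumentException expected) {
      System.out.println("instance tag: " + expected.getMessage());
    }
  }

  private void serializeInstanceTag() {
    TupleTag<String> instanceTag = new TupleTag<String>() {};
    SerializableUtils.serializeToByteArray(instanceTag);
  }
}
{code}
Under that reading, either making the test class {{Serializable}} (as above) or declaring the tags in a static context should keep the test instance out of the coder; I've only verified the former.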
 

> Beam Java SDK 2.4/2.5 PAssert with CoGroupByKey
> -----------------------------------------------
>
>                 Key: BEAM-5409
>                 URL: https://issues.apache.org/jira/browse/BEAM-5409
>             Project: Beam
>          Issue Type: Bug
>          Components: testing
>    Affects Versions: 2.4.0, 2.5.0
>            Reporter: haden lee
>            Assignee: Jason Kuster
>            Priority: Major
>             Fix For: 2.4.0, 2.5.0
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
