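Hello,

The error is probably in your TupleTag classes, which are anonymous classes. An anonymous class created inside an instance method keeps a hidden reference to the enclosing instance (here, your test class). Serialising the tags therefore also serialises the test object and, with it, its TestPipeline, which is not serializable.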
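Below is a minimal, Beam-free sketch of that mechanism using plain Java serialization. Tag and FakePipeline are hypothetical stand-ins for TupleTag and TestPipeline, not the Beam classes, and serializes() is a hypothetical helper. It contrasts a tag created in an instance method with two possible fixes: creating the tag in a static context, or marking the pipeline field transient.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Sketch: anonymous subclasses created in instance methods drag their
// enclosing object (and its fields) into Java serialization.
public class AnonymousTagDemo implements Serializable {

    // Hypothetical stand-in for Beam's TupleTag: a Serializable generic class
    // that callers subclass anonymously ("new Tag<String>() {}").
    public static class Tag<T> implements Serializable {}

    // Hypothetical stand-in for TestPipeline: deliberately NOT Serializable.
    public static class FakePipeline {}

    // Non-transient instance field, like the TestPipeline field `p` in the test class.
    private final FakePipeline p = new FakePipeline();

    // Anonymous class created in an instance method: it holds a hidden reference
    // to `this`, so serializing the tag also tries to serialize `p` and fails.
    public Tag<String> tagFromInstanceMethod() {
        return new Tag<String>() {};
    }

    // Anonymous class created in a static field initializer: there is no
    // enclosing instance to capture, so only the tag itself is written.
    public static final Tag<String> STATIC_TAG = new Tag<String>() {};

    // Alternative fix: the tag is still created in an instance method, but the
    // pipeline field is transient, so serialization skips it.
    public static class TransientHolder implements Serializable {
        private final transient FakePipeline p = new FakePipeline();

        public Tag<String> tag() {
            return new Tag<String>() {};
        }
    }

    // Returns true if `o` can be written with plain Java serialization.
    public static boolean serializes(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            // The message is the name of the first non-serializable class reached.
            System.out.println("NotSerializableException: " + e.getMessage());
            return false;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("instance-method tag: " + serializes(new AnonymousTagDemo().tagFromInstanceMethod()));
        System.out.println("static tag: " + serializes(STATIC_TAG));
        System.out.println("transient-field tag: " + serializes(new TransientHolder().tag()));
    }
}
```

Applied to your test, this would mean declaring the anonymous TupleTags as static final fields of IngestionPipeLineTest instead of creating them inline in testOnTimeEvents(), or marking its TestPipeline field transient. Anonymous tags held in non-static fields capture the test instance as well.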
Best regards,
Aleksandr Gortujev

On 3 Nov 2017 at 3:33 PM, "Matthias Baetens" <[email protected]> wrote:

Hi all,

I'm currently trying to write a TestStream to validate the windowing logic in a Beam pipeline. I'm creating a TestStream of Strings, applying the different PTransforms to the stream, and ending with a PAssert on some of the events I created:

    TestStream<String> events = TestStream.create(AvroCoder.of(String.class))
        .addElements("", "")
        .advanceWatermarkToInfinity();

    PCollection<KV<String, ArrayList<String>>> eventsSessionised = p.apply(events)
        .apply(new Processing(
            new TupleTag<invalidJSON>() { },
            new TupleTag<Event>() { },
            new TupleTag<Event>() { },
            eventsEnrichedKeyedTag, "", "", ""))
        .get(eventsEnrichedKeyedTag)
        .apply(new Sessionisation(SESSION_GAP_SIZE_HOURS, SESSION_CUT_OFF, ALLOWED_LATENESS_MINUTES))
        .apply(new Aggregation(uniqueEventsTag, new TupleTag<EventEnriched>() { }))
        .get(uniqueEventsTag)
        .apply(ParDo.of(new EventToKV()));

    PAssert.that(eventsSessionised)
        .inOnTimePane(new IntervalWindow(baseTime, endWindow1))
        .containsInAnyOrder(e1, e2);

Running the test function from a main function (new IngestionPipeLineTest().testOnTimeEvents();) causes the following error:

    Exception in thread "main" java.lang.IllegalArgumentException: unable to serialize

It points at a custom DoFn that runs fine when I run the main pipeline. I'm not sure why this error is suddenly thrown; any pointers or help would be greatly appreciated.
Full stacktrace:

    Exception in thread "main" java.lang.IllegalArgumentException: unable to serialize xxx
        at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:53)
        at org.apache.beam.sdk.util.SerializableUtils.clone(SerializableUtils.java:90)
        at org.apache.beam.sdk.transforms.ParDo$SingleOutput.<init>(ParDo.java:591)
        at org.apache.beam.sdk.transforms.ParDo.of(ParDo.java:435)
        at xxx.transforms.Processing.expand(Processing.java:52)
        at xxx.transforms.Processing.expand(Processing.java:1)
        at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:514)
        at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:454)
        at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:284)
        at xxx.IngestionPipeLineTest.testOnTimeEvents(IngestionPipeLineTest.java:96)
        at xxx.IngestionPipeLineTest.main(IngestionPipeLineTest.java:155)
    Caused by: java.io.NotSerializableException: org.apache.beam.sdk.testing.TestPipeline
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
        at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:49)
        ... 10 more

Best,
Matthias
