Hi Aleksandr,

Awesome, that did the trick. I did bump into this relevant SO answer <https://stackoverflow.com/a/28033607/6316101>, but overlooked those TupleTags. Good spot!

@Eugene: thanks a lot for the tip. Will give it a try soon ;)

Cheers,
Matthias

On Fri, Nov 3, 2017 at 9:00 PM, Eugene Kirpichov <kirpic...@google.com> wrote:

> When debugging serialization exceptions, I always find it very helpful to
> use -Dsun.io.serialization.extendedDebugInfo=true .
>
> On Fri, Nov 3, 2017 at 9:21 AM Aleksandr <aleksandr...@gmail.com> wrote:
>
>> Hello,
>> The error is probably in your tuple tag classes, which are anonymous classes.
>> It means that your test is trying to serialise the TestPipeline.
>>
>> Best regards
>> Aleksandr Gortujev
>>
>> On 3 Nov 2017 at 3:33 PM, "Matthias Baetens" <matthias.baet...@datatonic.com> wrote:
>>
>> Hi all,
>>
>> I'm currently trying to write a TestStream to validate the windowing
>> logic in a Beam pipeline.
>>
>> I'm creating a TestStream of Strings and applying the different
>> PTransforms to the stream, ending with a PAssert on some of the events I
>> created:
>>
>> TestStream<String> events = TestStream.create(AvroCoder.of(String.class))
>>     .addElements("", "")
>>     .advanceWatermarkToInfinity();
>>
>> PCollection<KV<String, ArrayList<String>>> eventsSessionised =
>>     p.apply(events)
>>         .apply(new Processing(
>>             new TupleTag<invalidJSON>() {},
>>             new TupleTag<Event>() {},
>>             new TupleTag<Event>() {},
>>             eventsEnrichedKeyedTag, "", "", ""))
>>         .get(eventsEnrichedKeyedTag)
>>         .apply(new Sessionisation(SESSION_GAP_SIZE_HOURS, SESSION_CUT_OFF,
>>             ALLOWED_LATENESS_MINUTES))
>>         .apply(new Aggregation(uniqueEventsTag, new TupleTag<EventEnriched>() {}))
>>         .get(uniqueEventsTag)
>>         .apply(ParDo.of(new EventToKV()));
>>
>> PAssert.that(eventsSessionised)
>>     .inOnTimePane(new IntervalWindow(baseTime, endWindow1))
>>     .containsInAnyOrder(e1, e2);
>>
>> Running the test function within a main function (new
>> IngestionPipeLineTest().testOnTimeEvents();) causes the following error:
>>
>> Exception in thread "main" java.lang.IllegalArgumentException: unable to
>> serialize
>>
>> pointing at a custom DoFn which runs fine when running the main pipeline.
>>
>> Not sure why this error gets thrown all of a sudden; any pointers / help
>> would be greatly appreciated.
>>
>> Full stacktrace:
>>
>> Exception in thread "main" java.lang.IllegalArgumentException: unable to serialize xxx
>>     at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:53)
>>     at org.apache.beam.sdk.util.SerializableUtils.clone(SerializableUtils.java:90)
>>     at org.apache.beam.sdk.transforms.ParDo$SingleOutput.<init>(ParDo.java:591)
>>     at org.apache.beam.sdk.transforms.ParDo.of(ParDo.java:435)
>>     at xxx.transforms.Processing.expand(Processing.java:52)
>>     at xxx.transforms.Processing.expand(Processing.java:1)
>>     at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:514)
>>     at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:454)
>>     at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:284)
>>     at xxx.IngestionPipeLineTest.testOnTimeEvents(IngestionPipeLineTest.java:96)
>>     at xxx.IngestionPipeLineTest.main(IngestionPipeLineTest.java:155)
>> Caused by: java.io.NotSerializableException: org.apache.beam.sdk.testing.TestPipeline
>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>>     at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:49)
>>     ... 10 more
>>
>> Best,
>>
>> Matthias

--
*Matthias Baetens*
*datatonic | data power unleashed*
office +44 203 668 3680 | mobile +44 74 918 20646
Level24 | 1 Canada Square | Canary Wharf | E14 5AB London

We've been announced <https://blog.google/topics/google-cloud/investing-vibrant-google-cloud-ecosystem-new-programs-and-partnerships/> as one of the top global Google Cloud Machine Learning partners.
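
For readers landing on this thread later: the mechanism Aleksandr describes can be reproduced without Beam. The sketch below is plain Java and is not the poster's code. `Tag`, `FakePipeline` and `TestFixture` are hypothetical stand-ins for `TupleTag`, `TestPipeline` and the test class, and it assumes the test class is itself Serializable and holds the pipeline in a field, which the thread does not show. An anonymous subclass created inside an instance method keeps a hidden reference to the enclosing object, so serializing the tag also tries to serialize the fixture and its non-serializable pipeline.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class AnonymousTagCaptureDemo {

    /** Hypothetical stand-in for a serializable tag type such as Beam's TupleTag. */
    static class Tag<T> implements Serializable {
        private static final long serialVersionUID = 1L;
    }

    /** Hypothetical stand-in for a non-serializable object such as TestPipeline. */
    static class FakePipeline {
    }

    /** Hypothetical test class: serializable itself, but holding a non-serializable field. */
    static class TestFixture implements Serializable {
        private static final long serialVersionUID = 1L;
        private final FakePipeline pipeline = new FakePipeline();

        // Anonymous subclass created in an instance method: the new object
        // carries an implicit reference to this TestFixture.
        Tag<String> anonymousTag() {
            return new Tag<String>() {};
        }
    }

    // Anonymous subclass created in a static context: there is no enclosing
    // instance, so nothing extra is dragged into the serialized form.
    static Tag<String> staticTag() {
        return new Tag<String>() {};
    }

    /** Returns true if Java serialization of the object succeeds. */
    static boolean isSerializable(Object value) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(value);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("tag created in instance method serializable: "
                + isSerializable(new TestFixture().anonymousTag()));
        System.out.println("tag created in static context serializable: "
                + isSerializable(staticTag()));
    }
}
```

The first check should report that serialization fails and the second that it succeeds. In a Beam test, the equivalent fix would be to declare the tags as static final fields (or otherwise create them in a static context) rather than inline in the test method. Running the JVM with the -Dsun.io.serialization.extendedDebugInfo=true flag Eugene mentions should add the chain of fields leading to the offending object to the exception output, which makes this kind of capture easier to spot.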