When debugging serialization exceptions, I always find it very helpful to
use  -Dsun.io.serialization.extendedDebugInfo=true .

On Fri, Nov 3, 2017 at 9:21 AM Aleksandr <[email protected]> wrote:

> Hello,
> Probably error is in your tuple tag classes, which are anonymous classes.
> It means that your test is trying to serialise testpipeline.
>
> Best regards
> Aleksandr Gortujev
>
>
>
> 3. nov 2017 3:33 PM kirjutas kuupƤeval "Matthias Baetens" <
> [email protected]>:
>
> Hi all,
>
> I'm currently trying to write a TestStream to validate the windowing logic
> in a Beam pipeline.
>
> I'm creating a teststream of Strings and applying the different
> PTransforms to the stream, ending with a PAssert on some of the events I
> created
>
> TestStream<String> events = TestStream.create(AvroCoder.of(String.class))
>                               .addElements("", "")
>                               .advanceWatermarkToInfinity();
>
> PCollection<KV<String, ArrayList<String>>> eventsSessionised = p.apply(events)
>
>                               .apply(new Processing(new 
> TupleTag<invalidJSON>() {
>                               }, new TupleTag<Event>() {
>                               }, new TupleTag<Event>() {
>                               }, eventsEnrichedKeyedTag, "", "", 
> "")).get(eventsEnrichedKeyedTag)
>                               .apply(new 
> Sessionisation(SESSION_GAP_SIZE_HOURS, SESSION_CUT_OFF, 
> ALLOWED_LATENESS_MINUTES))
>                               .apply(new Aggregation(uniqueEventsTag, new 
> TupleTag<EventEnriched>() {
>                               })).get(uniqueEventsTag).apply(ParDo.of(new 
> EventToKV()));
>
>
> PAssert.that(eventsSessionised).inOnTimePane(new IntervalWindow(baseTime, 
> endWindow1)).containsInAnyOrder(e1,
>                               e2);
>
> Running the test function with in a main functions (new
> IngestionPipeLineTest().testOnTimeEvents();) causes the following error:
>
> Exception in thread "main" java.lang.IllegalArgumentException: unable to 
> serialize
>
> pointing at a custom DoFn which runs fine running the main pipeline.
>
>
> Not sure why this error gets thrown all of a sudden, any pointers / help 
> would be greatly appreciated.
>
> Full stacktrace:
>
> Exception in thread "main" java.lang.IllegalArgumentException: unable to 
> serialize xxx
>       at 
> org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:53)
>       at 
> org.apache.beam.sdk.util.SerializableUtils.clone(SerializableUtils.java:90)
>       at 
> org.apache.beam.sdk.transforms.ParDo$SingleOutput.<init>(ParDo.java:591)
>       at org.apache.beam.sdk.transforms.ParDo.of(ParDo.java:435)
>       at xxx.transforms.Processing.expand(Processing.java:52)
>       at xxx.transforms.Processing.expand(Processing.java:1)
>       at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:514)
>       at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:454)
>       at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:284)
>       at 
> xxx.IngestionPipeLineTest.testOnTimeEvents(IngestionPipeLineTest.java:96)
>       at xxx.IngestionPipeLineTest.main(IngestionPipeLineTest.java:155)
> Caused by: java.io.NotSerializableException: 
> org.apache.beam.sdk.testing.TestPipeline
>       at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>       at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>       at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>       at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>       at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>       at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>       at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>       at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>       at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>       at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>       at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>       at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>       at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>       at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>       at 
> org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:49)
>       ... 10 more
>
>
> Best,
>
> Matthias
>
>
>

Reply via email to