Hello,
The error is probably in your tuple tag classes, which are anonymous classes.
Each one keeps a reference to the enclosing test instance, which means that
your test ends up trying to serialise the TestPipeline.
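
To illustrate the capture, here is a minimal plain-Java sketch with no Beam dependency. Tag, FakePipeline and CaptureDemo are hypothetical stand-ins (for TupleTag, TestPipeline and your test class), so treat it as an illustration of the mechanism under those assumptions rather than your actual code.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for TupleTag: a Serializable class that is usually
// instantiated as an anonymous subclass, e.g. new Tag<String>() {}.
class Tag<T> implements Serializable {}

// Hypothetical stand-in for TestPipeline: deliberately NOT Serializable.
class FakePipeline {}

// Hypothetical stand-in for the test class.
class CaptureDemo implements Serializable {
    // Non-transient, non-serializable field, like a pipeline held by a test class.
    private final FakePipeline pipeline = new FakePipeline();

    // Created in an instance method: the anonymous subclass keeps a hidden
    // reference to the enclosing CaptureDemo, and through it to the pipeline field.
    Tag<String> tagFromInstanceContext() {
        return new Tag<String>() {};
    }

    // Created in a static method: there is no enclosing instance to capture.
    static Tag<String> tagFromStaticContext() {
        return new Tag<String>() {};
    }

    // Returns true if Java serialization of the object succeeds.
    static boolean isSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (IOException e) {
            // NotSerializableException is a subclass of IOException.
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("instance context: "
                + isSerializable(new CaptureDemo().tagFromInstanceContext()));
        System.out.println("static context:   "
                + isSerializable(tagFromStaticContext()));
    }
}
```

If this is what is happening in your test, creating the tags in a static context (for example as static final fields), or declaring the pipeline field transient, should avoid dragging the TestPipeline into the serialised DoFn.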

Best regards
Aleksandr Gortujev



On 3 Nov 2017 at 3:33 PM, "Matthias Baetens" <[email protected]> wrote:

Hi all,

I'm currently trying to write a TestStream to validate the windowing logic
in a Beam pipeline.

I'm creating a TestStream of Strings and applying the different PTransforms
to the stream, ending with a PAssert on some of the events I created:

TestStream<String> events = TestStream.create(AvroCoder.of(String.class))
                                .addElements("", "")
                                .advanceWatermarkToInfinity();

PCollection<KV<String, ArrayList<String>>> eventsSessionised = p.apply(events)
        .apply(new Processing(
                new TupleTag<invalidJSON>() {},
                new TupleTag<Event>() {},
                new TupleTag<Event>() {},
                eventsEnrichedKeyedTag, "", "", ""))
        .get(eventsEnrichedKeyedTag)
        .apply(new Sessionisation(SESSION_GAP_SIZE_HOURS, SESSION_CUT_OFF,
                ALLOWED_LATENESS_MINUTES))
        .apply(new Aggregation(uniqueEventsTag, new TupleTag<EventEnriched>() {}))
        .get(uniqueEventsTag)
        .apply(ParDo.of(new EventToKV()));


PAssert.that(eventsSessionised)
        .inOnTimePane(new IntervalWindow(baseTime, endWindow1))
        .containsInAnyOrder(e1, e2);

Running the test function from within a main function (new
IngestionPipeLineTest().testOnTimeEvents();) causes the following error:

Exception in thread "main" java.lang.IllegalArgumentException: unable to serialize

pointing at a custom DoFn that runs fine in the main pipeline.


Not sure why this error gets thrown all of a sudden, any pointers /
help would be greatly appreciated.

Full stacktrace:

Exception in thread "main" java.lang.IllegalArgumentException: unable to serialize xxx
        at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:53)
        at org.apache.beam.sdk.util.SerializableUtils.clone(SerializableUtils.java:90)
        at org.apache.beam.sdk.transforms.ParDo$SingleOutput.<init>(ParDo.java:591)
        at org.apache.beam.sdk.transforms.ParDo.of(ParDo.java:435)
        at xxx.transforms.Processing.expand(Processing.java:52)
        at xxx.transforms.Processing.expand(Processing.java:1)
        at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:514)
        at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:454)
        at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:284)
        at xxx.IngestionPipeLineTest.testOnTimeEvents(IngestionPipeLineTest.java:96)
        at xxx.IngestionPipeLineTest.main(IngestionPipeLineTest.java:155)
Caused by: java.io.NotSerializableException: org.apache.beam.sdk.testing.TestPipeline
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
        at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:49)
        ... 10 more


Best,

Matthias
