Hi Aleksandr,

Awesome, that did the trick. I did bump into this relevant SO answer
<https://stackoverflow.com/a/28033607/6316101>, but overlooked those
TupleTags. Good spot!

@Eugene: thanks a lot for the tip. Will give it a try soon ;)

Cheers,
Matthias

On Fri, Nov 3, 2017 at 9:00 PM, Eugene Kirpichov <kirpic...@google.com>
wrote:

> When debugging serialization exceptions, I always find it very helpful to
> use -Dsun.io.serialization.extendedDebugInfo=true.
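>
> For example, passing the flag when launching the test from the command
> line (classpath details omitted; the class name is taken from the stack
> trace below):
>
>     java -Dsun.io.serialization.extendedDebugInfo=true ... xxx.IngestionPipeLineTest
>
> The property is read once when java.io.ObjectOutputStream is first
> loaded, so it is safest to pass it as a JVM argument rather than set it
> from code.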
>
> On Fri, Nov 3, 2017 at 9:21 AM Aleksandr <aleksandr...@gmail.com> wrote:
>
>> Hello,
>> The error is probably in your tuple tag classes, which are anonymous
>> classes. They keep a reference to the enclosing test class, which means
>> your test is trying to serialise the TestPipeline.
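>>
>> A minimal sketch of the fix, assuming the tags are currently created
>> inside the test class (the field names here are just illustrative; the
>> element types come from your snippet):
>>
>> import org.apache.beam.sdk.values.TupleTag;
>>
>> public class IngestionPipeLineTest {
>>     // Anonymous subclasses declared in a static context have no
>>     // enclosing instance, so serialising them no longer drags in the
>>     // non-serializable TestPipeline field of the test class.
>>     private static final TupleTag<invalidJSON> INVALID_JSON_TAG =
>>             new TupleTag<invalidJSON>() {};
>>     private static final TupleTag<Event> EVENTS_TAG =
>>             new TupleTag<Event>() {};
>>     private static final TupleTag<EventEnriched> ENRICHED_TAG =
>>             new TupleTag<EventEnriched>() {};
>>     ...
>> }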
>>
>> Best regards
>> Aleksandr Gortujev
>>
>>
>>
>> On 3 Nov 2017 at 3:33 PM, "Matthias Baetens" <
>> matthias.baet...@datatonic.com> wrote:
>>
>> Hi all,
>>
>> I'm currently trying to write a test using TestStream to validate the
>> windowing logic in a Beam pipeline.
>>
>> I'm creating a TestStream of Strings and applying the different
>> PTransforms to the stream, ending with a PAssert on some of the events I
>> created:
>>
>> TestStream<String> events = TestStream.create(AvroCoder.of(String.class))
>>         .addElements("", "")
>>         .advanceWatermarkToInfinity();
>>
>> PCollection<KV<String, ArrayList<String>>> eventsSessionised = p.apply(events)
>>         .apply(new Processing(new TupleTag<invalidJSON>() {},
>>                 new TupleTag<Event>() {},
>>                 new TupleTag<Event>() {},
>>                 eventsEnrichedKeyedTag, "", "", ""))
>>         .get(eventsEnrichedKeyedTag)
>>         .apply(new Sessionisation(SESSION_GAP_SIZE_HOURS, SESSION_CUT_OFF,
>>                 ALLOWED_LATENESS_MINUTES))
>>         .apply(new Aggregation(uniqueEventsTag, new TupleTag<EventEnriched>() {}))
>>         .get(uniqueEventsTag)
>>         .apply(ParDo.of(new EventToKV()));
>>
>> PAssert.that(eventsSessionised)
>>         .inOnTimePane(new IntervalWindow(baseTime, endWindow1))
>>         .containsInAnyOrder(e1, e2);
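>>
>> (The test then presumably finishes with p.run().waitUntilFinish();,
>> since TestStream only emits its elements, and PAssert only runs its
>> checks, when the pipeline actually executes.)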
>>
>> Running the test function from a main function (new
>> IngestionPipeLineTest().testOnTimeEvents();) causes the following error:
>>
>> Exception in thread "main" java.lang.IllegalArgumentException: unable to serialize
>>
>> pointing at a custom DoFn which runs fine when running the main pipeline.
>>
>>
>> I'm not sure why this error is suddenly thrown; any pointers / help
>> would be greatly appreciated.
>>
>> Full stacktrace:
>>
>> Exception in thread "main" java.lang.IllegalArgumentException: unable to serialize xxx
>>      at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:53)
>>      at org.apache.beam.sdk.util.SerializableUtils.clone(SerializableUtils.java:90)
>>      at org.apache.beam.sdk.transforms.ParDo$SingleOutput.<init>(ParDo.java:591)
>>      at org.apache.beam.sdk.transforms.ParDo.of(ParDo.java:435)
>>      at xxx.transforms.Processing.expand(Processing.java:52)
>>      at xxx.transforms.Processing.expand(Processing.java:1)
>>      at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:514)
>>      at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:454)
>>      at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:284)
>>      at xxx.IngestionPipeLineTest.testOnTimeEvents(IngestionPipeLineTest.java:96)
>>      at xxx.IngestionPipeLineTest.main(IngestionPipeLineTest.java:155)
>> Caused by: java.io.NotSerializableException: org.apache.beam.sdk.testing.TestPipeline
>>      at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>>      at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>      at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>      at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>      at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>      at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>      at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>      at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>      at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>      at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>      at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>      at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>      at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>      at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>>      at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:49)
>>      ... 10 more
>>
>>
>> Best,
>>
>> Matthias
>>
>>
>>


-- 


*Matthias Baetens*


*datatonic | data power unleashed*

office +44 203 668 3680  |  mobile +44 74 918 20646

Level24 | 1 Canada Square | Canary Wharf | E14 5AB London


We've been announced
<https://blog.google/topics/google-cloud/investing-vibrant-google-cloud-ecosystem-new-programs-and-partnerships/>
as one of the top global Google Cloud Machine Learning partners.
