Please find attached another unit test with a more realistic use case. I am
compiling it using Java 8, and the stack trace I get from testGeneric is attached.
Thanks,
a.

    On Friday, 12 May 2017, 1:52, Thomas Groh <[email protected]> wrote:
 

 Hey Antony;
I've tried to update your code to compile in Java 7 and removed the 
SerializableUtils Dan mentioned, and I can't seem to reproduce the issue. Can 
you share more about the specific issue you're having? A stack trace would be 
really helpful.
Thanks,
Thomas
On Thu, May 11, 2017 at 4:42 PM, Dan Halperin <[email protected]> wrote:

Hi Anthony,
I'm a little confused by your code snippet:
    SerializableUtils.serializeToByteArray(
        input.apply(ParDo.of(new NonGenericOutput<>())).apply("passing", View.asSingleton()));
is serializing a PCollectionView object. Those are not necessarily serializable 
and it's not clear that it makes sense to do this at all.
Can you say more about what you're trying to do?
Thanks,
Dan
On Thu, May 11, 2017 at 4:21 PM, Antony Mayi <[email protected]> wrote:

I tried 2.0.0rc2 and started facing a weird serialization issue that did not
happen on 0.6.0. I am able to reproduce it using the attached unit test: see
the first serialization attempt, which passes, and the second, slightly
different one (the transform output type is generic), which fails only on 2.0.0.

Can anyone find a way around it? In my real case I depend on generic output
types.

thanks,
antony.





   
package sertest;

import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.junit.Rule;
import org.junit.Test;

import java.io.Serializable;

public class SerTest {
    @Rule public final transient TestPipeline pipeline = TestPipeline.create();

    @DefaultCoder(SerializableCoder.class) private static class Output<T extends Serializable> implements Serializable {
        // body irrelevant
    }

    private static class GenericOutput<T extends Serializable> extends DoFn<Integer, Output<T>> {
        @ProcessElement public void processElement(ProcessContext c) {
            // body irrelevant
        }
    }

    private static class NonGenericOutput<T extends Serializable> extends DoFn<Integer, Output> {
        @ProcessElement public void processElement(ProcessContext c) {
            // body irrelevant
        }
    }

    private static class Consumer<T extends Serializable> extends DoFn<Integer, Void> {
        private final PCollectionView<T> view;

        public Consumer(PCollectionView<T> view) {
            this.view = view;
        }

        @ProcessElement public void processElement(ProcessContext c) {
            // body irrelevant
        }
    }

    // this passes OK
    @Test public void testNonGeneric() {
        PCollection<Integer> input = this.pipeline.apply(Create.of(1));
        PCollectionView<Output> view = input.apply(ParDo.of(new NonGenericOutput<>())).apply("passing", View.asSingleton());
        input.apply(ParDo.of(new Consumer<>(view)).withSideInputs(view));
        this.pipeline.run();
    }

    // this fails on beam 2.0 while it works OK on 0.6.0
    @Test public void testGeneric() {
        PCollection<Integer> input = this.pipeline.apply(Create.of(1));
        PCollectionView<Output<Integer>> view = input.apply(ParDo.of(new GenericOutput<Integer>())).apply("failing", View.asSingleton());
        input.apply(ParDo.of(new Consumer<>(view)).withSideInputs(view));
        this.pipeline.run();
    }
}
java.lang.IllegalArgumentException: unable to serialize sertest.SerTest$Consumer@2371aaca
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
        at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
        at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:49)
        at org.apache.beam.sdk.util.SerializableUtils.clone(SerializableUtils.java:90)
        at org.apache.beam.sdk.transforms.ParDo$SingleOutput.<init>(ParDo.java:569)
        at org.apache.beam.sdk.transforms.ParDo.of(ParDo.java:434)
        at sertest.SerTest.testGeneric(SerTest.java:60)
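
For context on the trace: it shows ParDo.of cloning the DoFn through SerializableUtils.clone, which ends up in plain Java serialization (ObjectOutputStream.writeObject). The JDK-only sketch below illustrates the general failure mode, where an object that is itself Serializable still fails because something reachable from its fields is not. The names SerializationProbe, Holder and NotSerializableField are hypothetical stand-ins and not part of Beam, and this is not claimed to reproduce the actual cause of the 2.0.0 failure.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializationProbe {

    /** Hypothetical stand-in for a field type that does not implement Serializable. */
    static class NotSerializableField {}

    /** Hypothetical stand-in for a DoFn-like object: Serializable itself, holding one field. */
    static class Holder implements Serializable {
        private static final long serialVersionUID = 1L;
        private final Object field;

        Holder(Object field) {
            this.field = field;
        }
    }

    /**
     * Tries to Java-serialize the object into an in-memory buffer.
     * Returns null on success, otherwise the message of the NotSerializableException,
     * which the JDK normally sets to the name of the offending class.
     */
    static String firstNonSerializableClass(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return null;
        } catch (NotSerializableException e) {
            return e.getMessage();
        } catch (IOException e) {
            throw new IllegalStateException("unexpected I/O failure on an in-memory stream", e);
        }
    }

    public static void main(String[] args) {
        // A String field is serializable, so the whole object graph serializes.
        System.out.println(firstNonSerializableClass(new Holder("text")));
        // The holder is Serializable, but its field is not, so serialization fails.
        System.out.println(firstNonSerializableClass(new Holder(new NotSerializableField())));
    }
}
```

Running a probe like this against the fields reachable from the failing DoFn (here, the Consumer and the view it holds) is one way to narrow down which class in the graph is rejected.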
