I tried 2.0.0rc2 and started facing a weird serialization issue that did not
happen on 0.6.0. I am able to reproduce it using the attached unit test:
the first serialization attempt passes, while the second, slightly
different one (the transform's output type is generic) fails only on 2.0.0.
Can anyone find a way around it? In my real case I depend on generic output
types.
thanks,
antony.
package sertest;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.values.PCollection;
import org.junit.Rule;
import org.junit.Test;
import java.io.Serializable;
/**
 * Reproducer for a serialization difference between Beam 0.6.0 and 2.0.0.
 *
 * <p>Both tests build the same small pipeline (a single integer, one {@link ParDo}, a singleton
 * view) and serialize the resulting view with {@link SerializableUtils#serializeToByteArray}.
 * The only difference between them is whether the {@link DoFn} declares its output as the raw
 * {@code Output} type or as the parameterized {@code Output<T>}.
 */
public class SerTest {

  /** Pipeline under test; a JUnit rule, so it is set up by the test runner. */
  @Rule
  public final transient TestPipeline pipeline = TestPipeline.create();

  /** Element type emitted by both DoFns; encoded with {@link SerializableCoder}. */
  @DefaultCoder(SerializableCoder.class)
  private static class Output<T extends Serializable> implements Serializable {
    // body irrelevant
  }

  /** DoFn whose declared output type is the raw {@code Output}. */
  private static class NonGenericOutput<T extends Serializable> extends DoFn<Integer, Output> {
    @ProcessElement
    public void processElement(ProcessContext c) {
      // body irrelevant
    }
  }

  /** DoFn whose declared output type still carries the type variable: {@code Output<T>}. */
  private static class GenericOutput<T extends Serializable> extends DoFn<Integer, Output<T>> {
    @ProcessElement
    public void processElement(ProcessContext c) {
      // body irrelevant
    }
  }

  /** Raw output type: reported to pass. */
  @Test
  public void testNonGeneric() {
    PCollection<Integer> numbers = pipeline.apply(Create.of(1));
    NonGenericOutput<Serializable> rawFn = new NonGenericOutput<>();

    // Typed as Serializable because that is all serializeToByteArray requires of its argument.
    Serializable singletonView =
        numbers.apply(ParDo.of(rawFn)).apply("passing", View.asSingleton());
    SerializableUtils.serializeToByteArray(singletonView);

    pipeline.run();
  }

  /** Parameterized output type: reported to fail on Beam 2.0 while working on 0.6.0. */
  @Test
  public void testGeneric() {
    PCollection<Integer> numbers = pipeline.apply(Create.of(1));
    GenericOutput<Serializable> genericFn = new GenericOutput<>();

    // Typed as Serializable because that is all serializeToByteArray requires of its argument.
    Serializable singletonView =
        numbers.apply(ParDo.of(genericFn)).apply("failing", View.asSingleton());
    SerializableUtils.serializeToByteArray(singletonView);

    pipeline.run();
  }
}