Alexey Diomin created BEAM-1255: ----------------------------------- Summary: java.io.NotSerializableException in flink on UnboundedSource Key: BEAM-1255 URL: https://issues.apache.org/jira/browse/BEAM-1255 Project: Beam Issue Type: Bug Components: runner-flink Affects Versions: 0.5.0 Reporter: Alexey Diomin Assignee: Maximilian Michels
After introduce new Coders with TypeDescriptor on flink runner we have issue: {code} Caused by: java.io.NotSerializableException: sun.reflect.generics.reflectiveObjects.TypeVariableImpl - element of array (index: 0) - array (class "[Ljava.lang.Object;", size: 2) - field (class "com.google.common.collect.ImmutableList$SerializedForm", name: "elements", type: "class [Ljava.lang.Object;") - object (class "com.google.common.collect.ImmutableList$SerializedForm", com.google.common.collect.ImmutableList$SerializedForm@30af5b6b) - field (class "com.google.common.reflect.Types$ParameterizedTypeImpl", name: "argumentsList", type: "class com.google.common.collect.ImmutableList") - object (class "com.google.common.reflect.Types$ParameterizedTypeImpl", org.apache.beam.sdk.io.UnboundedSource<OutputT, CheckpointMarkT>) - field (class "com.google.common.reflect.TypeToken", name: "runtimeType", type: "interface java.lang.reflect.Type") - object (class "com.google.common.reflect.TypeToken$SimpleTypeToken", org.apache.beam.sdk.io.UnboundedSource<OutputT, CheckpointMarkT>) - field (class "org.apache.beam.sdk.values.TypeDescriptor", name: "token", type: "class com.google.common.reflect.TypeToken") - object (class "org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper$1", org.apache.beam.sdk.io.UnboundedSource<OutputT, CheckpointMarkT>) - field (class "org.apache.beam.sdk.coders.SerializableCoder", name: "typeDescriptor", type: "class org.apache.beam.sdk.values.TypeDescriptor") - object (class "org.apache.beam.sdk.coders.SerializableCoder", SerializableCoder) - field (class "org.apache.beam.sdk.coders.KvCoder", name: "keyCoder", type: "interface org.apache.beam.sdk.coders.Coder") - object (class "org.apache.beam.sdk.coders.KvCoder", KvCoder(SerializableCoder,AvroCoder)) - field (class "org.apache.beam.sdk.coders.IterableLikeCoder", name: "elementCoder", type: "interface org.apache.beam.sdk.coders.Coder") - object (class "org.apache.beam.sdk.coders.ListCoder", ListCoder(KvCoder(SerializableCoder,AvroCoder))) - field (class "org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper", name: "checkpointCoder", type: "class org.apache.beam.sdk.coders.ListCoder") - root object (class "org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper", org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper@b2c5e07) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1182) {code} bug introduced after commit: 7b98fa08d14e8121e8885f00a9a9a878b73f81a6 pull request: https://github.com/apache/beam/pull/1537 Code for reproduce error {code} import com.google.common.collect.ImmutableList; import org.apache.beam.runners.flink.FlinkPipelineOptions; import org.apache.beam.runners.flink.FlinkRunner; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.kafka.KafkaIO; import org.apache.beam.sdk.options.PipelineOptionsFactory; public class FlinkSerialisationError { public static void main(String[] args) { FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class); options.setRunner(FlinkRunner.class); options.setStreaming(true); Pipeline pipeline = Pipeline.create(options); pipeline.apply( KafkaIO.read() .withBootstrapServers("localhost:9092") .withTopics(ImmutableList.of("test")) // set ConsumerGroup .withoutMetadata()); pipeline.run(); } } {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)