Alexey Diomin created BEAM-1255:
-----------------------------------

             Summary: java.io.NotSerializableException in flink on 
UnboundedSource
                 Key: BEAM-1255
                 URL: https://issues.apache.org/jira/browse/BEAM-1255
             Project: Beam
          Issue Type: Bug
          Components: runner-flink
    Affects Versions: 0.5.0
            Reporter: Alexey Diomin
            Assignee: Maximilian Michels


After introduce new Coders with TypeDescriptor on flink runner we have issue:

{code}
Caused by: java.io.NotSerializableException: 
sun.reflect.generics.reflectiveObjects.TypeVariableImpl
        - element of array (index: 0)
        - array (class "[Ljava.lang.Object;", size: 2)
        - field (class 
"com.google.common.collect.ImmutableList$SerializedForm", name: "elements", 
type: "class [Ljava.lang.Object;")
        - object (class 
"com.google.common.collect.ImmutableList$SerializedForm", 
com.google.common.collect.ImmutableList$SerializedForm@30af5b6b)
        - field (class "com.google.common.reflect.Types$ParameterizedTypeImpl", 
name: "argumentsList", type: "class com.google.common.collect.ImmutableList")
        - object (class 
"com.google.common.reflect.Types$ParameterizedTypeImpl", 
org.apache.beam.sdk.io.UnboundedSource<OutputT, CheckpointMarkT>)
        - field (class "com.google.common.reflect.TypeToken", name: 
"runtimeType", type: "interface java.lang.reflect.Type")
        - object (class "com.google.common.reflect.TypeToken$SimpleTypeToken", 
org.apache.beam.sdk.io.UnboundedSource<OutputT, CheckpointMarkT>)
        - field (class "org.apache.beam.sdk.values.TypeDescriptor", name: 
"token", type: "class com.google.common.reflect.TypeToken")
        - object (class 
"org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper$1",
 org.apache.beam.sdk.io.UnboundedSource<OutputT, CheckpointMarkT>)
        - field (class "org.apache.beam.sdk.coders.SerializableCoder", name: 
"typeDescriptor", type: "class org.apache.beam.sdk.values.TypeDescriptor")
        - object (class "org.apache.beam.sdk.coders.SerializableCoder", 
SerializableCoder)
        - field (class "org.apache.beam.sdk.coders.KvCoder", name: "keyCoder", 
type: "interface org.apache.beam.sdk.coders.Coder")
        - object (class "org.apache.beam.sdk.coders.KvCoder", 
KvCoder(SerializableCoder,AvroCoder))
        - field (class "org.apache.beam.sdk.coders.IterableLikeCoder", name: 
"elementCoder", type: "interface org.apache.beam.sdk.coders.Coder")
        - object (class "org.apache.beam.sdk.coders.ListCoder", 
ListCoder(KvCoder(SerializableCoder,AvroCoder)))
        - field (class 
"org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper",
 name: "checkpointCoder", type: "class org.apache.beam.sdk.coders.ListCoder")
        - root object (class 
"org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper",
 
org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper@b2c5e07)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1182)
{code}

bug introduced after commit:
7b98fa08d14e8121e8885f00a9a9a878b73f81a6

pull request:
https://github.com/apache/beam/pull/1537

Code for reproduce error
{code}
import com.google.common.collect.ImmutableList;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class FlinkSerialisationError {

    public static void main(String[] args) {
        FlinkPipelineOptions options = 
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
        options.setRunner(FlinkRunner.class);
        options.setStreaming(true);


        Pipeline pipeline = Pipeline.create(options);

        pipeline.apply(
                KafkaIO.read()
                    .withBootstrapServers("localhost:9092")
                    .withTopics(ImmutableList.of("test"))
                    // set ConsumerGroup
                    .withoutMetadata());

        pipeline.run();
    }
}
{code}





--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to