[jira] [Commented] (BEAM-1255) java.io.NotSerializableException in flink on UnboundedSource

2017-01-11 Thread Jeremie Lenfant-Engelmann (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15819386#comment-15819386
 ] 

Jeremie Lenfant-Engelmann commented on BEAM-1255:
-

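The issue here is that TypeDescriptor (which relies on TypeToken) is not 
serializable when a variable type is specified (the <OutputT, CheckpointMarkT> 
part). The captured type variables are TypeVariableImpl instances, which is 
the class named in the NotSerializableException.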
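
To see the underlying behaviour without Beam or Guava on the classpath, here is a minimal JDK-only sketch. The class and method names (TypeVariableSerializationDemo, isJavaSerializable) are made up for illustration, and List's type parameter stands in for OutputT. It only probes whether Java serialization accepts a raw Class versus a type variable obtained through reflection.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.lang.reflect.Type;
import java.util.List;

public class TypeVariableSerializationDemo {

  /** Returns true if Java serialization accepts the value, false on NotSerializableException. */
  static boolean isJavaSerializable(Object value) {
    try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
      out.writeObject(value);
      return true;
    } catch (NotSerializableException e) {
      return false;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    // A raw class token: java.lang.Class implements Serializable.
    Type rawClass = List.class;
    // The type variable E declared by List<E>; stands in for OutputT here.
    Type typeVariable = List.class.getTypeParameters()[0];

    System.out.println("raw class serializable:     " + isJavaSerializable(rawClass));
    System.out.println("type variable serializable: " + isJavaSerializable(typeVariable));
  }
}
```

Any object graph that holds such a type variable in a non-transient field fails to serialize in the same way, which matches the chain shown in the stack trace.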

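The simplest fix is to drop the variable-type part:

Coder<UnboundedSource<OutputT, CheckpointMarkT>> sourceCoder =
    (Coder<UnboundedSource<OutputT, CheckpointMarkT>>)
        SerializableCoder.of(new TypeDescriptor<UnboundedSource>() {});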

You lose the type-parameter information at the TypeDescriptor level, but the 
descriptor still records that you are encoding an UnboundedSource.
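
The difference between the two captures can be sketched with plain JDK reflection. Capture below is a hypothetical stand-in for the anonymous-subclass trick, not Beam's TypeDescriptor or Guava's TypeToken, and List stands in for UnboundedSource. The sketch shows that capturing a parameterized type inside generic code records a type variable, while capturing the raw type records only a Class.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;
import java.util.List;

public class TypeCaptureDemo {

  /** Hypothetical stand-in: reads the type argument of its anonymous subclass. */
  abstract static class Capture<T> {
    Type captured() {
      ParameterizedType superType = (ParameterizedType) getClass().getGenericSuperclass();
      return superType.getActualTypeArguments()[0];
    }
  }

  /** Captures List<E> from generic code, so the type variable E is recorded. */
  static <E> Type captureWithVariable() {
    return new Capture<List<E>>() {}.captured();
  }

  /** Captures the raw List, so only the Class is recorded. */
  @SuppressWarnings("rawtypes")
  static Type captureRaw() {
    return new Capture<List>() {}.captured();
  }

  /** Checks type variables and parameterized types only; wildcards and arrays are ignored. */
  static boolean mentionsTypeVariable(Type type) {
    if (type instanceof TypeVariable) {
      return true;
    }
    if (type instanceof ParameterizedType) {
      for (Type argument : ((ParameterizedType) type).getActualTypeArguments()) {
        if (mentionsTypeVariable(argument)) {
          return true;
        }
      }
    }
    return false;
  }

  public static void main(String[] args) {
    System.out.println("parameterized capture: " + captureWithVariable());
    System.out.println("raw capture:           " + captureRaw());
  }
}
```

The raw capture gives up the element type but contains nothing beyond a Class, which is the trade-off described above.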

Alternatively, you could extend SerializableCoder and override 
getEncodedTypeDescriptor to return the UnboundedSource with its variable types 
defined, but that gets complicated quickly. It would look something like:

@Override
public TypeDescriptor<UnboundedSource<OutputT, CheckpointMarkT>>
    getEncodedTypeDescriptor() {
  return TypeDescriptor.of(getRecordType())
      // You need a TypeDescriptor for OutputT
      .where(new TypeParameter<OutputT>() {}, outputTTypeDescriptor)
      // You need a TypeDescriptor for CheckpointMarkT
      .where(new TypeParameter<CheckpointMarkT>() {}, checkpointMarkTTypeDescriptor);
}

You would need type descriptors for your type parameters (which themselves 
might not be serializable). You could also create a type descriptor from the 
generic type itself, but that would not be very useful, since it would only 
say that the type is an OutputT, for example.

So I think the first solution is simpler.


[jira] [Commented] (BEAM-1255) java.io.NotSerializableException in flink on UnboundedSource

2017-01-11 Thread Alexey Diomin (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15818082#comment-15818082
 ] 

Alexey Diomin commented on BEAM-1255:
-

This bug relates to the serialization of UnboundedSourceWrapper. The following 
test reproduces it:

{code}
@Test
public void testSerialization() throws Exception {
  PipelineOptions options = PipelineOptionsFactory.create();

  TestCountingSource source = new TestCountingSource(1);
  UnboundedSourceWrapper<KV<Integer, Integer>, TestCountingSource.CounterMark> flinkWrapper =
      new UnboundedSourceWrapper<>(options, source, 1);

  // Fails with java.io.NotSerializableException while the bug is present.
  InstantiationUtil.serializeObject(flinkWrapper);
}
{code}

> java.io.NotSerializableException in flink on UnboundedSource
> 
>
>                 Key: BEAM-1255
>                 URL: https://issues.apache.org/jira/browse/BEAM-1255
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink
>    Affects Versions: 0.5.0
>            Reporter: Alexey Diomin
>            Assignee: Maximilian Michels
>
> After the introduction of new Coders with TypeDescriptor, we get the following error on the Flink runner:
> {code}
> Caused by: java.io.NotSerializableException: 
> sun.reflect.generics.reflectiveObjects.TypeVariableImpl
>   - element of array (index: 0)
>   - array (class "[Ljava.lang.Object;", size: 2)
>   - field (class 
> "com.google.common.collect.ImmutableList$SerializedForm", name: "elements", 
> type: "class [Ljava.lang.Object;")
>   - object (class 
> "com.google.common.collect.ImmutableList$SerializedForm", 
> com.google.common.collect.ImmutableList$SerializedForm@30af5b6b)
>   - field (class "com.google.common.reflect.Types$ParameterizedTypeImpl", 
> name: "argumentsList", type: "class com.google.common.collect.ImmutableList")
>   - object (class 
> "com.google.common.reflect.Types$ParameterizedTypeImpl", 
> org.apache.beam.sdk.io.UnboundedSource)
>   - field (class "com.google.common.reflect.TypeToken", name: 
> "runtimeType", type: "interface java.lang.reflect.Type")
>   - object (class "com.google.common.reflect.TypeToken$SimpleTypeToken", 
> org.apache.beam.sdk.io.UnboundedSource)
>   - field (class "org.apache.beam.sdk.values.TypeDescriptor", name: 
> "token", type: "class com.google.common.reflect.TypeToken")
>   - object (class 
> "org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper$1",
>  org.apache.beam.sdk.io.UnboundedSource)
>   - field (class "org.apache.beam.sdk.coders.SerializableCoder", name: 
> "typeDescriptor", type: "class org.apache.beam.sdk.values.TypeDescriptor")
>   - object (class "org.apache.beam.sdk.coders.SerializableCoder", 
> SerializableCoder)
>   - field (class "org.apache.beam.sdk.coders.KvCoder", name: "keyCoder", 
> type: "interface org.apache.beam.sdk.coders.Coder")
>   - object (class "org.apache.beam.sdk.coders.KvCoder", 
> KvCoder(SerializableCoder,AvroCoder))
>   - field (class "org.apache.beam.sdk.coders.IterableLikeCoder", name: 
> "elementCoder", type: "interface org.apache.beam.sdk.coders.Coder")
>   - object (class "org.apache.beam.sdk.coders.ListCoder", 
> ListCoder(KvCoder(SerializableCoder,AvroCoder)))
>   - field (class 
> "org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper",
>  name: "checkpointCoder", type: "class org.apache.beam.sdk.coders.ListCoder")
>   - root object (class 
> "org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper",
>  
> org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper@b2c5e07)
>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1182)
> {code}
> The bug was introduced by commit:
> 7b98fa08d14e8121e8885f00a9a9a878b73f81a6
> pull request:
> https://github.com/apache/beam/pull/1537
> Code to reproduce the error:
> {code}
> import com.google.common.collect.ImmutableList;
> import org.apache.beam.runners.flink.FlinkPipelineOptions;
> import org.apache.beam.runners.flink.FlinkRunner;
> import org.apache.beam.sdk.Pipeline;
> import org.apache.beam.sdk.io.kafka.KafkaIO;
> import org.apache.beam.sdk.options.PipelineOptionsFactory;
> public class FlinkSerialisationError {
>     public static void main(String[] args) {
>         FlinkPipelineOptions options =
>             PipelineOptionsFactory.as(FlinkPipelineOptions.class);
>         options.setRunner(FlinkRunner.class);
>         options.setStreaming(true);
>         Pipeline pipeline = Pipeline.create(options);
>         pipeline.apply(
>             KafkaIO.read()
>                 .withBootstrapServers("localhost:9092")
>                 .withTopics(ImmutableList.of("test"))
>                 // set ConsumerGroup
>                 .withoutMetadata());
>         pipeline.run();