[jira] [Assigned] (BEAM-1273) Error with FlinkPipelineOptions serialization after setStateBackend
[ https://issues.apache.org/jira/browse/BEAM-1273?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Alexey Diomin reassigned BEAM-1273:
-----------------------------------

    Assignee: Alexey Diomin  (was: Aljoscha Krettek)

> Error with FlinkPipelineOptions serialization after setStateBackend
> -------------------------------------------------------------------
>
>                 Key: BEAM-1273
>                 URL: https://issues.apache.org/jira/browse/BEAM-1273
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink
>            Reporter: Alexey Diomin
>            Assignee: Alexey Diomin
>
> Trying to set a state backend via FlinkPipelineOptions.setStateBackend causes an error:
> {code}
> Caused by: com.fasterxml.jackson.databind.JsonMappingException: Can not
> construct instance of org.apache.flink.runtime.state.AbstractStateBackend:
> abstract types either need to be mapped to concrete types, have custom
> deserializer, or contain additional type information.
> {code}
> The exception is thrown in SerializedPipelineOptions.
> The main problem is that AbstractStateBackend and its implementations cannot be
> mapped to a JSON schema for serialization.
> The error appeared after:
> [BEAM-617][flink] introduce option to set state backend

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Created] (BEAM-1273) Error with FlinkPipelineOptions serialization after setStateBackend
Alexey Diomin created BEAM-1273:
-----------------------------------

             Summary: Error with FlinkPipelineOptions serialization after setStateBackend
                 Key: BEAM-1273
                 URL: https://issues.apache.org/jira/browse/BEAM-1273
             Project: Beam
          Issue Type: Bug
          Components: runner-flink
            Reporter: Alexey Diomin
            Assignee: Aljoscha Krettek

Trying to set a state backend via FlinkPipelineOptions.setStateBackend causes an error:

{code}
Caused by: com.fasterxml.jackson.databind.JsonMappingException: Can not
construct instance of org.apache.flink.runtime.state.AbstractStateBackend:
abstract types either need to be mapped to concrete types, have custom
deserializer, or contain additional type information.
{code}

The exception is thrown in SerializedPipelineOptions.

The main problem is that AbstractStateBackend and its implementations cannot be
mapped to a JSON schema for serialization.

The error appeared after:
[BEAM-617][flink] introduce option to set state backend
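Jackson's complaint above is the generic abstract-type problem: a deserializer ultimately needs a concrete class to construct, and AbstractStateBackend gives it none. The minimal sketch below uses only the JDK (no Jackson, no Flink); `StateBackendLike` is a hypothetical stand-in for AbstractStateBackend, illustrating the underlying constraint that reflection cannot instantiate an abstract class.

```java
// Sketch only: shows why a reflection-based deserializer fails on an
// abstract target type. StateBackendLike is a hypothetical stand-in.
public class AbstractConstructionDemo {

    abstract static class StateBackendLike {
        public StateBackendLike() {}
    }

    public static void main(String[] args) throws Exception {
        try {
            // A deserializer would do the moral equivalent of this call;
            // for an abstract class it can only fail.
            StateBackendLike.class.getDeclaredConstructor().newInstance();
        } catch (InstantiationException e) {
            System.out.println("cannot construct abstract type: " + e);
        }
    }
}
```

In Jackson's own terms (as the error message says), the usual remedies are mapping the abstract type to a concrete subtype, registering a custom deserializer, or embedding type information (e.g. `@JsonTypeInfo`).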
[jira] [Assigned] (BEAM-1255) java.io.NotSerializableException in flink on UnboundedSource
[ https://issues.apache.org/jira/browse/BEAM-1255?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Alexey Diomin reassigned BEAM-1255:
-----------------------------------

    Assignee: Alexey Diomin  (was: Maximilian Michels)

> java.io.NotSerializableException in flink on UnboundedSource
> ------------------------------------------------------------
>
>                 Key: BEAM-1255
>                 URL: https://issues.apache.org/jira/browse/BEAM-1255
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink
>    Affects Versions: 0.5.0
>            Reporter: Alexey Diomin
>            Assignee: Alexey Diomin
>
> After introducing the new Coders with TypeDescriptor in the Flink runner, we have this issue:
> {code}
> Caused by: java.io.NotSerializableException: sun.reflect.generics.reflectiveObjects.TypeVariableImpl
> 	- element of array (index: 0)
> 	- array (class "[Ljava.lang.Object;", size: 2)
> 	- field (class "com.google.common.collect.ImmutableList$SerializedForm", name: "elements", type: "class [Ljava.lang.Object;")
> 	- object (class "com.google.common.collect.ImmutableList$SerializedForm", com.google.common.collect.ImmutableList$SerializedForm@30af5b6b)
> 	- field (class "com.google.common.reflect.Types$ParameterizedTypeImpl", name: "argumentsList", type: "class com.google.common.collect.ImmutableList")
> 	- object (class "com.google.common.reflect.Types$ParameterizedTypeImpl", org.apache.beam.sdk.io.UnboundedSource)
> 	- field (class "com.google.common.reflect.TypeToken", name: "runtimeType", type: "interface java.lang.reflect.Type")
> 	- object (class "com.google.common.reflect.TypeToken$SimpleTypeToken", org.apache.beam.sdk.io.UnboundedSource)
> 	- field (class "org.apache.beam.sdk.values.TypeDescriptor", name: "token", type: "class com.google.common.reflect.TypeToken")
> 	- object (class "org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper$1", org.apache.beam.sdk.io.UnboundedSource)
> 	- field (class "org.apache.beam.sdk.coders.SerializableCoder", name: "typeDescriptor", type: "class org.apache.beam.sdk.values.TypeDescriptor")
> 	- object (class "org.apache.beam.sdk.coders.SerializableCoder", SerializableCoder)
> 	- field (class "org.apache.beam.sdk.coders.KvCoder", name: "keyCoder", type: "interface org.apache.beam.sdk.coders.Coder")
> 	- object (class "org.apache.beam.sdk.coders.KvCoder", KvCoder(SerializableCoder,AvroCoder))
> 	- field (class "org.apache.beam.sdk.coders.IterableLikeCoder", name: "elementCoder", type: "interface org.apache.beam.sdk.coders.Coder")
> 	- object (class "org.apache.beam.sdk.coders.ListCoder", ListCoder(KvCoder(SerializableCoder,AvroCoder)))
> 	- field (class "org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper", name: "checkpointCoder", type: "class org.apache.beam.sdk.coders.ListCoder")
> 	- root object (class "org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper", org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper@b2c5e07)
> 	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1182)
> {code}
> Bug introduced by commit 7b98fa08d14e8121e8885f00a9a9a878b73f81a6,
> pull request: https://github.com/apache/beam/pull/1537
> Code to reproduce the error:
> {code}
> import com.google.common.collect.ImmutableList;
> import org.apache.beam.runners.flink.FlinkPipelineOptions;
> import org.apache.beam.runners.flink.FlinkRunner;
> import org.apache.beam.sdk.Pipeline;
> import org.apache.beam.sdk.io.kafka.KafkaIO;
> import org.apache.beam.sdk.options.PipelineOptionsFactory;
>
> public class FlinkSerialisationError {
>   public static void main(String[] args) {
>     FlinkPipelineOptions options =
>         PipelineOptionsFactory.as(FlinkPipelineOptions.class);
>     options.setRunner(FlinkRunner.class);
>     options.setStreaming(true);
>     Pipeline pipeline = Pipeline.create(options);
>     pipeline.apply(
>         KafkaIO.read()
>             .withBootstrapServers("localhost:9092")
>             .withTopics(ImmutableList.of("test"))
>             // set ConsumerGroup
>             .withoutMetadata());
>     pipeline.run();
>   }
> }
> {code}
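The root of the trace above is that `sun.reflect.generics.reflectiveObjects.TypeVariableImpl` does not implement Serializable, so any object graph that captures a reflective type variable (as the anonymous `TypeDescriptor` subclass in the coder did) fails Java serialization. A minimal stdlib sketch of the same failure; `Holder` is a hypothetical stand-in for a coder that stores a reflective type the way SerializableCoder stored its TypeDescriptor.

```java
import java.io.ByteArrayOutputStream;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.lang.reflect.Type;
import java.util.List;

public class TypeVariableSerializationDemo {

    // Hypothetical stand-in: a Serializable object whose field captures
    // a reflective Type, mirroring the coder's typeDescriptor field.
    static class Holder implements Serializable {
        final Type captured;
        Holder(Type captured) { this.captured = captured; }
    }

    public static void main(String[] args) throws Exception {
        // The type variable E of List<E> is a TypeVariableImpl, which
        // does not implement Serializable.
        Type e = List.class.getTypeParameters()[0];
        Holder holder = new Holder(e);
        try (ObjectOutputStream out =
                 new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(holder);  // fails on the captured type variable
        } catch (NotSerializableException ex) {
            System.out.println("not serializable: " + ex.getMessage());
        }
    }
}
```

This is why the fix direction discussed in the comments is to avoid capturing a live reflective type in the serialized wrapper, rather than to make the wrapper's fields transient one by one.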
[jira] [Comment Edited] (BEAM-1255) java.io.NotSerializableException in flink on UnboundedSource
[ https://issues.apache.org/jira/browse/BEAM-1255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15818082#comment-15818082 ]

Alexey Diomin edited comment on BEAM-1255 at 1/11/17 12:20 PM:
---------------------------------------------------------------

This bug relates to serialization of UnboundedSourceWrapper:
{code}
@Test
public void testSerialization() throws Exception {
  PipelineOptions options = PipelineOptionsFactory.create();
  TestCountingSource source = new TestCountingSource(1);
  UnboundedSourceWrapper, TestCountingSource.CounterMark> flinkWrapper =
      new UnboundedSourceWrapper<>(options, source, 1);
  InstantiationUtil.serializeObject(flinkWrapper);
}
{code}

UnboundedSourceWrapper.java:147
{code}
Coder> sourceCoder = SerializableCoder.of(new TypeDescriptor >() {
});
{code}

new TypeDescriptor >() {}; produces a non-serializable object.

was (Author: humanoid):
This bug relates to serialization of UnboundedSourceWrapper:
{code}
@Test
public void testSerialization() throws Exception {
  PipelineOptions options = PipelineOptionsFactory.create();
  TestCountingSource source = new TestCountingSource(1);
  UnboundedSourceWrapper , TestCountingSource.CounterMark> flinkWrapper =
      new UnboundedSourceWrapper<>(options, source, 1);
  InstantiationUtil.serializeObject(flinkWrapper);
}
{code}
[jira] [Commented] (BEAM-1255) java.io.NotSerializableException in flink on UnboundedSource
[ https://issues.apache.org/jira/browse/BEAM-1255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15818082#comment-15818082 ]

Alexey Diomin commented on BEAM-1255:
-------------------------------------

This bug relates to serialization of UnboundedSourceWrapper:
{code}
@Test
public void testSerialization() throws Exception {
  PipelineOptions options = PipelineOptionsFactory.create();
  TestCountingSource source = new TestCountingSource(1);
  UnboundedSourceWrapper, TestCountingSource.CounterMark> flinkWrapper =
      new UnboundedSourceWrapper<>(options, source, 1);
  InstantiationUtil.serializeObject(flinkWrapper);
}
{code}
[jira] [Commented] (BEAM-1227) Release 0.4.0
[ https://issues.apache.org/jira/browse/BEAM-1227?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15815919#comment-15815919 ]

Alexey Diomin commented on BEAM-1227:
-------------------------------------

Maybe we can close this task ;)

> Release 0.4.0
> -------------
>
>                 Key: BEAM-1227
>                 URL: https://issues.apache.org/jira/browse/BEAM-1227
>             Project: Beam
>          Issue Type: Task
>          Components: project-management
>    Affects Versions: 0.4.0
>            Reporter: Daniel Halperin
>            Assignee: Jean-Baptiste Onofré
>
> Umbrella bug for the 0.4.0 (incubating or not) release.
> JB is the release manager.
[jira] [Created] (BEAM-1255) java.io.NotSerializableException in flink on UnboundedSource
Alexey Diomin created BEAM-1255:
-----------------------------------

             Summary: java.io.NotSerializableException in flink on UnboundedSource
                 Key: BEAM-1255
                 URL: https://issues.apache.org/jira/browse/BEAM-1255
             Project: Beam
          Issue Type: Bug
          Components: runner-flink
    Affects Versions: 0.5.0
            Reporter: Alexey Diomin
            Assignee: Maximilian Michels

After introducing the new Coders with TypeDescriptor in the Flink runner, we have this issue:

{code}
Caused by: java.io.NotSerializableException: sun.reflect.generics.reflectiveObjects.TypeVariableImpl
	- element of array (index: 0)
	- array (class "[Ljava.lang.Object;", size: 2)
	- field (class "com.google.common.collect.ImmutableList$SerializedForm", name: "elements", type: "class [Ljava.lang.Object;")
	- object (class "com.google.common.collect.ImmutableList$SerializedForm", com.google.common.collect.ImmutableList$SerializedForm@30af5b6b)
	- field (class "com.google.common.reflect.Types$ParameterizedTypeImpl", name: "argumentsList", type: "class com.google.common.collect.ImmutableList")
	- object (class "com.google.common.reflect.Types$ParameterizedTypeImpl", org.apache.beam.sdk.io.UnboundedSource)
	- field (class "com.google.common.reflect.TypeToken", name: "runtimeType", type: "interface java.lang.reflect.Type")
	- object (class "com.google.common.reflect.TypeToken$SimpleTypeToken", org.apache.beam.sdk.io.UnboundedSource)
	- field (class "org.apache.beam.sdk.values.TypeDescriptor", name: "token", type: "class com.google.common.reflect.TypeToken")
	- object (class "org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper$1", org.apache.beam.sdk.io.UnboundedSource)
	- field (class "org.apache.beam.sdk.coders.SerializableCoder", name: "typeDescriptor", type: "class org.apache.beam.sdk.values.TypeDescriptor")
	- object (class "org.apache.beam.sdk.coders.SerializableCoder", SerializableCoder)
	- field (class "org.apache.beam.sdk.coders.KvCoder", name: "keyCoder", type: "interface org.apache.beam.sdk.coders.Coder")
	- object (class "org.apache.beam.sdk.coders.KvCoder", KvCoder(SerializableCoder,AvroCoder))
	- field (class "org.apache.beam.sdk.coders.IterableLikeCoder", name: "elementCoder", type: "interface org.apache.beam.sdk.coders.Coder")
	- object (class "org.apache.beam.sdk.coders.ListCoder", ListCoder(KvCoder(SerializableCoder,AvroCoder)))
	- field (class "org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper", name: "checkpointCoder", type: "class org.apache.beam.sdk.coders.ListCoder")
	- root object (class "org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper", org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper@b2c5e07)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1182)
{code}

Bug introduced by commit 7b98fa08d14e8121e8885f00a9a9a878b73f81a6,
pull request: https://github.com/apache/beam/pull/1537

Code to reproduce the error:

{code}
import com.google.common.collect.ImmutableList;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class FlinkSerialisationError {
  public static void main(String[] args) {
    FlinkPipelineOptions options =
        PipelineOptionsFactory.as(FlinkPipelineOptions.class);
    options.setRunner(FlinkRunner.class);
    options.setStreaming(true);
    Pipeline pipeline = Pipeline.create(options);
    pipeline.apply(
        KafkaIO.read()
            .withBootstrapServers("localhost:9092")
            .withTopics(ImmutableList.of("test"))
            // set ConsumerGroup
            .withoutMetadata());
    pipeline.run();
  }
}
{code}
[jira] [Updated] (BEAM-106) Native support for conditional iteration
[ https://issues.apache.org/jira/browse/BEAM-106?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Alexey Diomin updated BEAM-106:
-------------------------------
    Description: 
Ported from: https://github.com/GoogleCloudPlatform/DataflowJavaSDK/issues/50

There are a variety of use cases which would benefit from native support for conditional iteration. For instance, http://stackoverflow.com/questions/31654421/conditional-iterations-in-google-cloud-dataflow/31659923?noredirect=1#comment51264604_31659923 asks about being able to write a loop like the following:
{code}
PCollection data = ...
while (needsMoreWork(data)) {
  data = doAStep(data)
}
{code}
If there are specific use cases please let us know the details. In the future we will use this issue to post progress updates.

  was:
Ported from: https://github.com/GoogleCloudPlatform/DataflowJavaSDK/issues/50

There are a variety of use cases which would benefit from native support for conditional iteration. For instance, http://stackoverflow.com/questions/31654421/conditional-iterations-in-google-cloud-dataflow/31659923?noredirect=1#comment51264604_31659923 asks about being able to write a loop like the following:

PCollection data = ...
while (needsMoreWork(data)) {
  data = doAStep(data)
}

If there are specific use cases please let us know the details. In the future we will use this issue to post progress updates.

> Native support for conditional iteration
> ----------------------------------------
>
>                 Key: BEAM-106
>                 URL: https://issues.apache.org/jira/browse/BEAM-106
>             Project: Beam
>          Issue Type: New Feature
>          Components: sdk-ideas
>            Reporter: Luke Cwik
>            Assignee: James Malone
>
> Ported from: https://github.com/GoogleCloudPlatform/DataflowJavaSDK/issues/50
> There are a variety of use cases which would benefit from native support for conditional iteration.
> For instance, http://stackoverflow.com/questions/31654421/conditional-iterations-in-google-cloud-dataflow/31659923?noredirect=1#comment51264604_31659923 asks about being able to write a loop like the following:
> {code}
> PCollection data = ...
> while (needsMoreWork(data)) {
>   data = doAStep(data)
> }
> {code}
> If there are specific use cases please let us know the details. In the future we will use this issue to post progress updates.
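Absent native iteration, the loop in the description can only be approximated today by unrolling a bounded number of steps chosen at pipeline-construction time. The sketch below illustrates that shape with plain Java collections standing in for PCollections; `doAStep` and `needsMoreWork` are hypothetical stand-ins (doubling elements and checking a threshold), not Beam APIs.

```java
import java.util.List;
import java.util.stream.Collectors;

public class BoundedUnrollDemo {

    // Hypothetical step: double every element.
    static List<Integer> doAStep(List<Integer> data) {
        return data.stream().map(x -> x * 2).collect(Collectors.toList());
    }

    // Hypothetical condition: more work while any element is below 100.
    static boolean needsMoreWork(List<Integer> data) {
        return data.stream().anyMatch(x -> x < 100);
    }

    public static void main(String[] args) {
        List<Integer> data = List.of(3, 50, 7);
        // Without native iteration support, the loop must carry an
        // explicit bound fixed before the pipeline runs.
        int maxIterations = 10;
        for (int i = 0; i < maxIterations && needsMoreWork(data); i++) {
            data = doAStep(data);
        }
        System.out.println(data);  // prints [192, 3200, 448]
    }
}
```

The key limitation this sketch exposes is that the iteration count is a construction-time constant, whereas the feature request asks for a data-dependent termination condition evaluated by the runner.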