[jira] [Commented] (FLINK-1392) Serializing Protobuf - issue 1
[ https://issues.apache.org/jira/browse/FLINK-1392?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14274943#comment-14274943 ] Robert Metzger commented on FLINK-1392: --- Thank you for reporting the issue. I think FLINK-1393 is caused by the same root cause, but the java serialization is swallowing the exception. I think I found a solution for serializing Protobuf through Kryo: https://github.com/twitter/chill/blob/develop/chill-protobuf/src/main/java/com/twitter/chill/protobuf/ProtobufSerializer.java I'll create you branch that you can try out. > Serializing Protobuf - issue 1 > -- > > Key: FLINK-1392 > URL: https://issues.apache.org/jira/browse/FLINK-1392 > Project: Flink > Issue Type: Bug >Reporter: Felix Neutatz >Priority: Minor > > Hi, I started to experiment with Parquet using Protobuf. > When I use the standard Protobuf class: > com.twitter.data.proto.tutorial.AddressBookProtos > The code which I run, can be found here: > [https://github.com/FelixNeutatz/incubator-flink/blob/ParquetAtFlink/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/ParquetProtobufOutput.java] > I get the following exception: > {code:xml} > Exception in thread "main" java.lang.Exception: Deserializing the > InputFormat (org.apache.flink.api.java.io.CollectionInputFormat) failed: > Could not read the user code wrapper: Error while deserializing element from > collection > at > org.apache.flink.runtime.jobgraph.InputFormatVertex.initializeOnMaster(InputFormatVertex.java:60) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1$$anonfun$applyOrElse$5.apply(JobManager.scala:179) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1$$anonfun$applyOrElse$5.apply(JobManager.scala:172) > at scala.collection.Iterator$class.foreach(Iterator.scala:727) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:172) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25) > at > org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:34) > at > org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:27) > at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118) > at > org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:27) > at akka.actor.Actor$class.aroundReceive(Actor.scala:465) > at > org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:52) > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) > at akka.actor.ActorCell.invoke(ActorCell.scala:487) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254) > at akka.dispatch.Mailbox.run(Mailbox.scala:221) > at akka.dispatch.Mailbox.exec(Mailbox.scala:231) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: > org.apache.flink.runtime.operators.util.CorruptConfigurationException: Could > not read the user code wrapper: Error while deserializing element from > collection > at > org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:285) > at > org.apache.flink.runtime.jobgraph.InputFormatVertex.initializeOnMaster(InputFormatVertex.java:57) > ... 25 more > Caused by: java.io.IOException: Error while deserializing element from > collection > at > org.apache.flink.api.java.io.CollectionInputFormat.readObject(CollectionInputFormat.java:108) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:606) > at > java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017) > at
[jira] [Commented] (FLINK-1392) Serializing Protobuf - issue 1
[ https://issues.apache.org/jira/browse/FLINK-1392?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14274963#comment-14274963 ] Robert Metzger commented on FLINK-1392: --- Hi Felix, could you try and see if the changes in my branch "flink1392" (https://github.com/rmetzger/flink/tree/flink1392) resolves the two issues? > Serializing Protobuf - issue 1 > -- > > Key: FLINK-1392 > URL: https://issues.apache.org/jira/browse/FLINK-1392 > Project: Flink > Issue Type: Bug >Reporter: Felix Neutatz >Priority: Minor > > Hi, I started to experiment with Parquet using Protobuf. > When I use the standard Protobuf class: > com.twitter.data.proto.tutorial.AddressBookProtos > The code which I run, can be found here: > [https://github.com/FelixNeutatz/incubator-flink/blob/ParquetAtFlink/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/ParquetProtobufOutput.java] > I get the following exception: > {code:xml} > Exception in thread "main" java.lang.Exception: Deserializing the > InputFormat (org.apache.flink.api.java.io.CollectionInputFormat) failed: > Could not read the user code wrapper: Error while deserializing element from > collection > at > org.apache.flink.runtime.jobgraph.InputFormatVertex.initializeOnMaster(InputFormatVertex.java:60) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1$$anonfun$applyOrElse$5.apply(JobManager.scala:179) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1$$anonfun$applyOrElse$5.apply(JobManager.scala:172) > at scala.collection.Iterator$class.foreach(Iterator.scala:727) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:172) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25) > at > org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:34) > at > org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:27) > at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118) > at > org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:27) > at akka.actor.Actor$class.aroundReceive(Actor.scala:465) > at > org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:52) > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) > at akka.actor.ActorCell.invoke(ActorCell.scala:487) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254) > at akka.dispatch.Mailbox.run(Mailbox.scala:221) > at akka.dispatch.Mailbox.exec(Mailbox.scala:231) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: > org.apache.flink.runtime.operators.util.CorruptConfigurationException: Could > not read the user code wrapper: Error while deserializing element from > collection > at > org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:285) > at > org.apache.flink.runtime.jobgraph.InputFormatVertex.initializeOnMaster(InputFormatVertex.java:57) > ... 25 more > Caused by: java.io.IOException: Error while deserializing element from > collection > at > org.apache.flink.api.java.io.CollectionInputFormat.readObject(CollectionInputFormat.java:108) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:606) > at > java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017) > at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:135
[jira] [Commented] (FLINK-1392) Serializing Protobuf - issue 1
[ https://issues.apache.org/jira/browse/FLINK-1392?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14275146#comment-14275146 ] Felix Neutatz commented on FLINK-1392: -- The second issue was fixed by your branch, but the standard example still fails: {code:xml} Exception in thread "main" org.apache.flink.compiler.CompilerException: Error translating node 'Data Source "at main(ParquetProtobufOutput.java:69) (org.apache.flink.api.java.io.CollectionInputFormat)" : NONE [[ GlobalProperties [partitioning=RANDOM] ]] [[ LocalProperties [ordering=null, grouped=null, unique=null] ]]': This is supposed to be overridden by subclasses. at org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.preVisit(NepheleJobGraphGenerator.java:361) at org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.preVisit(NepheleJobGraphGenerator.java:99) at org.apache.flink.compiler.plan.SourcePlanNode.accept(SourcePlanNode.java:83) at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199) at org.apache.flink.compiler.plan.OptimizedPlan.accept(OptimizedPlan.java:165) at org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.compileJobGraph(NepheleJobGraphGenerator.java:170) at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:169) at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:51) at org.apache.flink.hadoopcompatibility.mapreduce.example.ParquetProtobufOutput.main(ParquetProtobufOutput.java:88) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134) Caused by: java.lang.UnsupportedOperationException: This is supposed to be overridden by subclasses. at com.google.protobuf.GeneratedMessage.getUnknownFields(GeneratedMessage.java:180) at com.twitter.data.proto.tutorial.AddressBookProtos$Person$PhoneNumber.getSerializedSize(AddressBookProtos.java:261) at com.google.protobuf.CodedOutputStream.computeMessageSizeNoTag(CodedOutputStream.java:749) at com.google.protobuf.CodedOutputStream.computeMessageSize(CodedOutputStream.java:530) at com.twitter.data.proto.tutorial.AddressBookProtos$Person.getSerializedSize(AddressBookProtos.java:747) at com.google.protobuf.AbstractMessageLite.toByteArray(AbstractMessageLite.java:62) at com.twitter.chill.protobuf.ProtobufSerializer.write(ProtobufSerializer.java:63) at com.twitter.chill.protobuf.ProtobufSerializer.write(ProtobufSerializer.java:40) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599) at org.apache.flink.api.java.typeutils.runtime.KryoSerializer.serialize(KryoSerializer.java:124) at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:96) at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:30) at org.apache.flink.api.java.io.CollectionInputFormat.writeObject(CollectionInputFormat.java:88) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:286) at org.apache.flink.util.InstantiationUtil.writeObjectToConfig(InstantiationUtil.java:240) at org.apache.flink.runtime.operators.util.TaskConfig.setStubWrapper(TaskConfig.java:272) at org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.createDataSourceVertex(NepheleJobGraphGenerator.java:854)
[jira] [Commented] (FLINK-1392) Serializing Protobuf - issue 1
[ https://issues.apache.org/jira/browse/FLINK-1392?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14275163#comment-14275163 ] Robert Metzger commented on FLINK-1392: --- Maybe there is a protobuf version mismatch http://stackoverflow.com/questions/15908268/error-with-serialization-with-protobuf Which version are you using? > Serializing Protobuf - issue 1 > -- > > Key: FLINK-1392 > URL: https://issues.apache.org/jira/browse/FLINK-1392 > Project: Flink > Issue Type: Bug >Reporter: Felix Neutatz >Assignee: Robert Metzger >Priority: Minor > > Hi, I started to experiment with Parquet using Protobuf. > When I use the standard Protobuf class: > com.twitter.data.proto.tutorial.AddressBookProtos > The code which I run, can be found here: > [https://github.com/FelixNeutatz/incubator-flink/blob/ParquetAtFlink/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/ParquetProtobufOutput.java] > I get the following exception: > {code:xml} > Exception in thread "main" java.lang.Exception: Deserializing the > InputFormat (org.apache.flink.api.java.io.CollectionInputFormat) failed: > Could not read the user code wrapper: Error while deserializing element from > collection > at > org.apache.flink.runtime.jobgraph.InputFormatVertex.initializeOnMaster(InputFormatVertex.java:60) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1$$anonfun$applyOrElse$5.apply(JobManager.scala:179) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1$$anonfun$applyOrElse$5.apply(JobManager.scala:172) > at scala.collection.Iterator$class.foreach(Iterator.scala:727) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:172) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25) > at > org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:34) > at > org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:27) > at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118) > at > org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:27) > at akka.actor.Actor$class.aroundReceive(Actor.scala:465) > at > org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:52) > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) > at akka.actor.ActorCell.invoke(ActorCell.scala:487) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254) > at akka.dispatch.Mailbox.run(Mailbox.scala:221) > at akka.dispatch.Mailbox.exec(Mailbox.scala:231) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: > org.apache.flink.runtime.operators.util.CorruptConfigurationException: Could > not read the user code wrapper: Error while deserializing element from > collection > at > org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:285) > at > org.apache.flink.runtime.jobgraph.InputFormatVertex.initializeOnMaster(InputFormatVertex.java:57) > ... 25 more > Caused by: java.io.IOException: Error while deserializing element from > collection > at > org.apache.flink.api.java.io.CollectionInputFormat.readObject(CollectionInputFormat.java:108) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:606) > at > java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017) > at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) > at java.io.ObjectInputS
[jira] [Commented] (FLINK-1392) Serializing Protobuf - issue 1
[ https://issues.apache.org/jira/browse/FLINK-1392?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14275170#comment-14275170 ] Felix Neutatz commented on FLINK-1392: -- Yes, that was the problem. I didn't use 2.5 in this case ... Thank you very much for the super quick fix :) :) > Serializing Protobuf - issue 1 > -- > > Key: FLINK-1392 > URL: https://issues.apache.org/jira/browse/FLINK-1392 > Project: Flink > Issue Type: Bug >Reporter: Felix Neutatz >Assignee: Robert Metzger >Priority: Minor > > Hi, I started to experiment with Parquet using Protobuf. > When I use the standard Protobuf class: > com.twitter.data.proto.tutorial.AddressBookProtos > The code which I run, can be found here: > [https://github.com/FelixNeutatz/incubator-flink/blob/ParquetAtFlink/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/ParquetProtobufOutput.java] > I get the following exception: > {code:xml} > Exception in thread "main" java.lang.Exception: Deserializing the > InputFormat (org.apache.flink.api.java.io.CollectionInputFormat) failed: > Could not read the user code wrapper: Error while deserializing element from > collection > at > org.apache.flink.runtime.jobgraph.InputFormatVertex.initializeOnMaster(InputFormatVertex.java:60) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1$$anonfun$applyOrElse$5.apply(JobManager.scala:179) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1$$anonfun$applyOrElse$5.apply(JobManager.scala:172) > at scala.collection.Iterator$class.foreach(Iterator.scala:727) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:172) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25) > at > org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:34) > at > org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:27) > at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118) > at > org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:27) > at akka.actor.Actor$class.aroundReceive(Actor.scala:465) > at > org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:52) > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) > at akka.actor.ActorCell.invoke(ActorCell.scala:487) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254) > at akka.dispatch.Mailbox.run(Mailbox.scala:221) > at akka.dispatch.Mailbox.exec(Mailbox.scala:231) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: > org.apache.flink.runtime.operators.util.CorruptConfigurationException: Could > not read the user code wrapper: Error while deserializing element from > collection > at > org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:285) > at > org.apache.flink.runtime.jobgraph.InputFormatVertex.initializeOnMaster(InputFormatVertex.java:57) > ... 25 more > Caused by: java.io.IOException: Error while deserializing element from > collection > at > org.apache.flink.api.java.io.CollectionInputFormat.readObject(CollectionInputFormat.java:108) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:606) > at > java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017) > at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
[jira] [Commented] (FLINK-1392) Serializing Protobuf - issue 1
[ https://issues.apache.org/jira/browse/FLINK-1392?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14275194#comment-14275194 ] Robert Metzger commented on FLINK-1392: --- Cool. I'm finalizing the patch and put it into the 0.8.1 and 0.9 release. > Serializing Protobuf - issue 1 > -- > > Key: FLINK-1392 > URL: https://issues.apache.org/jira/browse/FLINK-1392 > Project: Flink > Issue Type: Bug >Reporter: Felix Neutatz >Assignee: Robert Metzger >Priority: Minor > > Hi, I started to experiment with Parquet using Protobuf. > When I use the standard Protobuf class: > com.twitter.data.proto.tutorial.AddressBookProtos > The code which I run, can be found here: > [https://github.com/FelixNeutatz/incubator-flink/blob/ParquetAtFlink/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/ParquetProtobufOutput.java] > I get the following exception: > {code:xml} > Exception in thread "main" java.lang.Exception: Deserializing the > InputFormat (org.apache.flink.api.java.io.CollectionInputFormat) failed: > Could not read the user code wrapper: Error while deserializing element from > collection > at > org.apache.flink.runtime.jobgraph.InputFormatVertex.initializeOnMaster(InputFormatVertex.java:60) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1$$anonfun$applyOrElse$5.apply(JobManager.scala:179) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1$$anonfun$applyOrElse$5.apply(JobManager.scala:172) > at scala.collection.Iterator$class.foreach(Iterator.scala:727) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:172) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25) > at > org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:34) > at > org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:27) > at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118) > at > org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:27) > at akka.actor.Actor$class.aroundReceive(Actor.scala:465) > at > org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:52) > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) > at akka.actor.ActorCell.invoke(ActorCell.scala:487) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254) > at akka.dispatch.Mailbox.run(Mailbox.scala:221) > at akka.dispatch.Mailbox.exec(Mailbox.scala:231) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: > org.apache.flink.runtime.operators.util.CorruptConfigurationException: Could > not read the user code wrapper: Error while deserializing element from > collection > at > org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:285) > at > org.apache.flink.runtime.jobgraph.InputFormatVertex.initializeOnMaster(InputFormatVertex.java:57) > ... 25 more > Caused by: java.io.IOException: Error while deserializing element from > collection > at > org.apache.flink.api.java.io.CollectionInputFormat.readObject(CollectionInputFormat.java:108) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:606) > at > java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017) > at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) > at > java.io.ObjectInputSt
[jira] [Commented] (FLINK-1392) Serializing Protobuf - issue 1
[ https://issues.apache.org/jira/browse/FLINK-1392?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14288929#comment-14288929 ] ASF GitHub Bot commented on FLINK-1392: --- Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/322#issuecomment-71158115 Per the discussion on the dev mailing list (http://mail-archives.apache.org/mod_mbox/flink-dev/201501.mbox/browser), should this be registered by the code that analyzes the generic type and sets up the serializer for the encountered (nested) types? [FLINK-1417] > Serializing Protobuf - issue 1 > -- > > Key: FLINK-1392 > URL: https://issues.apache.org/jira/browse/FLINK-1392 > Project: Flink > Issue Type: Bug >Reporter: Felix Neutatz >Assignee: Robert Metzger >Priority: Minor > > Hi, I started to experiment with Parquet using Protobuf. > When I use the standard Protobuf class: > com.twitter.data.proto.tutorial.AddressBookProtos > The code which I run, can be found here: > [https://github.com/FelixNeutatz/incubator-flink/blob/ParquetAtFlink/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/ParquetProtobufOutput.java] > I get the following exception: > {code:xml} > Exception in thread "main" java.lang.Exception: Deserializing the > InputFormat (org.apache.flink.api.java.io.CollectionInputFormat) failed: > Could not read the user code wrapper: Error while deserializing element from > collection > at > org.apache.flink.runtime.jobgraph.InputFormatVertex.initializeOnMaster(InputFormatVertex.java:60) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1$$anonfun$applyOrElse$5.apply(JobManager.scala:179) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1$$anonfun$applyOrElse$5.apply(JobManager.scala:172) > at scala.collection.Iterator$class.foreach(Iterator.scala:727) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:172) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25) > at > org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:34) > at > org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:27) > at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118) > at > org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:27) > at akka.actor.Actor$class.aroundReceive(Actor.scala:465) > at > org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:52) > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) > at akka.actor.ActorCell.invoke(ActorCell.scala:487) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254) > at akka.dispatch.Mailbox.run(Mailbox.scala:221) > at akka.dispatch.Mailbox.exec(Mailbox.scala:231) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: > org.apache.flink.runtime.operators.util.CorruptConfigurationException: Could > not read the user code wrapper: Error while deserializing element from > collection > at > org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:285) > at > org.apache.flink.runtime.jobgraph.InputFormatVertex.initializeOnMaster(InputFormatVertex.java:57) > ... 25 more > Caused by: java.io.IOException: Error while deserializing element from > collection > at > org.apache.flink.api.java.io.CollectionInputFormat.readObject(CollectionInputFormat.java:108) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:606) > at > java.io.ObjectStreamClass.invok
[jira] [Commented] (FLINK-1392) Serializing Protobuf - issue 1
[ https://issues.apache.org/jira/browse/FLINK-1392?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14291996#comment-14291996 ] ASF GitHub Bot commented on FLINK-1392: --- Github user rmetzger closed the pull request at: https://github.com/apache/flink/pull/322 > Serializing Protobuf - issue 1 > -- > > Key: FLINK-1392 > URL: https://issues.apache.org/jira/browse/FLINK-1392 > Project: Flink > Issue Type: Sub-task >Reporter: Felix Neutatz >Assignee: Robert Metzger >Priority: Minor > > Hi, I started to experiment with Parquet using Protobuf. > When I use the standard Protobuf class: > com.twitter.data.proto.tutorial.AddressBookProtos > The code which I run, can be found here: > [https://github.com/FelixNeutatz/incubator-flink/blob/ParquetAtFlink/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/ParquetProtobufOutput.java] > I get the following exception: > {code:xml} > Exception in thread "main" java.lang.Exception: Deserializing the > InputFormat (org.apache.flink.api.java.io.CollectionInputFormat) failed: > Could not read the user code wrapper: Error while deserializing element from > collection > at > org.apache.flink.runtime.jobgraph.InputFormatVertex.initializeOnMaster(InputFormatVertex.java:60) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1$$anonfun$applyOrElse$5.apply(JobManager.scala:179) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1$$anonfun$applyOrElse$5.apply(JobManager.scala:172) > at scala.collection.Iterator$class.foreach(Iterator.scala:727) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:172) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25) > at > org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:34) > at > org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:27) > at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118) > at > org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:27) > at akka.actor.Actor$class.aroundReceive(Actor.scala:465) > at > org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:52) > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) > at akka.actor.ActorCell.invoke(ActorCell.scala:487) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254) > at akka.dispatch.Mailbox.run(Mailbox.scala:221) > at akka.dispatch.Mailbox.exec(Mailbox.scala:231) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: > org.apache.flink.runtime.operators.util.CorruptConfigurationException: Could > not read the user code wrapper: Error while deserializing element from > collection > at > org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:285) > at > org.apache.flink.runtime.jobgraph.InputFormatVertex.initializeOnMaster(InputFormatVertex.java:57) > ... 25 more > Caused by: java.io.IOException: Error while deserializing element from > collection > at > org.apache.flink.api.java.io.CollectionInputFormat.readObject(CollectionInputFormat.java:108) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:606) > at > java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017) > at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) >
[jira] [Commented] (FLINK-1392) Serializing Protobuf - issue 1
[ https://issues.apache.org/jira/browse/FLINK-1392?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14291995#comment-14291995 ] ASF GitHub Bot commented on FLINK-1392: --- Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/322#issuecomment-71484526 I will make this change part of a new pull request for https://issues.apache.org/jira/browse/FLINK-1417. > Serializing Protobuf - issue 1 > -- > > Key: FLINK-1392 > URL: https://issues.apache.org/jira/browse/FLINK-1392 > Project: Flink > Issue Type: Sub-task >Reporter: Felix Neutatz >Assignee: Robert Metzger >Priority: Minor > > Hi, I started to experiment with Parquet using Protobuf. > When I use the standard Protobuf class: > com.twitter.data.proto.tutorial.AddressBookProtos > The code which I run, can be found here: > [https://github.com/FelixNeutatz/incubator-flink/blob/ParquetAtFlink/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/ParquetProtobufOutput.java] > I get the following exception: > {code:xml} > Exception in thread "main" java.lang.Exception: Deserializing the > InputFormat (org.apache.flink.api.java.io.CollectionInputFormat) failed: > Could not read the user code wrapper: Error while deserializing element from > collection > at > org.apache.flink.runtime.jobgraph.InputFormatVertex.initializeOnMaster(InputFormatVertex.java:60) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1$$anonfun$applyOrElse$5.apply(JobManager.scala:179) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1$$anonfun$applyOrElse$5.apply(JobManager.scala:172) > at scala.collection.Iterator$class.foreach(Iterator.scala:727) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:172) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25) > at > org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:34) > at > org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:27) > at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118) > at > org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:27) > at akka.actor.Actor$class.aroundReceive(Actor.scala:465) > at > org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:52) > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) > at akka.actor.ActorCell.invoke(ActorCell.scala:487) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254) > at akka.dispatch.Mailbox.run(Mailbox.scala:221) > at akka.dispatch.Mailbox.exec(Mailbox.scala:231) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: > org.apache.flink.runtime.operators.util.CorruptConfigurationException: Could > not read the user code wrapper: Error while deserializing element from > collection > at > org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:285) > at > org.apache.flink.runtime.jobgraph.InputFormatVertex.initializeOnMaster(InputFormatVertex.java:57) > ... 25 more > Caused by: java.io.IOException: Error while deserializing element from > collection > at > org.apache.flink.api.java.io.CollectionInputFormat.readObject(CollectionInputFormat.java:108) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:606) > at > java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017) > at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893) > at > java.io.ObjectInputStr
[jira] [Commented] (FLINK-1392) Serializing Protobuf - issue 1
[ https://issues.apache.org/jira/browse/FLINK-1392?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14312242#comment-14312242 ] Robert Metzger commented on FLINK-1392: --- Pushed fix for "release-0.8" in http://git-wip-us.apache.org/repos/asf/flink/commit/c4c3db50. > Serializing Protobuf - issue 1 > -- > > Key: FLINK-1392 > URL: https://issues.apache.org/jira/browse/FLINK-1392 > Project: Flink > Issue Type: Sub-task >Affects Versions: 0.8, 0.9 >Reporter: Felix Neutatz >Assignee: Robert Metzger >Priority: Minor > Fix For: 0.8.1 > > > Hi, I started to experiment with Parquet using Protobuf. > When I use the standard Protobuf class: > com.twitter.data.proto.tutorial.AddressBookProtos > The code which I run, can be found here: > [https://github.com/FelixNeutatz/incubator-flink/blob/ParquetAtFlink/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/ParquetProtobufOutput.java] > I get the following exception: > {code:xml} > Exception in thread "main" java.lang.Exception: Deserializing the > InputFormat (org.apache.flink.api.java.io.CollectionInputFormat) failed: > Could not read the user code wrapper: Error while deserializing element from > collection > at > org.apache.flink.runtime.jobgraph.InputFormatVertex.initializeOnMaster(InputFormatVertex.java:60) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1$$anonfun$applyOrElse$5.apply(JobManager.scala:179) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1$$anonfun$applyOrElse$5.apply(JobManager.scala:172) > at scala.collection.Iterator$class.foreach(Iterator.scala:727) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:172) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25) > at > org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:34) > at > org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:27) > at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118) > at > org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:27) > at akka.actor.Actor$class.aroundReceive(Actor.scala:465) > at > org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:52) > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) > at akka.actor.ActorCell.invoke(ActorCell.scala:487) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254) > at akka.dispatch.Mailbox.run(Mailbox.scala:221) > at akka.dispatch.Mailbox.exec(Mailbox.scala:231) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: > org.apache.flink.runtime.operators.util.CorruptConfigurationException: Could > not read the user code wrapper: Error while deserializing element from > collection > at > org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:285) > at > org.apache.flink.runtime.jobgraph.InputFormatVertex.initializeOnMaster(InputFormatVertex.java:57) > ... 25 more > Caused by: java.io.IOException: Error while deserializing element from > collection > at > org.apache.flink.api.java.io.CollectionInputFormat.readObject(CollectionInputFormat.java:108) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:606) > at > java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017) > at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) > at java.io.Objec