Re: Bug broadcasting objects (serialization issue)
Hi,

I get a similar bug when broadcasting a list of integers if the list is made unmodifiable:

    elements = Collections.unmodifiableList(elements);

I include this code to reproduce the result:

    public class WordCountExample {
        public static void main(String[] args) throws Exception {
            final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            DataSet<String> text = env.fromElements(
                    "Who's there?",
                    "I think I hear them. Stand, ho! Who's there?");

            List<Integer> elements = new ArrayList<>();
            elements.add(0);
            elements = Collections.unmodifiableList(elements);

            DataSet<TestClass> set = env.fromElements(new TestClass(elements));

            DataSet<Tuple2<String, Integer>> wordCounts = text
                    .flatMap(new LineSplitter())
                    .withBroadcastSet(set, "set")
                    .groupBy(0)
                    .sum(1);

            wordCounts.print();
        }

        public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
            @Override
            public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
                for (String word : line.split(" ")) {
                    out.collect(new Tuple2<>(word, 1));
                }
            }
        }

        public static class TestClass implements Serializable {
            private static final long serialVersionUID = -2932037991574118651L;

            List<Integer> integerList;

            public TestClass(List<Integer> integerList) {
                this.integerList = integerList;
            }
        }
    }

Thanks for your support,
Andres

On 2/9/15 11:17, Andres R. Masegosa wrote:
> Hi,
>
> I get a bug when trying to broadcast a list of integers created with
> "Arrays.asList(...)".
>
> For example, if you try to run this "wordcount" example, you can
> reproduce the bug.
>
> [code snipped]
>
> However, if instead of "Arrays.asList(...)" we use the ArrayList<>
> constructor, there is no problem.
>
> Regards,
> Andres
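[Editorial note: a user-side workaround that follows from the two reports in this thread. Both failing cases pass a non-standard List implementation (java.util.Arrays$ArrayList, java.util.Collections$UnmodifiableList) into the broadcast object, while a plain java.util.ArrayList reportedly works. A sketch of a defensive-copy variant of TestClass, with a serialization round-trip to exercise it; the outer class name is made up for the example:]

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class CopyBeforeBroadcast {

    static class TestClass implements Serializable {
        private static final long serialVersionUID = -2932037991574118651L;

        final List<Integer> integerList;

        TestClass(List<Integer> integerList) {
            // Defensive copy: whichever List implementation the caller passes
            // (java.util.Arrays$ArrayList, java.util.Collections$UnmodifiableList,
            // ...), the serialized field is always a plain java.util.ArrayList.
            this.integerList = new ArrayList<>(integerList);
        }
    }

    public static void main(String[] args) throws Exception {
        List<Integer> elements = Collections.unmodifiableList(Arrays.asList(0, 0, 0));
        TestClass original = new TestClass(elements);

        // Round-trip through Java serialization, analogous to what happens
        // when the broadcast set is shipped to the cluster.
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(original);
        }
        TestClass copy = (TestClass) new ObjectInputStream(
                new ByteArrayInputStream(bos.toByteArray())).readObject();

        System.out.println(copy.integerList.getClass().getName()); // prints "java.util.ArrayList"
    }
}
```

This does not fix the underlying deserialization bug, but it keeps the broadcast object's fields to well-known serializable classes.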
Re: Bug broadcasting objects (serialization issue)
Thanks for clarifying the "eager serialization". By serializing and deserializing explicitly (eagerly) we can raise better exceptions to notify the user of non-serializable classes.

> BTW: There is an opportunity to fix two problems with one patch: The
> framesize overflow for the input format, and the serialization.

IMHO this adds another layer of complexity to the job submission phase. I just had a chat with Robert about this. I wonder, is it possible to increase the Akka framesize only for the Client ActorSystem?

On Wed, Sep 2, 2015 at 4:27 PM, Stephan Ewen wrote:
> I see.
>
> Manual serialization implies also manual deserialization (on the workers
> only), which would give a better exception.
>
> BTW: There is an opportunity to fix two problems with one patch: The
> framesize overflow for the input format, and the serialization.
>
> On Wed, Sep 2, 2015 at 4:16 PM, Maximilian Michels wrote:
>>
>> Ok, but that would not prevent the above error, right? Serializing is
>> not the issue here.
>>
>> Nevertheless, it would catch all errors during initial serialization.
>> Deserializing has its own hazards due to possible classloader issues.
>>
>> On Wed, Sep 2, 2015 at 4:05 PM, Stephan Ewen wrote:
>> > Yes, even serialize in the constructor. Then the failure (if
>> > serialization does not work) comes immediately.
>> >
>> > On Wed, Sep 2, 2015 at 4:02 PM, Maximilian Michels wrote:
>> >>
>> >> Nice suggestion. So you want to serialize and deserialize the
>> >> InputFormats on the Client to check whether they can be transferred
>> >> correctly? Merely serializing is not enough because the above
>> >> Exception occurs during deserialization.
>> >>
>> >> On Wed, Sep 2, 2015 at 2:29 PM, Stephan Ewen wrote:
>> >>>
>> >>> We should try to improve the exception here. More people will run
>> >>> into this issue and the exception should help them understand it well.
>> >>>
>> >>> How about we do eager serialization into a set of byte arrays? Then
>> >>> the serializability issue comes immediately when the program is
>> >>> constructed, rather than later, when it is shipped.
>> >>>
>> >>> On Wed, Sep 2, 2015 at 12:56 PM, Maximilian Michels wrote:
>> >>>
>> >> Here's the JIRA issue: https://issues.apache.org/jira/browse/FLINK-2608
>> >>
>> >> [earlier messages and stack trace snipped]
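[Editorial note: a minimal sketch of the client-side check being discussed, written with plain java.io rather than Flink internals; the class and method names are invented for illustration. Round-tripping the user object through Java serialization at program-construction time surfaces a broken class with a clear message on the client, instead of the opaque "unread block data" failure on the JobManager:]

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

public final class SerializationCheck {

    /**
     * Eagerly serializes and deserializes {@code obj}, mimicking the
     * round-trip an InputFormat goes through on job submission, and fails
     * with a descriptive message if either direction breaks.
     */
    @SuppressWarnings("unchecked")
    public static <T> T roundTrip(T obj) {
        byte[] bytes;
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
                oos.writeObject(obj);
            }
            bytes = bos.toByteArray();
        } catch (IOException e) {
            throw new IllegalArgumentException(
                    "Object " + obj + " is not serializable", e);
        }
        try (ObjectInputStream ois =
                     new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return (T) ois.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalArgumentException(
                    "Object " + obj + " could not be deserialized", e);
        }
    }

    public static void main(String[] args) {
        // A well-behaved serializable object survives unchanged.
        System.out.println(roundTrip("hello")); // prints "hello"
    }
}
```

As Max notes above, a check like this runs with the client's classloader, so it cannot catch every cluster-side classloader problem; it only front-loads the errors that would occur anyway.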
Bug broadcasting objects (serialization issue)
Hi,

I get a bug when trying to broadcast a list of integers created with "Arrays.asList(...)".

For example, if you try to run this "wordcount" example, you can reproduce the bug.

    public class WordCountExample {
        public static void main(String[] args) throws Exception {
            final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            DataSet<String> text = env.fromElements(
                    "Who's there?",
                    "I think I hear them. Stand, ho! Who's there?");

            List<Integer> elements = Arrays.asList(0, 0, 0);

            DataSet<TestClass> set = env.fromElements(new TestClass(elements));

            DataSet<Tuple2<String, Integer>> wordCounts = text
                    .flatMap(new LineSplitter())
                    .withBroadcastSet(set, "set")
                    .groupBy(0)
                    .sum(1);

            wordCounts.print();
        }

        public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
            @Override
            public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
                for (String word : line.split(" ")) {
                    out.collect(new Tuple2<>(word, 1));
                }
            }
        }

        public static class TestClass implements Serializable {
            private static final long serialVersionUID = -2932037991574118651L;

            List<Integer> integerList;

            public TestClass(List<Integer> integerList) {
                this.integerList = integerList;
            }
        }
    }

However, if instead of "Arrays.asList(...)" we use the ArrayList<> constructor, there is no problem.

Regards,
Andres
Re: Bug broadcasting objects (serialization issue)
Hi Andres,

Thank you for reporting the problem and including the code to reproduce it. I think there is a problem with the class serialization or deserialization. Arrays.asList uses a private ArrayList class (java.util.Arrays$ArrayList) which is not the one you would normally use (java.util.ArrayList).

I'll create a JIRA issue to keep track of the problem and to investigate further.

Best regards,
Max

Here's the stack trace:

Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task 'DataSource (at main(Test.java:32) (org.apache.flink.api.java.io.CollectionInputFormat))': Deserializing the InputFormat ([mytests.Test$TestClass@4d6025c5]) failed: unread block data
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:523)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:507)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:507)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:190)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:43)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
    at akka.dispatch.Mailbox.run(Mailbox.scala:221)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.Exception: Deserializing the InputFormat ([mytests.Test$TestClass@4d6025c5]) failed: unread block data
    at org.apache.flink.runtime.jobgraph.InputFormatVertex.initializeOnMaster(InputFormatVertex.java:60)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:520)
    ... 25 more
Caused by: java.lang.IllegalStateException: unread block data
    at java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2424)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1383)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:302)
    at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:264)
    at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:282)
    at org.apache.flink.runtime.jobgraph.InputFormatVertex.initializeOnMaster(InputFormatVertex.java:57)
    ... 26 more

On Wed, Sep 2, 2015 at 11:17 AM, Andres R. Masegosa wrote:
> Hi,
>
> I get a bug when trying to broadcast a list of integers created with
> "Arrays.asList(...)".
>
> For example, if you try to run this "wordcount" example, you can
> reproduce the bug.
>
> public class WordCountExample {
>     public static void main(String[] args) throws Exception {
>         final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>
>         DataSet<String> text = env.fromElements(
>                 "Who's there?",
>                 "I think I hear them.
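[Editorial note: the class-identity issue Max describes is easy to verify locally. Arrays.asList returns a private nested class rather than the public java.util.ArrayList, which is why copying into an ArrayList behaves differently on deserialization:]

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class AsListClassDemo {
    public static void main(String[] args) {
        List<Integer> asList = Arrays.asList(0, 0, 0);
        List<Integer> arrayList = new ArrayList<>(asList);

        // Arrays.asList returns the private nested class
        // java.util.Arrays$ArrayList, not the public java.util.ArrayList.
        System.out.println(asList.getClass().getName());    // prints "java.util.Arrays$ArrayList"
        System.out.println(arrayList.getClass().getName()); // prints "java.util.ArrayList"
    }
}
```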
Re: Bug broadcasting objects (serialization issue)
Yes, even serialize in the constructor. Then the failure (if serialization does not work) comes immediately.

On Wed, Sep 2, 2015 at 4:02 PM, Maximilian Michels wrote:
> Nice suggestion. So you want to serialize and deserialize the InputFormats
> on the Client to check whether they can be transferred correctly? Merely
> serializing is not enough because the above Exception occurs during
> deserialization.
>
> On Wed, Sep 2, 2015 at 2:29 PM, Stephan Ewen wrote:
>
>> We should try to improve the exception here. More people will run into
>> this issue and the exception should help them understand it well.
>>
>> How about we do eager serialization into a set of byte arrays? Then the
>> serializability issue comes immediately when the program is constructed,
>> rather than later, when it is shipped.
>>
>> On Wed, Sep 2, 2015 at 12:56 PM, Maximilian Michels wrote:
>>
>>> Here's the JIRA issue: https://issues.apache.org/jira/browse/FLINK-2608
>>>
>>> On Wed, Sep 2, 2015 at 12:49 PM, Maximilian Michels wrote:
>>>
>>>> Hi Andres,
>>>>
>>>> Thank you for reporting the problem and including the code to reproduce
>>>> it. I think there is a problem with the class serialization or
>>>> deserialization. Arrays.asList uses a private ArrayList class
>>>> (java.util.Arrays$ArrayList) which is not the one you would normally use
>>>> (java.util.ArrayList).
>>>>
>>>> I'll create a JIRA issue to keep track of the problem and to investigate
>>>> further.
>>>>
>>>> [stack trace snipped]
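[Editorial note: a rough sketch of the "serialize in the constructor" idea, not actual Flink code; the class name is invented. The wrapper turns the user object into bytes the moment it is constructed, so a non-serializable object fails on the client while the program is being built, and deserialization happens only where the value is consumed:]

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Sketch of an eagerly-serializing wrapper (hypothetical, not Flink API). */
public final class EagerSerializedValue<T> implements Serializable {
    private static final long serialVersionUID = 1L;

    private final byte[] bytes; // the wrapped object, serialized eagerly

    public EagerSerializedValue(T value) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
                oos.writeObject(value);
            }
            this.bytes = bos.toByteArray();
        } catch (IOException e) {
            // The failure surfaces immediately, on the client, with the
            // offending value still in hand for a descriptive message.
            throw new IllegalArgumentException(
                    "Value " + value + " is not serializable", e);
        }
    }

    /**
     * Deserializes the wrapped value; in a real system this would run on
     * the worker and resolve classes against the user-code classloader.
     */
    @SuppressWarnings("unchecked")
    public T get() throws IOException, ClassNotFoundException {
        try (ObjectInputStream ois =
                     new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return (T) ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        EagerSerializedValue<String> v = new EagerSerializedValue<>("hello");
        System.out.println(v.get()); // prints "hello"
    }
}
```

Note that this only moves *serialization* failures to construction time; as Max points out below, worker-side deserialization can still fail for classloader reasons the client cannot observe.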
Re: Bug broadcasting objects (serialization issue)
Nice suggestion. So you want to serialize and deserialize the InputFormats on the Client to check whether they can be transferred correctly? Merely serializing is not enough because the above Exception occurs during deserialization.

On Wed, Sep 2, 2015 at 2:29 PM, Stephan Ewen wrote:
> We should try to improve the exception here. More people will run into
> this issue and the exception should help them understand it well.
>
> How about we do eager serialization into a set of byte arrays? Then the
> serializability issue comes immediately when the program is constructed,
> rather than later, when it is shipped.
>
> On Wed, Sep 2, 2015 at 12:56 PM, Maximilian Michels wrote:
>
>> Here's the JIRA issue: https://issues.apache.org/jira/browse/FLINK-2608
>>
>> On Wed, Sep 2, 2015 at 12:49 PM, Maximilian Michels wrote:
>>
>>> Hi Andres,
>>>
>>> Thank you for reporting the problem and including the code to reproduce
>>> it. I think there is a problem with the class serialization or
>>> deserialization. Arrays.asList uses a private ArrayList class
>>> (java.util.Arrays$ArrayList) which is not the one you would normally use
>>> (java.util.ArrayList).
>>>
>>> I'll create a JIRA issue to keep track of the problem and to investigate
>>> further.
>>>
>>> [stack trace snipped]
Re: Bug broadcasting objects (serialization issue)
Here's the JIRA issue: https://issues.apache.org/jira/browse/FLINK-2608

On Wed, Sep 2, 2015 at 12:49 PM, Maximilian Michels wrote:
> Hi Andres,
>
> Thank you for reporting the problem and including the code to reproduce
> it. I think there is a problem with the class serialization or
> deserialization. Arrays.asList uses a private ArrayList class
> (java.util.Arrays$ArrayList) which is not the one you would normally use
> (java.util.ArrayList).
>
> I'll create a JIRA issue to keep track of the problem and to investigate
> further.
>
> Best regards,
> Max
>
> [stack trace snipped]
>
> On Wed, Sep 2, 2015 at 11:17 AM, Andres R. Masegosa wrote:
>
>> Hi,
>>
>> I get a bug when trying to broadcast a list of integers created with
>> "Arrays.asList(...)".
>>
>> For example, if you try to run this "wordcount" example, you can
Re: Bug broadcasting objects (serialization issue)
We should try to improve the exception here. More people will run into this issue and the exception should help them understand it well.

How about we do eager serialization into a set of byte arrays? Then the serializability issue comes immediately when the program is constructed, rather than later, when it is shipped.

On Wed, Sep 2, 2015 at 12:56 PM, Maximilian Michels wrote:
> Here's the JIRA issue: https://issues.apache.org/jira/browse/FLINK-2608
>
> On Wed, Sep 2, 2015 at 12:49 PM, Maximilian Michels wrote:
>
>> Hi Andres,
>>
>> Thank you for reporting the problem and including the code to reproduce
>> it. I think there is a problem with the class serialization or
>> deserialization. Arrays.asList uses a private ArrayList class
>> (java.util.Arrays$ArrayList) which is not the one you would normally use
>> (java.util.ArrayList).
>>
>> I'll create a JIRA issue to keep track of the problem and to investigate
>> further.
>>
>> [stack trace snipped]
Re: Bug broadcasting objects (serialization issue)
I see. Manual serialization implies also manual deserialization (on the workers only), which would give a better exception.

BTW: There is an opportunity to fix two problems with one patch: the framesize overflow for the input format, and the serialization.

On Wed, Sep 2, 2015 at 4:16 PM, Maximilian Michels wrote:
> Ok, but that would not prevent the above error, right? Serializing is
> not the issue here.
>
> Nevertheless, it would catch all errors during initial serialization.
> Deserializing has its own hazards due to possible Classloader issues.
>
> On Wed, Sep 2, 2015 at 4:05 PM, Stephan Ewen wrote:
> > Yes, even serialize in the constructor. Then the failure (if serialization
> > does not work) comes immediately.
> >
> > On Wed, Sep 2, 2015 at 4:02 PM, Maximilian Michels wrote:
> >> Nice suggestion. So you want to serialize and deserialize the InputFormats
> >> on the Client to check whether they can be transferred correctly? Merely
> >> serializing is not enough because the above Exception occurs during
> >> deserialization.
> >>
> >> On Wed, Sep 2, 2015 at 2:29 PM, Stephan Ewen wrote:
> >>> We should try to improve the exception here. More people will run into
> >>> this issue and the exception should help them understand it well.
> >>>
> >>> How about we do eager serialization into a set of byte arrays? Then the
> >>> serializability issue comes immediately when the program is constructed,
> >>> rather than later, when it is shipped.
> >>>
> >>> On Wed, Sep 2, 2015 at 12:56 PM, Maximilian Michels wrote:
> > Here's the JIRA issue:
> > https://issues.apache.org/jira/browse/FLINK-2608
> >
> > On Wed, Sep 2, 2015 at 12:49 PM, Maximilian Michels wrote:
> >
> > Hi Andreas,
> >
> > Thank you for reporting the problem and including the code to reproduce
> > the problem. I think there is a problem with the class serialization or
> > deserialization.
> > Arrays.asList uses a private ArrayList class
> > (java.util.Arrays$ArrayList) which is not the one you would normally use
> > (java.util.ArrayList).
> >
> > I'll create a JIRA issue to keep track of the problem and to
> > investigate further.
> >
> > Best regards,
> > Max
> >
> > Here's the stack trace:
> >
> > Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task 'DataSource (at main(Test.java:32) (org.apache.flink.api.java.io.CollectionInputFormat))': Deserializing the InputFormat ([mytests.Test$TestClass@4d6025c5]) failed: unread block data
> > at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:523)
> > at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:507)
> > at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> > at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> > at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:507)
> > at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:190)
> > at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
> > at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
> > at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
> > at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:43)
> > at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
> > at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
> > at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
> > at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
> > at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
> > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
> > at akka.actor.ActorCell.invoke(ActorCell.scala:487)
> > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
> > at akka.dispatch.Mailbox.run(Mailbox.scala:221)
> > at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
> > at
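[Editorial note: Max's point about Arrays.asList can be verified with a small standalone check. The class names in the comments are what the JDK prints for these types:]

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class AsListTypeCheck {
    public static void main(String[] args) {
        // Arrays.asList returns a private nested class backed directly by
        // the varargs array...
        List<Integer> asList = Arrays.asList(0, 0, 0);
        System.out.println(asList.getClass().getName());
        // -> java.util.Arrays$ArrayList

        // ...while copying it produces the ordinary public java.util.ArrayList:
        List<Integer> arrayList = new ArrayList<>(asList);
        System.out.println(arrayList.getClass().getName());
        // -> java.util.ArrayList
    }
}
```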
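[Editorial note: the eager serialize-and-deserialize check Stephan and Max discuss could look roughly like the sketch below. The helper name `roundTrip` is invented for illustration; this is not Flink's actual API, just plain java.io object serialization used to surface failures at program construction time instead of at job submission:]

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializationCheck {
    // Hypothetical helper: serialize AND deserialize eagerly, so a type that
    // cannot survive the transfer fails immediately with a clear message.
    public static <T extends Serializable> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
                oos.writeObject(obj);
            }
            try (ObjectInputStream ois =
                     new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
                @SuppressWarnings("unchecked")
                T copy = (T) ois.readObject();
                return copy;
            }
        } catch (Exception e) {
            // Fail at construction time with context, instead of an opaque
            // "unread block data" error later on the JobManager.
            throw new IllegalStateException("Object is not transferable: " + obj, e);
        }
    }

    public static void main(String[] args) {
        // A plain String survives the round trip unchanged:
        System.out.println(roundTrip("hello"));
    }
}
```

Note that, as Max says above, merely serializing is not enough: the reported exception occurs during deserialization, so the check must include the read path (with the caveat that client-side deserialization cannot reproduce every cluster-side ClassLoader issue).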