Re: Bug broadcasting objects (serialization issue)

2015-09-03 Andres R. Masegosa
Hi,

I get a similar bug when broadcasting a list of integers if the list
is made unmodifiable:

elements = Collections.unmodifiableList(elements);


Here is code to reproduce it:


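import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class WordCountExample {
    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env =
            ExecutionEnvironment.getExecutionEnvironment();

        DataSet<String> text = env.fromElements(
            "Who's there?",
            "I think I hear them. Stand, ho! Who's there?");

        List<Integer> elements = new ArrayList<>();
        elements.add(0);

        // Wrapping the ArrayList in an unmodifiable view triggers the failure.
        elements = Collections.unmodifiableList(elements);

        DataSet<TestClass> set = env.fromElements(new TestClass(elements));

        DataSet<Tuple2<String, Integer>> wordCounts = text
            .flatMap(new LineSplitter())
            .withBroadcastSet(set, "set")
            .groupBy(0)
            .sum(1);

        wordCounts.print();
    }

    public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
            // Emit (word, 1) for every space-separated token.
            for (String word : line.split(" ")) {
                out.collect(new Tuple2<>(word, 1));
            }
        }
    }

    // Broadcast payload; it only wraps the list.
    public static class TestClass implements Serializable {
        private static final long serialVersionUID = -2932037991574118651L;

        List<Integer> integerList;

        public TestClass(List<Integer> integerList) {
            this.integerList = integerList;
        }
    }
}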

Thanks for your support,
Andres



Re: Bug broadcasting objects (serialization issue)

2015-09-03 Maximilian Michels
Thanks for clarifying the "eager serialization". By serializing and
deserializing explicitly (eagerly) we can raise better Exceptions to
notify the user of non-serializable classes.

> BTW: There is an opportunity to fix two problems with one patch: The 
> framesize overflow for the input format, and the serialization.

IMHO this adds another layer of complexity to the job submission
phase. I just had a chat with Robert about this. I wonder, is it
possible to increase the Akka framesize only for the Client
ActorSystem?

On Wed, Sep 2, 2015 at 4:27 PM, Stephan Ewen  wrote:
> I see.
>
> Manual serialization implies also manual deserialization (on the workers
> only), which would give a better exception.
>
> BTW: There is an opportunity to fix two problems with one patch: The
> framesize overflow for the input format, and the serialization.
>
> On Wed, Sep 2, 2015 at 4:16 PM, Maximilian Michels  wrote:
>>
>> Ok but that would not prevent the above error, right? Serializing is
>> not the issue here.
>>
>> Nevertheless, it would catch all errors during initial serialization.
>> Deserializing has its own hazards due to possible Classloader issues.
>>
>> On Wed, Sep 2, 2015 at 4:05 PM, Stephan Ewen  wrote:
>> > Yes, even serialize in the constructor. Then the failure (if
>> > serialization does not work) comes immediately.
>> >
>> > On Wed, Sep 2, 2015 at 4:02 PM, Maximilian Michels wrote:
>> >>
>> >> Nice suggestion. So you want to serialize and deserialize the
>> >> InputFormats on the Client to check whether they can be transferred
>> >> correctly? Merely serializing is not enough because the above
>> >> Exception occurs during deserialization.
>> >>
>> >> On Wed, Sep 2, 2015 at 2:29 PM, Stephan Ewen wrote:
>> >>>
>> >>> We should try to improve the exception here. More people will run into
>> >>> this issue and the exception should help them understand it well.
>> >>>
>> >>> How about we do eager serialization into a set of byte arrays? Then
>> >>> the serializability issue comes immediately when the program is
>> >>> constructed, rather than later, when it is shipped.
>> >>>
>> >>> On Wed, Sep 2, 2015 at 12:56 PM, Maximilian Michels wrote:
>> 
>>  Here's the JIRA issue:
>>  https://issues.apache.org/jira/browse/FLINK-2608
>> 
>>  On Wed, Sep 2, 2015 at 12:49 PM, Maximilian Michels 
>>  wrote:
>> >
>> > Hi Andreas,
>> >
>> > Thank you for reporting the problem and including the code to
>> > reproduce
>> > the problem. I think there is a problem with the class serialization
>> > or
>> > deserialization. Arrays.asList uses a private ArrayList class
>> > (java.util.Arrays$ArrayList) which is not the one you would normally
>> > use
>> > (java.util.ArrayList).
>> >
>> > I'll create a JIRA issue to keep track of the problem and to
>> > investigate further.
>> >
>> > Best regards,
>> > Max
>> >

Bug broadcasting objects (serialization issue)

2015-09-02 Andres R. Masegosa
Hi,

I get a bug when trying to broadcast a list of integers created with
"Arrays.asList(...)".

You can reproduce the bug by running this "wordcount" example:


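import java.io.Serializable;
import java.util.Arrays;
import java.util.List;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class WordCountExample {
    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env =
            ExecutionEnvironment.getExecutionEnvironment();

        DataSet<String> text = env.fromElements(
            "Who's there?",
            "I think I hear them. Stand, ho! Who's there?");

        // Building the list this way triggers the failure; new ArrayList<>() does not.
        List<Integer> elements = Arrays.asList(0, 0, 0);

        DataSet<TestClass> set = env.fromElements(new TestClass(elements));

        DataSet<Tuple2<String, Integer>> wordCounts = text
            .flatMap(new LineSplitter())
            .withBroadcastSet(set, "set")
            .groupBy(0)
            .sum(1);

        wordCounts.print();
    }

    public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
            // Emit (word, 1) for every space-separated token.
            for (String word : line.split(" ")) {
                out.collect(new Tuple2<>(word, 1));
            }
        }
    }

    // Broadcast payload; it only wraps the list.
    public static class TestClass implements Serializable {
        private static final long serialVersionUID = -2932037991574118651L;

        List<Integer> integerList;

        public TestClass(List<Integer> integerList) {
            this.integerList = integerList;
        }
    }
}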


However, if we build the list with the ArrayList<> constructor instead
of "Arrays.asList(...)", there is no problem.
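
A minimal sketch of that workaround, assuming the goal is simply to make sure the broadcast payload always holds a plain java.util.ArrayList: the variant of TestClass below (a hypothetical change, not part of the original report) copies whatever list it receives, so callers can keep using Arrays.asList(...) or an unmodifiable view. It needs only the JDK, so it runs without Flink.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class DefensiveCopyExample {

    // Hypothetical variant of TestClass: the constructor copies its argument into
    // a fresh java.util.ArrayList, so the field never holds java.util.Arrays$ArrayList
    // or a Collections.unmodifiableList(...) wrapper.
    public static class TestClass implements Serializable {
        private static final long serialVersionUID = 1L;

        final List<Integer> integerList;

        public TestClass(List<Integer> integerList) {
            this.integerList = new ArrayList<>(integerList);
        }
    }

    public static void main(String[] args) {
        TestClass fromAsList = new TestClass(Arrays.asList(0, 0, 0));
        TestClass fromView = new TestClass(
            Collections.unmodifiableList(new ArrayList<>(Arrays.asList(0))));

        // Both lines print "java.util.ArrayList".
        System.out.println(fromAsList.integerList.getClass().getName());
        System.out.println(fromView.integerList.getClass().getName());
    }
}
```

The copy costs one pass over the list at construction time. Because it produces the type the report says works, it presumably sidesteps the failure, but it does not address the underlying issue tracked in FLINK-2608.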


Regards,
Andres


Re: Bug broadcasting objects (serialization issue)

2015-09-02 Maximilian Michels
Hi Andreas,

Thank you for reporting the problem and including code to reproduce it.
I think the problem lies in the serialization or deserialization of the
class. Arrays.asList returns an instance of a private nested class
(java.util.Arrays$ArrayList), which is not the java.util.ArrayList you
would normally use.
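
The distinction can be checked with the JDK alone. The snippet below prints the runtime class of each list; the nested class names are JDK implementation details. Both java.util.Arrays$ArrayList and the Collections.unmodifiableList wrapper implement java.io.Serializable, so the difference presumably lies in how these types are handled by the framework's serializer rather than in java.io.Serializable itself.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class ListClassNames {

    // Returns the runtime class name of a list, e.g. "java.util.ArrayList".
    public static String runtimeClass(List<?> list) {
        return list.getClass().getName();
    }

    public static void main(String[] args) {
        List<Integer> fromAsList = Arrays.asList(0, 0, 0);
        List<Integer> plain = new ArrayList<>(fromAsList);
        List<Integer> unmodifiable = Collections.unmodifiableList(plain);

        System.out.println(runtimeClass(fromAsList));   // java.util.Arrays$ArrayList
        System.out.println(runtimeClass(plain));        // java.util.ArrayList
        System.out.println(runtimeClass(unmodifiable)); // java.util.Collections$UnmodifiableRandomAccessList
    }
}
```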

I'll create a JIRA issue to keep track of the problem and to investigate
further.

Best regards,
Max

Here's the stack trace:

Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task 'DataSource (at main(Test.java:32) (org.apache.flink.api.java.io.CollectionInputFormat))': Deserializing the InputFormat ([mytests.Test$TestClass@4d6025c5]) failed: unread block data
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:523)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:507)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:507)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:190)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:43)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
    at akka.dispatch.Mailbox.run(Mailbox.scala:221)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.Exception: Deserializing the InputFormat ([mytests.Test$TestClass@4d6025c5]) failed: unread block data
    at org.apache.flink.runtime.jobgraph.InputFormatVertex.initializeOnMaster(InputFormatVertex.java:60)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:520)
    ... 25 more
Caused by: java.lang.IllegalStateException: unread block data
    at java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2424)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1383)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:302)
    at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:264)
    at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:282)
    at org.apache.flink.runtime.jobgraph.InputFormatVertex.initializeOnMaster(InputFormatVertex.java:57)
    ... 26 more

On Wed, Sep 2, 2015 at 11:17 AM, Andres R. Masegosa 
wrote:

> Hi,
>
> I get a bug when trying to broadcast a list of integers created with the
> primitive "Arrays.asList(...)".
>
> For example, if you try to run this "wordcount" example, you can
> reproduce the bug.

Re: Bug broadcasting objects (serialization issue)

2015-09-02 Stephan Ewen
Yes, even serialize in the constructor. Then the failure (if serialization
does not work) comes immediately.

On Wed, Sep 2, 2015 at 4:02 PM, Maximilian Michels  wrote:

> Nice suggestion. So you want to serialize and deserialize the InputFormats
> on the Client to check whether they can be transferred correctly? Merely
> serializing is not enough because the above Exception occurs during
> deserialization.
>
> On Wed, Sep 2, 2015 at 2:29 PM, Stephan Ewen  wrote:
>
>> We should try to improve the exception here. More people will run into
>> this issue and the exception should help them understand it well.
>>
>> How about we do eager serialization into a set of byte arrays? Then the
>> serializability issue comes immediately when the program is constructed,
>> rather than later, when it is shipped.
>>
>> On Wed, Sep 2, 2015 at 12:56 PM, Maximilian Michels 
>> wrote:
>>
>>> Here's the JIRA issue: https://issues.apache.org/jira/browse/FLINK-2608
>>>
>>> On Wed, Sep 2, 2015 at 12:49 PM, Maximilian Michels 
>>> wrote:
>>>
>>>> Hi Andreas,
>>>>
>>>> Thank you for reporting the problem and including the code to reproduce
>>>> the problem. I think there is a problem with the class serialization or
>>>> deserialization. Arrays.asList uses a private ArrayList class
>>>> (java.util.Arrays$ArrayList) which is not the one you would normally use
>>>> (java.util.ArrayList).
>>>>
>>>> I'll create a JIRA issue to keep track of the problem and to
>>>> investigate further.
>>>>
>>>> Best regards,
>>>> Max


Re: Bug broadcasting objects (serialization issue)

2015-09-02 Maximilian Michels
Nice suggestion. So you want to serialize and deserialize the InputFormats
on the Client to check whether they can be transferred correctly? Merely
serializing is not enough because the above Exception occurs during
deserialization.
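
A sketch of the client-side check described above, assuming plain Java serialization and the default class lookup: serialize the object, immediately deserialize it, and report which phase failed for which type. RoundTripCheck is a hypothetical name and this is not Flink's InstantiationUtil. Applied to a data source, it would have to round-trip the whole InputFormat, since the failure in this thread occurs while the InputFormat is being deserialized.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public final class RoundTripCheck {

    private RoundTripCheck() {}

    // Serializes and then deserializes `value`, returning the copy. Any failure is
    // rethrown as an IllegalStateException naming the phase and the runtime type.
    @SuppressWarnings("unchecked")
    public static <T extends Serializable> T roundTrip(T value) {
        String type = value.getClass().getName();
        byte[] bytes;
        try (ByteArrayOutputStream buffer = new ByteArrayOutputStream();
             ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(value);
            out.flush(); // push buffered block data into `buffer` before copying it
            bytes = buffer.toByteArray();
        } catch (IOException e) {
            throw new IllegalStateException("Serializing " + type + " failed: " + e, e);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return (T) in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Deserializing " + type + " failed: " + e, e);
        }
    }
}
```

A round trip in the client JVM only proves the bytes can be read back with the client's classes; as noted later in the thread, deserialization elsewhere can still fail for class-loader reasons.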

On Wed, Sep 2, 2015 at 2:29 PM, Stephan Ewen  wrote:

> We should try to improve the exception here. More people will run into
> this issue and the exception should help them understand it well.
>
> How about we do eager serialization into a set of byte arrays? Then the
> serializability issue comes immediately when the program is constructed,
> rather than later, when it is shipped.
>
> On Wed, Sep 2, 2015 at 12:56 PM, Maximilian Michels 
> wrote:
>
>> Here's the JIRA issue: https://issues.apache.org/jira/browse/FLINK-2608
>>
>> On Wed, Sep 2, 2015 at 12:49 PM, Maximilian Michels 
>> wrote:
>>
>>> Hi Andreas,
>>>
>>> Thank you for reporting the problem and including the code to reproduce
>>> the problem. I think there is a problem with the class serialization or
>>> deserialization. Arrays.asList uses a private ArrayList class
>>> (java.util.Arrays$ArrayList) which is not the one you would normally use
>>> (java.util.ArrayList).
>>>
>>> I'll create a JIRA issue to keep track of the problem and to investigate
>>> further.
>>>
>>> Best regards,
>>> Max
>>>

Re: Bug broadcasting objects (serialization issue)

2015-09-02 Maximilian Michels
Here's the JIRA issue: https://issues.apache.org/jira/browse/FLINK-2608

On Wed, Sep 2, 2015 at 12:49 PM, Maximilian Michels  wrote:

> Hi Andreas,
>
> Thank you for reporting the problem and including the code to reproduce
> the problem. I think there is a problem with the class serialization or
> deserialization. Arrays.asList uses a private ArrayList class
> (java.util.Arrays$ArrayList) which is not the one you would normally use
> (java.util.ArrayList).
>
> I'll create a JIRA issue to keep track of the problem and to investigate
> further.
>
> Best regards,
> Max
>
>
> On Wed, Sep 2, 2015 at 11:17 AM, Andres R. Masegosa 
> wrote:
>
>> Hi,
>>
>> I get a bug when trying to broadcast a list of integers created with the
>> primitive "Arrays.asList(...)".
>>
>> For example, if you try to run this "wordcount" example, you can

Re: Bug broadcasting objects (serialization issue)

2015-09-02 Stephan Ewen
We should try to improve the exception here. More people will run into this
issue and the exception should help them understand it well.

How about we do eager serialization into a set of byte arrays? Then the
serializability issue comes immediately when the program is constructed,
rather than later, when it is shipped.
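
A minimal sketch of eager serialization into a set of byte arrays, done in the constructor so the failure surfaces while the program is being built. Plain Java serialization stands in for whatever serializer the framework actually applies to the elements, and SerializedElements is a hypothetical class, not a Flink API. Serializing each element separately lets the error name the index and runtime type of the offending element.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Hypothetical holder for the elements of a collection-based source.
public final class SerializedElements implements Serializable {
    private static final long serialVersionUID = 1L;

    // One byte array per element, in the original order.
    private final ArrayList<byte[]> chunks;

    public SerializedElements(List<?> elements) {
        ArrayList<byte[]> result = new ArrayList<>(elements.size());
        for (int i = 0; i < elements.size(); i++) {
            Object element = elements.get(i);
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
                out.writeObject(element);
            } catch (IOException e) {
                // Fail at construction time, naming the element's position and type.
                throw new IllegalArgumentException("Element " + i + " of type "
                    + (element == null ? "null" : element.getClass().getName())
                    + " cannot be serialized: " + e, e);
            }
            result.add(buffer.toByteArray());
        }
        this.chunks = result;
    }

    public int size() {
        return chunks.size();
    }
}
```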

On Wed, Sep 2, 2015 at 12:56 PM, Maximilian Michels  wrote:

> Here's the JIRA issue: https://issues.apache.org/jira/browse/FLINK-2608
>
> On Wed, Sep 2, 2015 at 12:49 PM, Maximilian Michels 
> wrote:
>
>> Hi Andreas,
>>
>> Thank you for reporting the problem and including the code to reproduce
>> the problem. I think there is a problem with the class serialization or
>> deserialization. Arrays.asList uses a private ArrayList class
>> (java.util.Arrays$ArrayList) which is not the one you would normally use
>> (java.util.ArrayList).
>>
>> I'll create a JIRA issue to keep track of the problem and to investigate
>> further.
>>
>> Best regards,
>> Max
>>

Re: Bug broadcasting objects (serialization issue)

2015-09-02 Thread Stephan Ewen
I see.

Manual serialization also implies manual deserialization (on the workers
only), which would give a better exception.
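Stephan's point is that eager serialization on the client means the workers have to deserialize explicitly, and that explicit step is where a clearer error can be raised. A minimal Java sketch, assuming plain Java serialization; the class WorkerSideDecode and its methods are hypothetical, not Flink API. It also catches RuntimeException, because the failure in the trace above (java.lang.IllegalStateException: unread block data) is unchecked.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;

/** Hypothetical helper: turns a bare deserialization failure into a message that names the object. */
final class WorkerSideDecode {

    /** Client side (stand-in): plain Java serialization into a byte array. */
    static byte[] encode(Object value) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    /** Worker side: deserialize and, on failure, say what could not be restored. */
    static Object decode(byte[] data, String description) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        } catch (IOException | ClassNotFoundException | RuntimeException e) {
            throw new IllegalStateException(
                "Could not deserialize " + description + " (" + data.length + " bytes): " + e, e);
        }
    }

    public static void main(String[] args) {
        byte[] ok = encode(new ArrayList<>(Arrays.asList(0, 0, 0)));
        System.out.println(decode(ok, "broadcast set 'set'")); // prints [0, 0, 0]

        // Simulate corrupted data by cutting the bytes in half.
        byte[] truncated = Arrays.copyOf(ok, ok.length / 2);
        try {
            decode(truncated, "broadcast set 'set'");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The descriptive message is the whole point: the worker can say which object failed instead of surfacing a bare "unread block data".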

BTW: There is an opportunity to fix two problems with one patch: the
frame size overflow for the input format, and the serialization.

On Wed, Sep 2, 2015 at 4:16 PM, Maximilian Michels  wrote:

> OK, but that would not prevent the above error, right? Serializing is
> not the issue here.
>
> Nevertheless, it would catch all errors during the initial serialization.
> Deserializing has its own hazards due to possible classloader issues.
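On the classloader hazard: deserializing user types on a worker typically needs a class loader that can see the user's jar, not whatever ObjectInputStream picks by default. A sketch using the standard ObjectInputStream.resolveClass hook; LoaderAwareDecode is a hypothetical name, and the code is an illustration of the idea, not taken from Flink.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;

/** Hypothetical sketch: deserialize while resolving classes through an explicit class loader. */
final class LoaderAwareDecode {

    static final class LoaderObjectInputStream extends ObjectInputStream {
        private final ClassLoader loader;

        LoaderObjectInputStream(InputStream in, ClassLoader loader) throws IOException {
            super(in);
            this.loader = loader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc)
                throws IOException, ClassNotFoundException {
            try {
                // Prefer the supplied loader (e.g. the one that loaded the user's jar).
                return Class.forName(desc.getName(), false, loader);
            } catch (ClassNotFoundException e) {
                // Fall back to the default lookup, which also handles primitive types.
                return super.resolveClass(desc);
            }
        }
    }

    static Object decode(byte[] data, ClassLoader loader) {
        try (ObjectInputStream in = new LoaderObjectInputStream(new ByteArrayInputStream(data), loader)) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("Class not visible to " + loader + ": " + e.getMessage(), e);
        }
    }

    static byte[] encode(Object value) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    public static void main(String[] args) {
        byte[] data = encode(new ArrayList<>(Arrays.asList(1, 2, 3)));
        ClassLoader userLoader = LoaderAwareDecode.class.getClassLoader();
        System.out.println(decode(data, userLoader)); // prints [1, 2, 3]
    }
}
```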
>
> On Wed, Sep 2, 2015 at 4:05 PM, Stephan Ewen  wrote:
> > Yes, even serialize in the constructor. Then the failure (if
> > serialization does not work) comes immediately.
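Serializing in the constructor makes a non-serializable payload fail at the call site rather than at job submission. A minimal sketch, assuming plain Java serialization; EagerlySerialized is a hypothetical wrapper, not an existing Flink class.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.Arrays;

/** Hypothetical wrapper: serializes its payload as soon as it is constructed. */
final class EagerlySerialized {
    private final byte[] bytes;

    EagerlySerialized(Object payload) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(payload);
        } catch (IOException e) {
            // Fails here, while the program is being built, not later when it is shipped.
            throw new IllegalArgumentException("Payload is not serializable: " + e, e);
        }
        this.bytes = buffer.toByteArray();
    }

    byte[] bytes() {
        return bytes.clone();
    }

    public static void main(String[] args) {
        new EagerlySerialized(new ArrayList<>(Arrays.asList(0))); // serializable, no error
        try {
            new EagerlySerialized(new Object()); // java.lang.Object is not Serializable
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```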
> >
> > On Wed, Sep 2, 2015 at 4:02 PM, Maximilian Michels  wrote:
> >>
> >> Nice suggestion. So you want to serialize and deserialize the
> >> InputFormats on the Client to check whether they can be transferred
> >> correctly? Merely serializing is not enough because the above Exception
> >> occurs during deserialization.
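Max's suggestion, sketched: serialize and immediately deserialize on the client. The helper roundTrip is hypothetical and uses plain Java serialization only. Note that this round trip succeeds for Arrays.asList(0, 0, 0), so if the input format encodes its elements with a different serializer internally (an assumption, not confirmed in this thread), a useful check would have to go through that same serializer.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.Arrays;
import java.util.List;

final class RoundTripCheck {

    /** Serializes and immediately deserializes, as a client-side pre-flight check might. */
    @SuppressWarnings("unchecked")
    static <T> T roundTrip(T value) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException | RuntimeException e) {
            String type = value == null ? "null" : value.getClass().getName();
            throw new IllegalStateException("Round trip failed for " + type + ": " + e, e);
        }
    }

    public static void main(String[] args) {
        List<Integer> original = Arrays.asList(0, 0, 0);
        List<Integer> copy = roundTrip(original);
        System.out.println(copy.equals(original)); // prints true
        System.out.println(copy.getClass().getName());
    }
}
```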
> >>
> >> On Wed, Sep 2, 2015 at 2:29 PM, Stephan Ewen  wrote:
> >>>
> >>> We should try to improve the exception here. More people will run into
> >>> this issue and the exception should help them understand it well.
> >>>
> >>> How about we do eager serialization into a set of byte arrays? Then the
> >>> serializability issue comes up immediately when the program is
> >>> constructed, rather than later, when it is shipped.
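Eager serialization into a set of byte arrays could also point at the offending element. A sketch, assuming one Java-serialized byte[] per element; serializeEach is a hypothetical helper.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch: serialize each element up front and report the first one that fails. */
final class PerElementBytes {

    static List<byte[]> serializeEach(List<?> elements) {
        List<byte[]> result = new ArrayList<>(elements.size());
        for (int i = 0; i < elements.size(); i++) {
            Object element = elements.get(i);
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
                out.writeObject(element);
            } catch (IOException e) {
                String type = element == null ? "null" : element.getClass().getName();
                throw new IllegalArgumentException(
                    "Element " + i + " (" + type + ") cannot be serialized: " + e, e);
            }
            result.add(buffer.toByteArray());
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(serializeEach(Arrays.asList("a", "b")).size()); // prints 2
        try {
            serializeEach(Arrays.asList("a", new Object(), "c"));
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // names element 1 (java.lang.Object)
        }
    }
}
```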
> >>>
> >>> On Wed, Sep 2, 2015 at 12:56 PM, Maximilian Michels  wrote:
> 
>  Here's the JIRA issue:
> https://issues.apache.org/jira/browse/FLINK-2608
> 
>  On Wed, Sep 2, 2015 at 12:49 PM, Maximilian Michels  wrote:
> >
> > Hi Andres,
> >
> > Thank you for reporting the problem and including the code to reproduce
> > it. I think there is a problem with the class serialization or
> > deserialization. Arrays.asList uses a private ArrayList class
> > (java.util.Arrays$ArrayList), which is not the one you would normally
> > use (java.util.ArrayList).
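The class difference Max describes is easy to see, and the unmodifiable list from the follow-up report is another JDK-internal class. A small sketch; copying into a plain java.util.ArrayList is only a possible workaround, assuming the failure is tied to these internal list classes, and plainCopy is a hypothetical helper.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class ListClasses {

    /** Hypothetical workaround: copy into a plain java.util.ArrayList before handing the list over. */
    static <T> List<T> plainCopy(List<T> list) {
        return new ArrayList<>(list);
    }

    public static void main(String[] args) {
        List<Integer> fromAsList = Arrays.asList(0, 0, 0);
        List<Integer> unmodifiable = Collections.unmodifiableList(new ArrayList<>(Arrays.asList(0)));

        System.out.println(fromAsList.getClass().getName());            // java.util.Arrays$ArrayList
        System.out.println(unmodifiable.getClass().getName());          // java.util.Collections$UnmodifiableRandomAccessList
        System.out.println(plainCopy(fromAsList).getClass().getName()); // java.util.ArrayList
    }
}
```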
> >
> > I'll create a JIRA issue to keep track of the problem and to
> > investigate further.
> >
> > Best regards,
> > Max
> >
> > Here's the stack trace:
> >
> > Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task 'DataSource (at main(Test.java:32) (org.apache.flink.api.java.io.CollectionInputFormat))': Deserializing the InputFormat ([mytests.Test$TestClass@4d6025c5]) failed: unread block data
> > at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:523)
> > at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:507)
> > at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> > at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> > at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:507)
> > at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:190)
> > at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
> > at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
> > at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
> > at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:43)
> > at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
> > at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
> > at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
> > at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
> > at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
> > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
> > at akka.actor.ActorCell.invoke(ActorCell.scala:487)
> > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
> > at akka.dispatch.Mailbox.run(Mailbox.scala:221)
> > at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
> > at
> >