[ https://issues.apache.org/jira/browse/FLINK-2608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15568745#comment-15568745 ]

ASF GitHub Bot commented on FLINK-2608:
---------------------------------------

Github user rmetzger commented on the issue:

    https://github.com/apache/flink/pull/2623
  
    This change adds `kryo-shaded` to our dependency tree:
    
    ```
    [INFO] |  +- com.twitter:chill_2.10:jar:0.8.1:compile
    [INFO] |  |  +- com.twitter:chill-java:jar:0.8.1:compile
    [INFO] |  |  \- com.esotericsoftware:kryo-shaded:jar:3.0.3:compile
    [INFO] |  |     \- com.esotericsoftware:minlog:jar:1.3.0:compile
    ```
    I suspect that Maven does not treat this as a version conflict because chill depends on `kryo-shaded`, whose coordinates differ from those of the regular `kryo` artifact, so dependency mediation never picks a single Kryo version.
    Apparently, `kryo-shaded` includes a shaded copy of ASM but does not relocate the regular Kryo classes. So we would end up with two Kryo versions on our classpath.
    If we want to upgrade Kryo, we therefore need to do it explicitly to avoid this duplication.
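
One way to confirm the duplication is to ask the class loader how many classpath entries provide `com/esotericsoftware/kryo/Kryo.class`. The following is a minimal sketch, not part of Flink or Maven: the class `DuplicateClassCheck` and its methods are hypothetical names, and it assumes the job's dependencies are on the application classpath when it runs.

```java
import java.io.IOException;
import java.net.URL;
import java.util.Collections;
import java.util.List;

/**
 * Hypothetical helper (not part of Flink or Maven): lists every classpath
 * entry that provides a given top-level class, e.g. to spot a second Kryo
 * copy that arrives transitively through chill's kryo-shaded dependency.
 */
public class DuplicateClassCheck {

    /** Maps "com.esotericsoftware.kryo.Kryo" to "com/esotericsoftware/kryo/Kryo.class". */
    static String toResourceName(String className) {
        return className.replace('.', '/') + ".class";
    }

    /** Returns one URL per classpath entry (jar or directory) that contains the class file. */
    static List<URL> findCopies(ClassLoader loader, String className) throws IOException {
        return Collections.list(loader.getResources(toResourceName(className)));
    }

    public static void main(String[] args) throws IOException {
        ClassLoader loader = DuplicateClassCheck.class.getClassLoader();
        List<URL> copies = findCopies(loader, "com.esotericsoftware.kryo.Kryo");

        // More than one entry means two Kryo jars (e.g. kryo and kryo-shaded) are visible.
        System.out.println("Kryo class files on the classpath: " + copies.size());
        for (URL url : copies) {
            System.out.println("  " + url);
        }
    }
}
```

Run it with the same classpath as the job (for example, the one printed by `mvn dependency:build-classpath`); if it lists two URLs, both `kryo` and `kryo-shaded` are being picked up.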
    
    Another issue to consider is serialization compatibility. Savepoints in Flink could contain data serialized with Kryo 2.24, so if we want to provide savepoint compatibility between Flink 1.1 and 1.2, we need to take that into account.
    According to the Kryo documentation, 2.24 and 3.0.0 are serialization compatible (I hope the same holds true for chill): https://github.com/EsotericSoftware/kryo/blob/master/CHANGES.md#2240---300-2014-10-04
    I would like to hear @StephanEwen's and @uce's opinions on this.


> Arrays.asList(..) does not work with CollectionInputFormat
> ----------------------------------------------------------
>
>                 Key: FLINK-2608
>                 URL: https://issues.apache.org/jira/browse/FLINK-2608
>             Project: Flink
>          Issue Type: Bug
>          Components: Type Serialization System
>    Affects Versions: 0.9, 0.10.0
>            Reporter: Maximilian Michels
>            Priority: Minor
>             Fix For: 1.0.0
>
>
> When using Arrays.asList(..) as input for a CollectionInputFormat, the 
> serialization/deserialization fails when deploying the task.
> See the following program:
> {code:java}
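> import java.io.Serializable;
> import java.util.ArrayList;
> import java.util.Arrays;
> import java.util.List;
>
> import org.apache.flink.api.common.functions.FlatMapFunction;
> import org.apache.flink.api.java.DataSet;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.util.Collector;
>
> public class WordCountExample {
>     public static void main(String[] args) throws Exception {
>         final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>         DataSet<String> text = env.fromElements(
>                 "Who's there?",
>                 "I think I hear them. Stand, ho! Who's there?");
>
>         // DOES NOT WORK: Arrays.asList(..) returns a fixed-size java.util.Arrays$ArrayList
>         List<Integer> elements = Arrays.asList(0, 0, 0);
>         // The following works (copies the elements into a regular java.util.ArrayList):
>         // List<Integer> elements = new ArrayList<>(Arrays.asList(0, 0, 0));
>
>         // TestClass wraps the list; it is shipped to the tasks as a broadcast set
>         DataSet<TestClass> set = env.fromElements(new TestClass(elements));
>         DataSet<Tuple2<String, Integer>> wordCounts = text
>                 .flatMap(new LineSplitter())
>                 .withBroadcastSet(set, "set")
>                 .groupBy(0)
>                 .sum(1);
>         wordCounts.print();
>     }
>
>     // Emits (word, 1) for every space-separated word of a line
>     public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
>         @Override
>         public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
>             for (String word : line.split(" ")) {
>                 out.collect(new Tuple2<String, Integer>(word, 1));
>             }
>         }
>     }
>
>     // Not a POJO (no default constructor), so Flink treats it as a generic type
>     public static class TestClass implements Serializable {
>         private static final long serialVersionUID = -2932037991574118651L;
>         List<Integer> integerList;
>         public TestClass(List<Integer> integerList) {
>             this.integerList = integerList;
>         }
>     }
> }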
> {code}
> {noformat}
> Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task 'DataSource (at main(Test.java:32) (org.apache.flink.api.java.io.CollectionInputFormat))': Deserializing the InputFormat ([mytests.Test$TestClass@4d6025c5]) failed: unread block data
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:523)
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:507)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>     at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:507)
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:190)
>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>     at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:43)
>     at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
>     at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
>     at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>     at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>     at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.Exception: Deserializing the InputFormat ([mytests.Test$TestClass@4d6025c5]) failed: unread block data
>     at org.apache.flink.runtime.jobgraph.InputFormatVertex.initializeOnMaster(InputFormatVertex.java:60)
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:520)
>     ... 25 more
> Caused by: java.lang.IllegalStateException: unread block data
>     at java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2424)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1383)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:302)
>     at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:264)
>     at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:282)
>     at org.apache.flink.runtime.jobgraph.InputFormatVertex.initializeOnMaster(InputFormatVertex.java:57)
>     ... 26 more
> {noformat}
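
The stack trace only reports `unread block data`, which hides the underlying cause. One plausible factor (an assumption, not confirmed in this report) is that `Arrays.asList(..)` returns the fixed-size `java.util.Arrays$ArrayList` rather than `java.util.ArrayList`: that class has no no-arg constructor and rejects `add(..)`, whereas generic collection serializers commonly instantiate the concrete collection class and then add the elements one by one. The sketch below uses a hypothetical class, `AsListProbe`, and plain JDK reflection to show those two properties; it does not use Flink or Kryo.

```java
import java.lang.reflect.Constructor;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical probe (not part of Flink or Kryo): compares the list returned by
 * Arrays.asList(..) with a regular java.util.ArrayList using plain JDK reflection.
 */
public class AsListProbe {

    /** True if the class declares a constructor that takes no parameters. */
    static boolean hasNoArgConstructor(Class<?> clazz) {
        for (Constructor<?> c : clazz.getDeclaredConstructors()) {
            if (c.getParameterCount() == 0) {
                return true;
            }
        }
        return false;
    }

    /** True if add(..) succeeds; note that this mutates resizable lists. */
    static boolean supportsAdd(List<Integer> list) {
        try {
            list.add(0);
            return true;
        } catch (UnsupportedOperationException e) {
            return false;
        }
    }

    /** One-line summary of the concrete list class and the two properties above. */
    static String describe(List<Integer> list) {
        Class<?> clazz = list.getClass();
        return clazz.getName()
                + ": no-arg constructor=" + hasNoArgConstructor(clazz)
                + ", supports add=" + supportsAdd(list);
    }

    public static void main(String[] args) {
        // Prints "java.util.Arrays$ArrayList: no-arg constructor=false, supports add=false"
        System.out.println(describe(Arrays.asList(0, 0, 0)));
        // Prints "java.util.ArrayList: no-arg constructor=true, supports add=true"
        System.out.println(describe(new ArrayList<>(Arrays.asList(0, 0, 0))));
    }
}
```

This also explains why copying the elements into a `new ArrayList<>(...)`, as in the workaround comment of the reproducer, sidesteps both properties.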



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
