Which Spark version are you using? AFAIK the corruption bugs in sort-based
shuffle should have been fixed in newer Spark releases.
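
If you are not sure, you can check the exact version from the shell (a quick
sanity check, assuming a running SparkContext named sc, as in spark-shell):

    scala> sc.version

If you are stuck on an affected 1.x release, the hash fallback you used is a
reasonable workaround until you can upgrade.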

On Wed, Jun 24, 2015 at 12:25 PM, Piero Cinquegrana <
pcinquegr...@marketshare.com> wrote:

> Switching spark.shuffle.manager from sort to hash fixed this issue as
> documented here:
>
> https://issues.apache.org/jira/browse/SPARK-4105
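>
> For reference, one way to apply the setting (a sketch, not the only way; it
> has to be in place before the SparkContext is created) is via SparkConf:
>
> > import org.apache.spark.{SparkConf, SparkContext}
> > // use hash shuffle as a workaround for the sort-shuffle corruption bug
> > val conf = new SparkConf().set("spark.shuffle.manager", "hash")
> > val sc = new SparkContext(conf)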
>
> *From:* Piero Cinquegrana [mailto:pcinquegr...@marketshare.com]
> *Sent:* Wednesday, June 24, 2015 12:09 PM
> *To:* 'user@spark.apache.org'
> *Subject:* com.esotericsoftware.kryo.KryoException: java.io.IOException: failed to read chunk
>
> Hello Spark Experts,
>
> I am facing the following issue:
>
> 1) I am converting each org.apache.spark.sql.Row into an
> org.apache.spark.mllib.linalg.Vector using sparse notation (see the sketch
> of the helper further below).
>
> 2) The parsing itself proceeds successfully, but when I try to look at the
> result I get the following error: com.esotericsoftware.kryo.KryoException:
> java.io.IOException: failed to read chunk
>
> Has anybody experienced this before? I noticed this bug:
> https://issues.apache.org/jira/browse/SPARK-4105
>
> Is it related?
>
> This is the command that creates the RDD of sparse vectors:
>
> > val parsedData = stack.map(row => LabeledPoint(row.getDouble(4),
> >   sparseVectorCat(row, CategoriesIdx, InteractionIds, tupleMap, vecLength)))
>
> parsedData: org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint] =
> MapPartitionsRDD[64] at map at DataFrame.scala:848
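>
> Since sparseVectorCat is not shown above, here is an illustrative sketch of
> the kind of helper I mean. The signature matches my call site, but the body
> is hypothetical (my real implementation differs in detail, e.g. in how
> feature values are assigned):
>
> > import org.apache.spark.sql.Row
> > import org.apache.spark.mllib.linalg.{Vector, Vectors}
> >
> > def sparseVectorCat(row: Row,
> >                     categoriesIdx: Seq[Int],
> >                     interactionIds: Seq[Int],
> >                     tupleMap: Map[(Int, String), Int],
> >                     vecLength: Int): Vector = {
> >   // Hypothetical body: look up a feature index for each categorical
> >   // column (and each interaction column) and emit an (index, value) pair.
> >   val features = (categoriesIdx ++ interactionIds).flatMap { col =>
> >     tupleMap.get((col, String.valueOf(row.get(col)))).map(idx => (idx, 1.0))
> >   }
> >   // Vectors.sparse sorts the (index, value) pairs by index internally.
> >   Vectors.sparse(vecLength, features)
> > }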
>
> Here is what the data looks like before and after the conversion into
> sparse vectors:
>
> > val test2 = stack.filter("rows[1] is not null").head(10)
>
> test2: Array[org.apache.spark.sql.Row] = Array(
>   [2014-08-17 20:19:00,2014-08-17 20:19:00,545,Greenville-N.Bern-Washngtn,0.0,EST5EDT,Sunday,20,19,33,8,2014,Sunday|20,545,2014-08-17 20:19:00,ArrayBuffer(37!2014-08-17 20:19:00!2014-08-17 20:19:50!2014-08-17 20:19:50!2014-08-17 20:19:50!545!Greenville-N.Bern-Washngtn!EST5EDT!UNKNOWN!10!CNN!Prime!M!UNKNOWN|10|CNN|Prime|M!2019!0.006461420296348449, 37!2014-08-17 20:19:00!2014-08-17 20:19:45!2014-08-17 20:19:45!2014-08-17 20:19:45!545!Greenville-N.Bern-Washngtn!EST5EDT!UNKNOWN!5!CNN!Prime!M!UNKNOWN|5|CNN|Prime|M!2019!0.006461420296348449)],
>   [2014-08-17 23:45:00,2014-08-18 00:45:00,625,Waco-Temple-Bryan,0.0,CST6CDT,Sunday,23,45,33,8,2014,Sunday|23,625,2014-08-18 00:45:00,ArrayBuffer(276!2014-08-18 00:45:00!2014-08-18 00:45:14!2014-08-17 23:45:14...
>
> > val parsedTest2 = test2.map(row => LabeledPoint(row.getDouble(4),
> >   sparseVectorCat(row, CategoriesIdx, InteractionIds, tupleMap, vecLength)))
>
> parsedTest2: Array[org.apache.spark.mllib.regression.LabeledPoint] =
> Array((0.0,(2450,[241,1452,1480,1608,1706],[1.0,1.0,2019.0,2019.0,1.0])),
> (0.0,(2450,[6,1133,1342,2184,2314],[1.0,1.0,1050.0,1244.0,1.0])),
> (0.0,(2450,[414,1133,1310,1605,2206],[941.0,1.0,1.0,1.0,907.0])),
> (0.0,(2450,[322,761,981,1203,1957],[5203.0,1.0,1.0,1.0,5203.0])),
> (1.0,(2450,[117,322,943,1757,1957],[1.0,910.0,1.0,1.0,910.0])),
> (0.0,(2450,[645,1018,1074,1778,1974],[1.0,1507.0,1.0,1507.0,1.0])),
> (0.0,(2450,[522,542,814,1128,1749],[1.0,432.0,796.0,1.0,1.0])),
> (0.0,(2450,[166,322,413,1706,2256],[1.0,1260.0,1.0,1.0,1260.0])),
> (1.0,(2450,[1203,1295,1354,2189,2388],[1.0,1.0,1.0,3705.0,2823.0])),
> (6.0,(2450,[293,1203,1312,1627,2035],[2716.0,1.0,1.0,1.0,2716.0])))
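>
> (To read the output above: each entry is (label, sparse vector), where the
> vector prints as (size, [indices], [values]). For example, the first entry
> is equivalent to constructing:)
>
> > LabeledPoint(0.0, Vectors.sparse(2450,
> >   Array(241, 1452, 1480, 1608, 1706),
> >   Array(1.0, 1.0, 2019.0, 2019.0, 1.0)))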
>
> The map over the local Array succeeds, but calling an action on the full
> RDD fails:
>
> > parsedData.first
>
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 52.0 failed 4 times, most recent failure: Lost task 0.3 in stage 52.0 (TID 4243, ip-10-0-0-174.ec2.internal): com.esotericsoftware.kryo.KryoException: java.io.IOException: failed to read chunk
> Serialization trace:
> values (org.apache.spark.sql.catalyst.expressions.GenericMutableRow)
>     at com.esotericsoftware.kryo.io.Input.fill(Input.java:142)
>     at com.esotericsoftware.kryo.io.Input.require(Input.java:169)
>     at com.esotericsoftware.kryo.io.Input.readUtf8_slow(Input.java:524)
>     at com.esotericsoftware.kryo.io.Input.readUtf8(Input.java:517)
>     at com.esotericsoftware.kryo.io.Input.readString(Input.java:447)
>     at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.read(DefaultSerializers.java:157)
>     at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.read(DefaultSerializers.java:146)
>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
>     at com.twitter.chill.TraversableSerializer.read(Traversable.scala:43)
>     at com.twitter.chill.TraversableSerializer.read(Traversable.scala:21)
>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
>     at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
>     at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:651)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
>     at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:42)
>     at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:33)
>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
>     at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:138)
>     at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
>     at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>     at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>     at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>     at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>     at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>     at org.apache.spark.sql.execution.joins.HashOuterJoin.org$apache$spark$sql$execution$joins$HashOuterJoin$$buildHashTable(HashOuterJoin.scala:170)
>     at org.apache.spark.sql.execution.joins.HashOuterJoin$$anonfun$execute$1.apply(HashOuterJoin.scala:193)
>     at org.apache.spark.sql.execution.joins.HashOuterJoin$$anonfun$execute$1.apply(HashOuterJoin.scala:188)
>     at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>     at org.apache.spark.scheduler.Task.run(Task.scala:64)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: failed to read chunk
>     at org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:347)
>     at org.xerial.snappy.SnappyInputStream.rawRead(SnappyInputStream.java:158)
>     at org.xerial.snappy.SnappyInputStream.read(SnappyInputStream.java:142)
>     at com.esotericsoftware.kryo.io.Input.fill(Input.java:140)
>     ... 46 more
>
> Driver stacktrace:
>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1230)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1219)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1218)
>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1218)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:719)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:719)
>     at scala.Option.foreach(Option.scala:236)
>     at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:719)
>     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1419)
>     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1380)
>     at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
