[ https://issues.apache.org/jira/browse/SPARK-17204?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15791884#comment-15791884 ]
Michael Allman commented on SPARK-17204: ---------------------------------------- I'm 99% sure I've fixed this. I'll submit a PR in the coming days. > Spark 2.0 off heap RDD persistence with replication factor 2 leads to > in-memory data corruption > ----------------------------------------------------------------------------------------------- > > Key: SPARK-17204 > URL: https://issues.apache.org/jira/browse/SPARK-17204 > Project: Spark > Issue Type: Bug > Components: Spark Core > Affects Versions: 2.0.0 > Reporter: Michael Allman > > We use the {{OFF_HEAP}} storage level extensively with great success. We've > tried off-heap storage with replication factor 2 and have always received > exceptions on the executor side very shortly after starting the job. For > example: > {code} > com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: > 9086 > at > com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:137) > at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:670) > at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:781) > at > org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:229) > at > org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:169) > at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73) > at > org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32) > at > org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) > at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:461) > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificColumnarIterator.hasNext(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithoutKey$(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown > Source) > at > org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) > at > org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370) > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) > at > org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47) > at org.apache.spark.scheduler.Task.run(Task.scala:85) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > {code} > or > {code} > java.lang.IndexOutOfBoundsException: Index: 6, Size: 0 > at java.util.ArrayList.rangeCheck(ArrayList.java:653) > at java.util.ArrayList.get(ArrayList.java:429) > at > com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:60) > at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:834) > at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:788) > at > org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:229) > at > org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:169) > at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73) > at > org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32) > at > org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) > at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:461) > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificColumnarIterator.hasNext(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithoutKey$(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown > Source) > at > org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) > at > org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370) > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) > at > org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47) > at org.apache.spark.scheduler.Task.run(Task.scala:85) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > {code} > or > {code} > java.lang.NullPointerException > at > org.apache.spark.sql.execution.columnar.InMemoryTableScanExec$$anonfun$doExecute$1$$anonfun$6.apply(InMemoryTableScanExec.scala:141) > at > org.apache.spark.sql.execution.columnar.InMemoryTableScanExec$$anonfun$doExecute$1$$anonfun$6.apply(InMemoryTableScanExec.scala:140) > at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:463) > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificColumnarIterator.hasNext(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificColumnarIterator.hasNext(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithoutKey$(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown > Source) > at > org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) > at > org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370) > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) > at > org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47) > at org.apache.spark.scheduler.Task.run(Task.scala:85) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > {code} > We've tried switching to Java serialization and get a different exception: > {code} > java.io.StreamCorruptedException: invalid stream header: 780000D0 > at > java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:808) > at java.io.ObjectInputStream.<init>(ObjectInputStream.java:301) > at > org.apache.spark.serializer.JavaDeserializationStream$$anon$1.<init>(JavaSerializer.scala:63) > at > org.apache.spark.serializer.JavaDeserializationStream.<init>(JavaSerializer.scala:63) > at > org.apache.spark.serializer.JavaSerializerInstance.deserializeStream(JavaSerializer.scala:122) > at > org.apache.spark.serializer.SerializerManager.dataDeserializeStream(SerializerManager.scala:146) > at > org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:433) > at > org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:672) > at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:281) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47) > at org.apache.spark.scheduler.Task.run(Task.scala:85) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > {code} > This suggest some kind of memory corruption to me. > I've been able to consistently reproduce this problem in local-cluster mode > with a very simple code snippet. First, start a spark shell like this: > {noformat} > MASTER=local-cluster[2,1,1024] ./spark-2.1/bin/spark-shell --conf > spark.memory.offHeap.enabled=true --conf spark.memory.offHeap.size=1024 > {noformat} > Then run the following: > {code} > import org.apache.spark.storage.StorageLevel > val OFF_HEAP_2 = StorageLevel(useDisk = true, useMemory = true, useOffHeap = > true, deserialized = false, replication = 2) > sc.range(0, 100).persist(OFF_HEAP_2).count > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org