Re: ArrayIndexOutOfBoundsException when using repartitionAndSortWithinPartitions()
Created https://github.com/apache/spark/pull/8703 to make exception message more helpful.
Re: ArrayIndexOutOfBoundsException when using repartitionAndSortWithinPartitions()
Yup, thanks Ted. My getPartition() method had a bug: a signed int was being taken modulo the number of partitions, which can yield a negative result. Fixed that.

Thanks,
Ashish
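The fix described above can be sketched as follows. This is a hypothetical reconstruction, not code from the thread: NewKey and getGsMinusURL() are assumed from the original post, and Math.floorMod stands in for whatever normalization was actually used.

```java
// Hypothetical sketch of a corrected getPartition(): normalize the modulo
// result so it can never be negative. Not the poster's actual code.
public class FixedPartitionerSketch {
    static final int NUM_PARTITIONS = 40;

    // Math.floorMod always returns a value in [0, NUM_PARTITIONS) for a
    // positive divisor, even when the key value is negative.
    static int getPartition(long gsMinusUrl) {
        return Math.floorMod((int) gsMinusUrl, NUM_PARTITIONS);
    }

    public static void main(String[] args) {
        System.out.println(getPartition(-78L)); // 2, not -38
        System.out.println(getPartition(78L));  // 38
    }
}
```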
Re: ArrayIndexOutOfBoundsException when using repartitionAndSortWithinPartitions()
Here is the snippet of ExternalSorter.scala where the ArrayIndexOutOfBoundsException was thrown:

    while (iterator.hasNext) {
      val partitionId = iterator.nextPartition()
      iterator.writeNext(partitionWriters(partitionId))
    }

Meaning, partitionId was negative.

Execute the following and examine the value of i:

    int i = -78 % 40;

You will see how your getPartition() method should be refined to prevent this exception.
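The arithmetic Ted points to can be checked standalone (my own illustration, not from the thread): in Java, % truncates toward zero and keeps the sign of the dividend, so a negative key gives a negative remainder, while Math.floorMod gives the non-negative result a partitioner needs.

```java
public class ModuloSignDemo {
    public static void main(String[] args) {
        int i = -78 % 40;               // Java's % keeps the dividend's sign
        int j = Math.floorMod(-78, 40); // floorMod follows the divisor's sign

        System.out.println(i); // prints -38: not a valid partition index
        System.out.println(j); // prints 2: always in [0, 40)
    }
}
```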
Re: ArrayIndexOutOfBoundsException when using repartitionAndSortWithinPartitions()
I am using spark-1.4.1

Here's the skeleton code:

    JavaPairRDD rddPair =
        rdd.repartitionAndSortWithinPartitions(
                new CustomPartitioner(), new ExportObjectComparator())
            .persist(StorageLevel.MEMORY_AND_DISK_SER());

    ...

    @SuppressWarnings("serial")
    private static class CustomPartitioner extends Partitioner {
      int numPartitions;

      @Override
      public int numPartitions() {
        numPartitions = 40;
        return numPartitions;
      }

      @Override
      public int getPartition(Object o) {
        NewKey newKey = (NewKey) o;
        return (int) newKey.getGsMinusURL() % numPartitions;
      }
    }

    ...

    @SuppressWarnings("serial")
    private static class ExportObjectComparator
        implements Serializable, Comparator<NewKey> {
      @Override
      public int compare(NewKey o1, NewKey o2) {
        if (o1.hits == o2.hits) {
          return 0;
        } else if (o1.hits > o2.hits) {
          return -1;
        } else {
          return 1;
        }
      }
    }

    ...

Thanks,
Ashish
Re: ArrayIndexOutOfBoundsException when using repartitionAndSortWithinPartitions()
Which release of Spark are you using?

Can you show a skeleton of your partitioner and comparator?

Thanks
ArrayIndexOutOfBoundsException when using repartitionAndSortWithinPartitions()
Hi,

I am trying to sort an RDD pair using repartitionAndSortWithinPartitions() for my key [which is a custom class, not a Java primitive] using a custom partitioner on that key and a custom comparator. However, it fails consistently:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 18 in stage 1.0 failed 4 times, most recent failure: Lost task 18.3 in stage 1.0 (TID 202, 172.16.18.25): java.lang.ArrayIndexOutOfBoundsException: -78
        at org.apache.spark.util.collection.ExternalSorter.spillToPartitionFiles(ExternalSorter.scala:375)
        at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:208)
        at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:62)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:70)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1273)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1264)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1263)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1263)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1457)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1418)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

I also persist the RDD using the "memory and disk" storage level. The stack trace above comes from Spark's code and not my application code. Can you please point out what I am doing wrong?

Thanks,
Ashish
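As the replies in this thread show, the root cause was a partitioner returning a negative id. A minimal standalone illustration (my own, with a made-up id, not Spark code): the shuffle writer indexes an array of per-partition writers with whatever getPartition() returns, so a negative id fails exactly like the stack trace above.

```java
// My own minimal illustration: indexing a per-partition array with a
// negative partition id throws ArrayIndexOutOfBoundsException, just as
// Spark's ExternalSorter does when getPartition() can go negative.
public class NegativePartitionIdDemo {
    public static void main(String[] args) {
        Object[] partitionWriters = new Object[40];
        int partitionId = -78; // e.g. from a getPartition() that can go negative
        try {
            Object writer = partitionWriters[partitionId];
            System.out.println("ok");
        } catch (ArrayIndexOutOfBoundsException e) {
            System.out.println("ArrayIndexOutOfBoundsException for id " + partitionId);
        }
    }
}
```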