Re: ArrayIndexOutOfBoundsException when using repartitionAndSortWithinPartitions()
Yup, thanks Ted. My getPartition() method had a bug where a signed int was
taken modulo the number of partitions, which in Java can produce a negative
result. Fixed that.

Thanks,
Ashish

On Thu, Sep 10, 2015 at 10:44 AM, Ted Yu wrote:

> Here is the snippet of ExternalSorter.scala where the
> ArrayIndexOutOfBoundsException was thrown:
>
>     while (iterator.hasNext) {
>       val partitionId = iterator.nextPartition()
>       iterator.writeNext(partitionWriters(partitionId))
>     }
>
> Meaning, partitionId was negative.
>
> Execute the following and examine the value of i:
>
>     int i = -78 % 40;
>
> You will see how your getPartition() method should be refined to prevent
> this exception.
>
> On Thu, Sep 10, 2015 at 8:52 AM, Ashish Shenoy wrote:
>
>> I am using spark-1.4.1
>>
>> Here's the skeleton code:
>>
>> [...]
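Ted's hint above can be spelled out. The sketch below is my own illustration (the method names are hypothetical, not from the thread): Java's % operator keeps the sign of the dividend, so a key-based hash taken with % alone can return a negative partition id, while Math.floorMod always lands in [0, numPartitions).

```java
// A sketch (not from the thread): Java's % keeps the dividend's sign, so a
// partitioner computed with % alone can return a negative partition id.
public class PartitionIdDemo {
    // Naive version, mirroring the bug: can return a negative value.
    static int naivePartition(long key, int numPartitions) {
        return (int) key % numPartitions;
    }

    // Fixed version: Math.floorMod returns a value in [0, numPartitions).
    static int safePartition(long key, int numPartitions) {
        return Math.floorMod((int) key, numPartitions);
    }

    public static void main(String[] args) {
        System.out.println(naivePartition(-78, 40)); // prints -38
        System.out.println(safePartition(-78, 40));  // prints 2
    }
}
```

An alternative with the same effect is `((int) key % numPartitions + numPartitions) % numPartitions`, which also works on Java versions predating Math.floorMod.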
Re: ArrayIndexOutOfBoundsException when using repartitionAndSortWithinPartitions()
I am using spark-1.4.1

Here's the skeleton code:

JavaPairRDD rddPair =
    rdd.repartitionAndSortWithinPartitions(
        new CustomPartitioner(), new ExportObjectComparator())
    .persist(StorageLevel.MEMORY_AND_DISK_SER());

...

@SuppressWarnings("serial")
private static class CustomPartitioner extends Partitioner {
  int numPartitions;

  @Override
  public int numPartitions() {
    numPartitions = 40;
    return numPartitions;
  }

  @Override
  public int getPartition(Object o) {
    NewKey newKey = (NewKey) o;
    return (int) newKey.getGsMinusURL() % numPartitions;
  }
}

...

@SuppressWarnings("serial")
private static class ExportObjectComparator
    implements Serializable, Comparator {
  @Override
  public int compare(NewKey o1, NewKey o2) {
    if (o1.hits == o2.hits) {
      return 0;
    } else if (o1.hits > o2.hits) {
      return -1;
    } else {
      return 1;
    }
  }
}

...

Thanks,
Ashish

On Wed, Sep 9, 2015 at 5:13 PM, Ted Yu wrote:

> Which release of Spark are you using ?
>
> Can you show skeleton of your partitioner and comparator ?
>
> Thanks
>
> On Sep 9, 2015, at 4:45 PM, Ashish Shenoy wrote:
>
> Hi,
>
> I am trying to sort an RDD pair using repartitionAndSortWithinPartitions()
> for my key [which is a custom class, not a java primitive] using a custom
> partitioner on that key and a custom comparator.
>
> [...]
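As a side note on the comparator in the skeleton above: the three-way branch is correct, but the same descending-by-hits ordering can be written with Long.compare. The sketch below uses a hypothetical minimal NewKey holding only a hits field; the real class in the thread has more to it.

```java
import java.io.Serializable;
import java.util.Comparator;

// Hypothetical minimal stand-in for the thread's NewKey, keeping only 'hits'.
class NewKey {
    final long hits;
    NewKey(long hits) { this.hits = hits; }
}

// Same ordering as ExportObjectComparator: higher hit counts sort first.
class DescendingHitsComparator implements Serializable, Comparator<NewKey> {
    @Override
    public int compare(NewKey o1, NewKey o2) {
        // Arguments swapped relative to natural order to sort descending.
        return Long.compare(o2.hits, o1.hits);
    }
}
```

Using Long.compare also sidesteps the classic overflow bug of returning `(int) (o1.hits - o2.hits)`.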
ArrayIndexOutOfBoundsException when using repartitionAndSortWithinPartitions()
Hi,

I am trying to sort an RDD pair using repartitionAndSortWithinPartitions()
for my key [which is a custom class, not a java primitive] using a custom
partitioner on that key and a custom comparator. However, it fails
consistently:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 18
in stage 1.0 failed 4 times, most recent failure: Lost task 18.3 in stage
1.0 (TID 202, 172.16.18.25): java.lang.ArrayIndexOutOfBoundsException: -78
    at org.apache.spark.util.collection.ExternalSorter.spillToPartitionFiles(ExternalSorter.scala:375)
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:208)
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:62)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:70)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1273)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1264)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1263)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1263)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1457)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1418)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

I also persist the RDD using the "memory and disk" storage level. The stack
trace above comes from Spark's code and not my application code. Can you
please point out what I am doing wrong?

Thanks,
Ashish
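For anyone hitting the same error: the crash site can be mimicked outside Spark. The sketch below is my own, with a plain array standing in for ExternalSorter's per-partition writers; it shows how a partition id computed with Java's % goes negative and blows up the array lookup.

```java
// Sketch: reproduce the failure mode with a plain array in place of
// ExternalSorter's partitionWriters. Not Spark code, just the same arithmetic.
public class NegativePartitionDemo {
    public static void main(String[] args) {
        Object[] partitionWriters = new Object[40]; // one slot per partition
        int partitionId = -78 % 40;                 // Java remainder: -38
        boolean threw = false;
        try {
            Object writer = partitionWriters[partitionId]; // negative index
        } catch (ArrayIndexOutOfBoundsException e) {
            threw = true;
        }
        System.out.println("partitionId = " + partitionId + ", threw = " + threw);
        // prints: partitionId = -38, threw = true
    }
}
```

This is why Partitioner.getPartition must return a value in [0, numPartitions): Spark indexes per-partition buffers and writers directly with the returned id.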