Re: ArrayIndexOutOfBoundsException when using repartitionAndSortWithinPartitions()

2015-09-10 Thread Ashish Shenoy
Yup thanks Ted. My getPartition() method had a bug where a signed int was
being taken modulo the number of partitions, so the result could be negative.
Fixed that; a sketch of the corrected method is below.
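
For anyone who hits the same exception, here is a minimal sketch of the kind of
fix I mean. It reuses NewKey and getGsMinusURL() from the skeleton I posted
further down; the ((h % n) + n) % n idiom keeps the partition id in
[0, numPartitions) even when the hash is negative (Math.floorMod does the same
on Java 8+).

  @Override
  public int getPartition(Object o) {
    NewKey newKey = (NewKey) o;
    int h = (int) newKey.getGsMinusURL();
    // A plain % returns a negative result for a negative operand, and
    // ExternalSorter then uses that value as an array index, which is
    // what raised the ArrayIndexOutOfBoundsException. Normalize the
    // result into the range [0, numPartitions).
    return ((h % numPartitions()) + numPartitions()) % numPartitions();
  }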

Thanks,
Ashish

On Thu, Sep 10, 2015 at 10:44 AM, Ted Yu  wrote:

> Here is a snippet of ExternalSorter.scala where the
> ArrayIndexOutOfBoundsException was thrown:
>
> while (iterator.hasNext) {
>   val partitionId = iterator.nextPartition()
>   iterator.writeNext(partitionWriters(partitionId))
> }
> Meaning, partitionId was negative.
> Execute the following and examine the value of i:
>
> int i = -78 % 40;
>
> You will see how your getPartition() method should be refined to prevent
> this exception.
>
> On Thu, Sep 10, 2015 at 8:52 AM, Ashish Shenoy 
> wrote:
>
>> I am using spark-1.4.1
>>
>> Here's the skeleton code:
>>
>> JavaPairRDD rddPair =
>>     rdd.repartitionAndSortWithinPartitions(
>>         new CustomPartitioner(), new ExportObjectComparator())
>>       .persist(StorageLevel.MEMORY_AND_DISK_SER());
>>
>> ...
>>
>> @SuppressWarnings("serial")
>> private static class CustomPartitioner extends Partitioner {
>>   int numPartitions;
>>   @Override
>>   public int numPartitions() {
>>     numPartitions = 40;
>>     return numPartitions;
>>   }
>>
>>   @Override
>>   public int getPartition(Object o) {
>>     NewKey newKey = (NewKey) o;
>>     return (int) newKey.getGsMinusURL() % numPartitions;
>>   }
>> }
>>
>> ...
>>
>> @SuppressWarnings("serial")
>> private static class ExportObjectComparator
>>     implements Serializable, Comparator<NewKey> {
>>   @Override
>>   public int compare(NewKey o1, NewKey o2) {
>>     if (o1.hits == o2.hits) {
>>       return 0;
>>     } else if (o1.hits > o2.hits) {
>>       return -1;
>>     } else {
>>       return 1;
>>     }
>>   }
>>
>> }
>>
>> ...
>>
>>
>>
>> Thanks,
>> Ashish
>>
>> On Wed, Sep 9, 2015 at 5:13 PM, Ted Yu  wrote:
>>
>>> Which release of Spark are you using?
>>>
>>> Can you show a skeleton of your partitioner and comparator?
>>>
>>> Thanks
>>>
>>>
>>>
>>> On Sep 9, 2015, at 4:45 PM, Ashish Shenoy 
>>> wrote:
>>>
>>> Hi,
>>>
>>> I am trying to sort an RDD pair
>>> using repartitionAndSortWithinPartitions() for my key [which is a custom
>>> class, not a Java primitive] using a custom partitioner on that key and a
>>> custom comparator. However, it fails consistently:
>>>
>>> org.apache.spark.SparkException: Job aborted due to stage failure: Task
>>> 18 in stage 1.0 failed 4 times, most recent failure: Lost task 18.3 in
>>> stage 1.0 (TID 202, 172.16.18.25):
>>> java.lang.ArrayIndexOutOfBoundsException: -78
>>> at
>>> org.apache.spark.util.collection.ExternalSorter.spillToPartitionFiles(ExternalSorter.scala:375)
>>> at
>>> org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:208)
>>> at
>>> org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:62)
>>> at
>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
>>> at
>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>> at org.apache.spark.scheduler.Task.run(Task.scala:70)
>>> at
>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>> at java.lang.Thread.run(Thread.java:745)
>>>
>>> Driver stacktrace:
>>> at org.apache.spark.scheduler.DAGScheduler.org
>>> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1273)
>>> at
>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1264)
>>> at
>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1263)
>>> at
>>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>> at
>>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>> at
>>> org.apache.spark.scheduler.DAGScheduler.abortSta

Re: ArrayIndexOutOfBoundsException when using repartitionAndSortWithinPartitions()

2015-09-10 Thread Ashish Shenoy
I am using spark-1.4.1

Here's the skeleton code:

JavaPairRDD rddPair =
    rdd.repartitionAndSortWithinPartitions(
        new CustomPartitioner(), new ExportObjectComparator())
      .persist(StorageLevel.MEMORY_AND_DISK_SER());

...

@SuppressWarnings("serial")
private static class CustomPartitioner extends Partitioner {
  int numPartitions;
  @Override
  public int numPartitions() {
    numPartitions = 40;
    return numPartitions;
  }

  @Override
  public int getPartition(Object o) {
    NewKey newKey = (NewKey) o;
    return (int) newKey.getGsMinusURL() % numPartitions;
  }
}

...

@SuppressWarnings("serial")
private static class ExportObjectComparator
    implements Serializable, Comparator<NewKey> {
  @Override
  public int compare(NewKey o1, NewKey o2) {
    if (o1.hits == o2.hits) {
      return 0;
    } else if (o1.hits > o2.hits) {
      return -1;
    } else {
      return 1;
    }
  }

}

...



Thanks,
Ashish

On Wed, Sep 9, 2015 at 5:13 PM, Ted Yu  wrote:

> Which release of Spark are you using?
>
> Can you show a skeleton of your partitioner and comparator?
>
> Thanks
>
>
>
> On Sep 9, 2015, at 4:45 PM, Ashish Shenoy 
> wrote:
>
> Hi,
>
> I am trying to sort an RDD pair using repartitionAndSortWithinPartitions()
> for my key [which is a custom class, not a Java primitive] using a custom
> partitioner on that key and a custom comparator. However, it fails
> consistently:
>
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 18
> in stage 1.0 failed 4 times, most recent failure: Lost task 18.3 in stage
> 1.0 (TID 202, 172.16.18.25): java.lang.ArrayIndexOutOfBoundsException: -78
> at
> org.apache.spark.util.collection.ExternalSorter.spillToPartitionFiles(ExternalSorter.scala:375)
> at
> org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:208)
> at
> org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:62)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> at org.apache.spark.scheduler.Task.run(Task.scala:70)
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
> Driver stacktrace:
> at org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1273)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1264)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1263)
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at
> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1263)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
> at scala.Option.foreach(Option.scala:236)
> at
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1457)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1418)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>
> I also persist the RDD using the "memory and disk" storage level. The
> stack trace above comes from Spark's code and not my application code. Can
> you please point out what I am doing wrong?
>
> Thanks,
> Ashish
>
>


ArrayIndexOutOfBoundsException when using repartitionAndSortWithinPartitions()

2015-09-09 Thread Ashish Shenoy
Hi,

I am trying to sort an RDD pair using repartitionAndSortWithinPartitions()
for my key [which is a custom class, not a Java primitive] using a custom
partitioner on that key and a custom comparator. However, it fails
consistently:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 18
in stage 1.0 failed 4 times, most recent failure: Lost task 18.3 in stage
1.0 (TID 202, 172.16.18.25): java.lang.ArrayIndexOutOfBoundsException: -78
at
org.apache.spark.util.collection.ExternalSorter.spillToPartitionFiles(ExternalSorter.scala:375)
at
org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:208)
at
org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:62)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1273)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1264)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1263)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1263)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
at scala.Option.foreach(Option.scala:236)
at
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1457)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1418)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

I also persist the RDD using the "memory and disk" storage level. The stack
trace above comes from Spark's code and not my application code. Can you
please point out what I am doing wrong?

Thanks,
Ashish