the meaning of "samplePointsPerPartitionHint" in RangePartitioner

2018-03-20 Thread 1427357...@qq.com
Hi all,

Below is the constructor signature of RangePartitioner:
class RangePartitioner[K : Ordering : ClassTag, V](
    partitions: Int,
    rdd: RDD[_ <: Product2[K, V]],
    private var ascending: Boolean = true,
    val samplePointsPerPartitionHint: Int = 20)
I am puzzled by samplePointsPerPartitionHint.
My questions are:
What is samplePointsPerPartitionHint used for?
If I set samplePointsPerPartitionHint to 100 instead of the default 20, what will happen?
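
For reference, here is a minimal sketch of how I would pass the hint (the pair
RDD below is only an illustration, and sc is the shell's SparkContext). My
understanding so far is that the hint only affects how many records are sampled
to estimate the range boundaries; please correct me if that is wrong.

import org.apache.spark.RangePartitioner
import org.apache.spark.rdd.RDD

// An illustrative integer-keyed pair RDD; any key type with an Ordering works.
val pairs: RDD[(Int, String)] =
  sc.parallelize(1 to 100000).map(i => (i, "value-" + i))

// Same target partition count, different sampling hints
// (arguments: partitions, rdd, ascending, samplePointsPerPartitionHint).
val defaultHint = new RangePartitioner(8, pairs, true, 20)
val largerHint  = new RangePartitioner(8, pairs, true, 100)

// Shuffle with whichever partitioner.
val partitioned = pairs.partitionBy(largerHint)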

Thanks.

Robin Shao




1427357...@qq.com


Re: how to investigate skew and DataFrames and RangePartitioner

2016-06-14 Thread Takeshi Yamamuro
Hi,

I'm afraid there is currently no API to define a RangePartitioner on a DataFrame.
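
If you really need range partitioning on a DataFrame, one rough workaround (an
untested sketch; df, sqlContext, the key column "id", and the partition count
are placeholders) is to go through the RDD API and rebuild the DataFrame:

import org.apache.spark.RangePartitioner

// Key the underlying row RDD by the column to range-partition on.
val keyed = df.rdd.keyBy(row => row.getAs[Long]("id"))

// Range-partition the keyed rows, then drop the keys again.
val numPartitions = 200  // placeholder
val ranged = keyed
  .partitionBy(new RangePartitioner(numPartitions, keyed))
  .values

// Rebuild a DataFrame with the original schema.
val rangedDf = sqlContext.createDataFrame(ranged, df.schema)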

// maropu

On Tue, Jun 14, 2016 at 5:04 AM, Peter Halliday <pjh...@cornell.edu> wrote:

> I have two questions
>
> First, I have a failure when I write parquet from Spark 1.6.1 on Amazon EMR
> to S3.  This is a full batch, which is over 200GB of source data.  The
> partitioning is based on a geographic identifier we use, and also the date we
> got the data.  However, because of geographic density we could well be
> producing tiles that are too dense.  I’m trying to figure out the size of the
> file it’s trying to write out.
>
> Second, we used to use RDDs and RangePartitioner for task partitioning.
> However, I don’t see this available in DataFrames.  How does one achieve
> this now?
>
> Peter Halliday


-- 
---
Takeshi Yamamuro


how to investigate skew and DataFrames and RangePartitioner

2016-06-13 Thread Peter Halliday
I have two questions

First, I have a failure when I write parquet from Spark 1.6.1 on Amazon EMR to
S3.  This is a full batch, which is over 200GB of source data.  The
partitioning is based on a geographic identifier we use, and also the date we
got the data.  However, because of geographic density we could well be
producing tiles that are too dense.  I’m trying to figure out the size of the
file it’s trying to write out.

Second, we used to use RDDs and RangePartitioner for task partitioning.
However, I don’t see this available in DataFrames.  How does one achieve this
now?

Peter Halliday



RangePartitioner in Spark 1.2.1

2015-02-17 Thread java8964
Hi, Sparkers:
I just happened to search Google for something related to Spark's
RangePartitioner and found an old thread on this mailing list:
http://apache-spark-user-list.1001560.n3.nabble.com/RDD-and-Partition-td991.html
I followed the code example mentioned in that thread, as follows:

scala> import org.apache.spark.RangePartitioner
import org.apache.spark.RangePartitioner

scala> val rdd = sc.parallelize(List("apple", "Ball", "cat", "dog", "Elephant",
  "fox", "gas", "horse", "index", "jet", "kitsch", "long", "moon", "Neptune",
  "ooze", "Pen", "quiet", "rose", "sun", "talk", "umbrella", "voice", "Walrus",
  "xeon", "Yam", "zebra"))
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:13

scala> rdd.keyBy(s => s(0).toUpper)
res0: org.apache.spark.rdd.RDD[(Char, String)] = MappedRDD[1] at keyBy at <console>:16

scala> res0.partitionBy(new RangePartitioner[Char, String](26, res0)).values
res1: org.apache.spark.rdd.RDD[String] = MappedRDD[5] at values at <console>:18

scala> res1.mapPartitionsWithIndex((idx, itr) => itr.map(s => (idx, s))).collect.foreach(println)

The example above makes the meaning of RangePartitioner clear to me, but to my
surprise I got the following result:

(0,apple)
(0,Ball)
(1,cat)
(2,dog)
(3,Elephant)
(4,fox)
(5,gas)
(6,horse)
(7,index)
(8,jet)
(9,kitsch)
(10,long)
(11,moon)
(12,Neptune)
(13,ooze)
(14,Pen)
(15,quiet)
(16,rose)
(17,sun)
(18,talk)
(19,umbrella)
(20,voice)
(21,Walrus)
(22,xeon)
(23,Yam)
(24,zebra)

instead of a perfect range of indexes from 0 to 25 as in the old email thread.
Why is that? Is this a bug, or some new behavior I don't understand?
BTW, the environment I tested in is Spark 1.2.1 with the Hadoop 2.4 binary
release.
Thanks
Yong  

Re: RangePartitioner in Spark 1.2.1

2015-02-17 Thread Aaron Davidson
RangePartitioner does not actually provide a guarantee that all partitions
will be equal sized (that is hard), and instead uses sampling to
approximate equal buckets. Thus, it is possible that a bucket is left empty.
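
One quick way to see the empty bucket directly is to count the records in each
partition; a small sketch against res1 from the quoted session below:

// 26 partitions were requested, so expect one index with a count of 0.
res1.mapPartitionsWithIndex((idx, itr) => Iterator((idx, itr.size)))
    .collect()
    .foreach { case (idx, n) => println(s"partition $idx -> $n records") }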

If you want the specified behavior, you should define your own partitioner.
It would look something like this (untested):
import org.apache.spark.Partitioner

class AlphabetPartitioner extends Partitioner {
  def numPartitions = 26
  def getPartition(key: Any): Int = key match {
    // The keyBy example in the quoted post produces Char keys; handle Strings too.
    case c: Char => c.toUpper - 'A'
    case s: String => s(0).toUpper - 'A'
  }
  override def equals(other: Any): Boolean =
    other.isInstanceOf[AlphabetPartitioner]
  override def hashCode: Int = 0
}
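
For completeness, here is how it would be used against res0, the (Char, String)
RDD from the quoted session below (also untested):

// With 26 deterministic buckets, every starting letter gets its own partition.
val byLetter = res0.partitionBy(new AlphabetPartitioner).values
byLetter.mapPartitionsWithIndex((idx, itr) => itr.map(s => (idx, s)))
  .collect()
  .foreach(println)   // expect one word per index, 0 through 25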

On Tue, Feb 17, 2015 at 7:05 PM, java8964 java8...@hotmail.com wrote:

 Hi, Sparkers:

 I just happened to search Google for something related to Spark's
 RangePartitioner and found an old thread on this mailing list:


 http://apache-spark-user-list.1001560.n3.nabble.com/RDD-and-Partition-td991.html

 I followed the code example mentioned in that thread, as follows:

 scala> import org.apache.spark.RangePartitioner
 import org.apache.spark.RangePartitioner

 scala> val rdd = sc.parallelize(List("apple", "Ball", "cat", "dog",
 "Elephant", "fox", "gas", "horse", "index", "jet", "kitsch", "long",
 "moon", "Neptune", "ooze", "Pen", "quiet", "rose", "sun", "talk",
 "umbrella", "voice", "Walrus", "xeon", "Yam", "zebra"))
 rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at
 parallelize at <console>:13

 scala> rdd.keyBy(s => s(0).toUpper)
 res0: org.apache.spark.rdd.RDD[(Char, String)] = MappedRDD[1] at keyBy at
 <console>:16

 scala> res0.partitionBy(new RangePartitioner[Char, String](26,
 res0)).values
 res1: org.apache.spark.rdd.RDD[String] = MappedRDD[5] at values at
 <console>:18

 scala> res1.mapPartitionsWithIndex((idx, itr) => itr.map(s => (idx,
 s))).collect.foreach(println)

 The example above makes the meaning of RangePartitioner clear to me, but to
 my surprise I got the following result:

 *(0,apple)*
 *(0,Ball)*
 (1,cat)
 (2,dog)
 (3,Elephant)
 (4,fox)
 (5,gas)
 (6,horse)
 (7,index)
 (8,jet)
 (9,kitsch)
 (10,long)
 (11,moon)
 (12,Neptune)
 (13,ooze)
 (14,Pen)
 (15,quiet)
 (16,rose)
 (17,sun)
 (18,talk)
 (19,umbrella)
 (20,voice)
 (21,Walrus)
 (22,xeon)
 (23,Yam)
 (24,zebra)

 instead of a perfect range of indexes from 0 to 25 as in the old email
 thread. Why is that? Is this a bug, or some new behavior I don't understand?

 BTW, the environment I tested in is Spark 1.2.1 with the Hadoop 2.4 binary
 release.

 Thanks

 Yong



Re: Spark job stuck at RangePartitioner at Exchange.scala:79

2015-01-21 Thread Sunita Arvind
I was able to resolve this by adding rdd.collect() after every stage. This
forced RDD evaluation and helped avoid the choke point.
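
For anyone hitting the same thing: a lighter-weight way to force evaluation,
without pulling every record back to the driver, is persist() plus count().
A toy sketch of the pattern (not my actual code; sc is the shell's
SparkContext):

import org.apache.spark.storage.StorageLevel

// Stand-in for an intermediate stage of the real job.
val stage1 = sc.parallelize(1 to 1000000)
  .map(i => (i % 26, i))
  .persist(StorageLevel.MEMORY_AND_DISK)

stage1.count()   // triggers evaluation; the data stays on the executors
// ... build the next stage on top of stage1 ...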

regards
Sunita Kopppar

On Sat, Jan 17, 2015 at 12:56 PM, Sunita Arvind sunitarv...@gmail.com
wrote:

 Hi,

 My Spark jobs suddenly started hanging, and here is the debugging that led up
 to it:
 Following the program, it seems to be stuck whenever I do a collect(), a
 count(), or rdd.saveAsParquetFile(). AFAIK, any operation that requires data
 to flow back to the master causes this. I increased the memory to 5 MB. Also,
 per the debug statements, the memory is sufficient. I also increased -Xss and
 -Xmx (settings below).

 15/01/17 11:44:16 INFO storage.MemoryStore: ensureFreeSpace(264808) called
 with curMem=0, maxMem=1019782103
 15/01/17 11:44:16 INFO storage.MemoryStore: Block broadcast_0 stored as
 values in memory (estimated size 258.6 KB, free 972.3 MB)
 15/01/17 11:44:16 INFO spark.SparkContext: Starting job: collect at
 SparkPlan.scala:85
 15/01/17 11:44:16 INFO storage.MemoryStore: ensureFreeSpace(210344) called
 with curMem=264808, maxMem=1019782103
 15/01/17 11:44:16 INFO storage.MemoryStore: Block broadcast_1 stored as
 values in memory (estimated size 205.4 KB, free 972.1 MB)
 15/01/17 11:44:16 INFO storage.MemoryStore: ensureFreeSpace(282200) called
 with curMem=475152, maxMem=1019782103
 15/01/17 11:44:16 INFO storage.MemoryStore: Block broadcast_2 stored as
 values in memory (estimated size 275.6 KB, free 971.8 MB)
 15/01/17 11:44:16 INFO spark.SparkContext: Starting job: RangePartitioner
 at Exchange.scala:79

 A bit of background which may or may not be relevant: the program was working
 fine in Eclipse but was hanging upon submission to the cluster. In an attempt
 to debug, I changed the version in build.sbt to match the one on the cluster.

 sbt config when the program was working:
   "org.apache.spark" %% "spark-core" % "1.1.0" % "provided",
   "org.apache.spark" %% "spark-sql" % "1.1.0" % "provided",
   "spark.jobserver" % "job-server-api" % "0.4.0",
   "com.github.nscala-time" %% "nscala-time" % "1.6.0",
   "org.apache.hadoop" % "hadoop-client" % "2.3.0" % "provided"


 During debugging, I changed this to:
   "org.apache.spark" %% "spark-core" % "1.2.0" % "provided",
   "org.apache.spark" %% "spark-sql" % "1.2.0" % "provided",
   "spark.jobserver" % "job-server-api" % "0.4.0",
   "com.github.nscala-time" %% "nscala-time" % "1.6.0",
   "org.apache.hadoop" % "hadoop-client" % "2.5.0" % "provided"

 This is when the program started hanging at the first rdd.count(). Now, even
 after reverting the changes in build.sbt, my program hangs at the same point.

 In addition to setting -Xmx and -Xss in eclipse.ini to 5 MB each, I tried
 these config changes, setting the vars below programmatically:

 sparkConf.set("spark.akka.frameSize", "10")
 sparkConf.set("spark.shuffle.spill", "true")
 sparkConf.set("spark.driver.memory", "512m")
 sparkConf.set("spark.executor.memory", "1g")
 sparkConf.set("spark.driver.maxResultSize", "1g")

 Please note: in Eclipse as well as in sbt, the program kept throwing
 StackOverflowError; increasing -Xss to 5 MB eliminated that problem.
 Could this be something unrelated to memory? The SchemaRDDs have close to
 400 columns, hence I am building a StructType of StructFields and calling
 applySchema.

 My code cannot be shared right now. If required, I will edit it and post.
 regards
 Sunita





Re: RangePartitioner

2015-01-21 Thread Sandy Ryza
Hi Rishi,

If you look in the Spark UI, do you have any executors registered?

Are you able to collect a jstack of the driver process?

-Sandy

On Tue, Jan 20, 2015 at 9:07 PM, Rishi Yadav ri...@infoobjects.com wrote:

  I am joining two tables as below; the program stalls at the log line below
 and never proceeds.
 What might be the issue, and what is a possible solution?

  INFO SparkContext: Starting job: RangePartitioner at Exchange.scala:79

 Table 1 has 450 columns
 Table 2 has 100 columns

 Both tables have a few million rows


 val table1 = myTable1.as('table1)
 val table2 = myTable2.as('table2)
 val results =
   table1.join(table2, LeftOuter, Some("table1.Id".attr === "table2.id".attr))

 println(results.count())

 Thanks and Regards,
 Rishi
 @meditativesoul



RangePartitioner

2015-01-20 Thread Rishi Yadav
I am joining two tables as below; the program stalls at the log line below and
never proceeds.
What might be the issue, and what is a possible solution?


 INFO SparkContext: Starting job: RangePartitioner at Exchange.scala:79


Table 1 has 450 columns
Table 2 has 100 columns

Both tables have a few million rows




val table1 = myTable1.as('table1)
val table2 = myTable2.as('table2)
val results =
  table1.join(table2, LeftOuter, Some("table1.Id".attr === "table2.id".attr))

println(results.count())

Thanks and Regards,
Rishi
@meditativesoul

Spark job stuck at RangePartitioner at Exchange.scala:79

2015-01-17 Thread Sunita Arvind
Hi,

My Spark jobs suddenly started hanging, and here is the debugging that led up
to it:
Following the program, it seems to be stuck whenever I do a collect(), a
count(), or rdd.saveAsParquetFile(). AFAIK, any operation that requires data
to flow back to the master causes this. I increased the memory to 5 MB. Also,
per the debug statements, the memory is sufficient. I also increased -Xss and
-Xmx (settings below).

15/01/17 11:44:16 INFO storage.MemoryStore: ensureFreeSpace(264808) called
with curMem=0, maxMem=1019782103
15/01/17 11:44:16 INFO storage.MemoryStore: Block broadcast_0 stored as
values in memory (estimated size 258.6 KB, free 972.3 MB)
15/01/17 11:44:16 INFO spark.SparkContext: Starting job: collect at
SparkPlan.scala:85
15/01/17 11:44:16 INFO storage.MemoryStore: ensureFreeSpace(210344) called
with curMem=264808, maxMem=1019782103
15/01/17 11:44:16 INFO storage.MemoryStore: Block broadcast_1 stored as
values in memory (estimated size 205.4 KB, free 972.1 MB)
15/01/17 11:44:16 INFO storage.MemoryStore: ensureFreeSpace(282200) called
with curMem=475152, maxMem=1019782103
15/01/17 11:44:16 INFO storage.MemoryStore: Block broadcast_2 stored as
values in memory (estimated size 275.6 KB, free 971.8 MB)
15/01/17 11:44:16 INFO spark.SparkContext: Starting job: RangePartitioner
at Exchange.scala:79

A bit of background which may or may not be relevant: the program was working
fine in Eclipse but was hanging upon submission to the cluster. In an attempt
to debug, I changed the version in build.sbt to match the one on the cluster.

sbt config when the program was working:
  "org.apache.spark" %% "spark-core" % "1.1.0" % "provided",
  "org.apache.spark" %% "spark-sql" % "1.1.0" % "provided",
  "spark.jobserver" % "job-server-api" % "0.4.0",
  "com.github.nscala-time" %% "nscala-time" % "1.6.0",
  "org.apache.hadoop" % "hadoop-client" % "2.3.0" % "provided"


During debugging, I changed this to:
  "org.apache.spark" %% "spark-core" % "1.2.0" % "provided",
  "org.apache.spark" %% "spark-sql" % "1.2.0" % "provided",
  "spark.jobserver" % "job-server-api" % "0.4.0",
  "com.github.nscala-time" %% "nscala-time" % "1.6.0",
  "org.apache.hadoop" % "hadoop-client" % "2.5.0" % "provided"

This is when the program started hanging at the first rdd.count(). Now, even
after reverting the changes in build.sbt, my program hangs at the same point.

In addition to setting -Xmx and -Xss in eclipse.ini to 5 MB each, I tried
these config changes, setting the vars below programmatically:

sparkConf.set("spark.akka.frameSize", "10")
sparkConf.set("spark.shuffle.spill", "true")
sparkConf.set("spark.driver.memory", "512m")
sparkConf.set("spark.executor.memory", "1g")
sparkConf.set("spark.driver.maxResultSize", "1g")

Please note: in Eclipse as well as in sbt, the program kept throwing
StackOverflowError; increasing -Xss to 5 MB eliminated that problem.
Could this be something unrelated to memory? The SchemaRDDs have close to
400 columns, hence I am building a StructType of StructFields and calling
applySchema.
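
For reference, a minimal sketch of the pattern I mean, in the Spark 1.2
applySchema style (toy field names and a placeholder input path; the real
schema has close to 400 fields):

import org.apache.spark.sql._

// Build the schema programmatically from a list of field names.
val fieldNames = Array("id", "name", "value")
val schema = StructType(
  fieldNames.map(name => StructField(name, StringType, nullable = true)))

// Parse raw lines into Rows that match the schema, then apply it.
val rowRDD = sc.textFile("hdfs:///path/to/input")   // placeholder path
  .map(_.split(","))
  .map(p => Row(p(0), p(1), p(2)))
val schemaRDD = sqlContext.applySchema(rowRDD, schema)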

My code cannot be shared right now. If required, I will edit it and post.
regards
Sunita