The meaning of "samplePointsPerPartitionHint" in RangePartitioner
Hi all,

Below is the constructor of RangePartitioner:

    class RangePartitioner[K : Ordering : ClassTag, V](
        partitions: Int,
        rdd: RDD[_ <: Product2[K, V]],
        private var ascending: Boolean = true,
        val samplePointsPerPartitionHint: Int = 20)

I am puzzled about samplePointsPerPartitionHint. My question is: what is samplePointsPerPartitionHint used for? If I set samplePointsPerPartitionHint to 100 instead of 20, what will happen?

Thanks.
Robin Shao
1427357...@qq.com
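For context: as far as I understand it, the hint only affects how the range boundaries are estimated, not how many partitions come out. A larger hint samples more keys per output partition, which usually puts the estimated boundaries closer to the true quantiles at the cost of a slightly more expensive sampling pass. A minimal sketch, assuming an existing SparkContext named sc and made-up data:

    import org.apache.spark.RangePartitioner

    // Illustrative data only; assumes an existing SparkContext `sc`.
    val pairs = sc.parallelize(1 to 100000).map(i => (i, s"value-$i"))

    // Same number of output partitions in both cases; only the number of
    // sampled keys used to estimate the range boundaries differs.
    val coarse = new RangePartitioner(8, pairs, ascending = true, samplePointsPerPartitionHint = 20)
    val fine   = new RangePartitioner(8, pairs, ascending = true, samplePointsPerPartitionHint = 100)

    println(coarse.numPartitions)  // 8
    println(fine.numPartitions)    // 8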
Re: how to investigate skew and DataFrames and RangePartitioner
Hi,

I'm afraid there is currently no API to define a RangePartitioner on a DataFrame.

// maropu

On Tue, Jun 14, 2016 at 5:04 AM, Peter Halliday <pjh...@cornell.edu> wrote:
> I have two questions.
>
> First, I have a failure when I write Parquet from Spark 1.6.1 on Amazon EMR to S3. This is a full batch, which is over 200GB of source data. The partitioning is based on a geographic identifier we use, and also the date we got the data. However, because of geographic density we could well be producing tiles that are too dense. I'm trying to figure out the size of the file it is trying to write out.
>
> Second, we used to use RDDs and RangePartitioner for task partitioning. However, I don't see this available in DataFrames. How does one achieve this now?
>
> Peter Halliday

--
---
Takeshi Yamamuro
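One workaround until such an API exists is to drop down to the RDD underneath the DataFrame and range-partition that. A rough sketch, assuming a DataFrame df with a numeric column named "geoId" (both names are made up for illustration):

    import org.apache.spark.RangePartitioner
    import org.apache.spark.sql.Row

    // Key the underlying RDD[Row] by the column we want to range-partition on.
    val keyed = df.rdd.keyBy((row: Row) => row.getAs[Long]("geoId"))

    // 200 is an arbitrary partition count for this sketch.
    val ranged = keyed.partitionBy(new RangePartitioner(200, keyed))

    // `ranged` is an RDD[(Long, Row)]; from here it can be processed directly
    // or converted back to a DataFrame with sqlContext.createDataFrame.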
how to investigate skew and DataFrames and RangePartitioner
I have two questions.

First, I have a failure when I write Parquet from Spark 1.6.1 on Amazon EMR to S3. This is a full batch, which is over 200GB of source data. The partitioning is based on a geographic identifier we use, and also the date we got the data. However, because of geographic density we could well be producing tiles that are too dense. I'm trying to figure out the size of the file it is trying to write out.

Second, we used to use RDDs and RangePartitioner for task partitioning. However, I don't see this available in DataFrames. How does one achieve this now?

Peter Halliday
RangePartitioner in Spark 1.2.1
Hi, Sparkers:

I just happened to search in Google for something related to the RangePartitioner of Spark, and found an old thread on this mailing list here:

http://apache-spark-user-list.1001560.n3.nabble.com/RDD-and-Partition-td991.html

I followed the code example mentioned in that email thread, as follows:

    scala> import org.apache.spark.RangePartitioner
    import org.apache.spark.RangePartitioner

    scala> val rdd = sc.parallelize(List("apple", "Ball", "cat", "dog", "Elephant", "fox", "gas", "horse", "index", "jet", "kitsch", "long", "moon", "Neptune", "ooze", "Pen", "quiet", "rose", "sun", "talk", "umbrella", "voice", "Walrus", "xeon", "Yam", "zebra"))
    rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:13

    scala> rdd.keyBy(s => s(0).toUpper)
    res0: org.apache.spark.rdd.RDD[(Char, String)] = MappedRDD[1] at keyBy at <console>:16

    scala> res0.partitionBy(new RangePartitioner[Char, String](26, res0)).values
    res1: org.apache.spark.rdd.RDD[String] = MappedRDD[5] at values at <console>:18

    scala> res1.mapPartitionsWithIndex((idx, itr) => itr.map(s => (idx, s))).collect.foreach(println)

The above example was clear enough for me to understand the meaning of the RangePartitioner, but to my surprise I got the following result:

    (0,apple)
    (0,Ball)
    (1,cat)
    (2,dog)
    (3,Elephant)
    (4,fox)
    (5,gas)
    (6,horse)
    (7,index)
    (8,jet)
    (9,kitsch)
    (10,long)
    (11,moon)
    (12,Neptune)
    (13,ooze)
    (14,Pen)
    (15,quiet)
    (16,rose)
    (17,sun)
    (18,talk)
    (19,umbrella)
    (20,voice)
    (21,Walrus)
    (22,xeon)
    (23,Yam)
    (24,zebra)

instead of a perfect range index from 0 to 25 as in the old email thread. Why is that? Is this a bug, or some new feature I don't understand?

BTW, the environment I tested this in is Spark 1.2.1 with the Hadoop 2.4 binary release.

Thanks
Yong
Re: RangePartitioner in Spark 1.2.1
RangePartitioner does not actually guarantee that all partitions will be equally sized (that is hard); instead it uses sampling to approximate equal buckets. Thus, it is possible that a bucket is left empty. If you want the specified behavior, you should define your own partitioner. It would look something like this (untested):

    class AlphabetPartitioner extends Partitioner {
      def numPartitions = 26
      def getPartition(key: Any): Int = key match {
        case s: String => s(0).toUpper - 'A'
      }
      override def equals(other: Any): Boolean = other.isInstanceOf[AlphabetPartitioner]
      override def hashCode: Int = 0
    }

On Tue, Feb 17, 2015 at 7:05 PM, java8964 <java8...@hotmail.com> wrote:
> [...] to my surprise, I got the following result:
> (0,apple)
> (0,Ball)
> (1,cat)
> [...]
> instead of a perfect range index from 0 to 25 in old email thread. Why is that? Is this a bug, or some new feature I don't understand?
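A quick, untested sketch of applying the suggested partitioner to the example data. Note that getPartition as written above matches on String keys, so here the RDD is keyed by the word itself rather than by its first character:

    // Assumes the AlphabetPartitioner from the reply above is defined
    // (it needs `import org.apache.spark.Partitioner`) and that `rdd` is
    // the word list from the earlier example.
    val keyedByWord = rdd.keyBy(s => s)
    val fixedBuckets = keyedByWord.partitionBy(new AlphabetPartitioner).values

    fixedBuckets
      .mapPartitionsWithIndex((idx, itr) => itr.map(s => (idx, s)))
      .collect
      .foreach(println)
    // Each word lands in the bucket for its first letter:
    // (0,apple), (1,Ball), (2,cat), ..., (25,zebra)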
Re: Spark job stuck at RangePartitioner at Exchange.scala:79
I was able to resolve this by adding rdd.collect() after every stage. This enforced RDD evaluation and helped avoid the choke point.

regards
Sunita Kopppar

On Sat, Jan 17, 2015 at 12:56 PM, Sunita Arvind <sunitarv...@gmail.com> wrote:
> Hi, My spark jobs suddenly started getting hung [...] it seems to be stuck whenever I do any collect(), count, or rdd.saveAsParquetFile. [...] Could this be something unrelated to memory? The SchemaRDDs have close to 400 columns and hence I am using StructType(StructField) and performing applySchema.
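Purely as an illustration of the workaround described above (the RDD and function names are made up), forcing each intermediate RDD to materialize before the next stage is built looks roughly like this:

    // Hypothetical pipeline: `inputRdd`, `parse`, and `isValid` are placeholders.
    val stage1 = inputRdd.map(parse).cache()
    stage1.collect()   // force evaluation of stage 1 before going further

    val stage2 = stage1.filter(isValid).cache()
    stage2.collect()   // force evaluation of stage 2

    // Note: collect() pulls the data back to the driver, so this is only
    // practical when the intermediate results are small.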
Re: RangePartitioner
Hi Rishi,

If you look in the Spark UI, have any executors registered? Are you able to collect a jstack of the driver process?

-Sandy

On Tue, Jan 20, 2015 at 9:07 PM, Rishi Yadav <ri...@infoobjects.com> wrote:
> I am joining two tables as below; the program stalls at the log line below and never proceeds. What might be the issue and a possible solution?
> INFO SparkContext: Starting job: RangePartitioner at Exchange.scala:79
> [...]
RangePartitioner
I am joining two tables as below; the program stalls at the log line below and never proceeds. What might be the issue and a possible solution?

    INFO SparkContext: Starting job: RangePartitioner at Exchange.scala:79

Table 1 has 450 columns.
Table 2 has 100 columns.
Both tables have a few million rows.

    val table1 = myTable1.as('table1)
    val table2 = myTable2.as('table2)
    val results = table1.join(table2, LeftOuter, Some("table1.Id".attr === "table2.id".attr))
    println(results.count())

Thanks and Regards,
Rishi
@meditativesoul
Spark job stuck at RangePartitioner at Exchange.scala:79
Hi,

My Spark jobs suddenly started getting hung, and here is the debug output leading up to it. Following the program, it seems to be stuck whenever I do any collect(), count, or rdd.saveAsParquetFile. AFAIK, any operation that requires data to flow back to the master causes this. I increased -Xmx and -Xss to 5 MB each, and as per the debug statements the memory is sufficient:

    15/01/17 11:44:16 INFO storage.MemoryStore: ensureFreeSpace(264808) called with curMem=0, maxMem=1019782103
    15/01/17 11:44:16 INFO storage.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 258.6 KB, free 972.3 MB)
    15/01/17 11:44:16 INFO spark.SparkContext: Starting job: collect at SparkPlan.scala:85
    15/01/17 11:44:16 INFO storage.MemoryStore: ensureFreeSpace(210344) called with curMem=264808, maxMem=1019782103
    15/01/17 11:44:16 INFO storage.MemoryStore: Block broadcast_1 stored as values in memory (estimated size 205.4 KB, free 972.1 MB)
    15/01/17 11:44:16 INFO storage.MemoryStore: ensureFreeSpace(282200) called with curMem=475152, maxMem=1019782103
    15/01/17 11:44:16 INFO storage.MemoryStore: Block broadcast_2 stored as values in memory (estimated size 275.6 KB, free 971.8 MB)
    15/01/17 11:44:16 INFO spark.SparkContext: Starting job: RangePartitioner at Exchange.scala:79

A bit of background which may or may not be relevant. The program was working fine in Eclipse, but was getting hung upon submission to the cluster. In an attempt to debug, I changed the versions in build.sbt to match the ones on the cluster. The sbt config when the program was working:

    "org.apache.spark" %% "spark-core" % "1.1.0" % "provided",
    "org.apache.spark" %% "spark-sql" % "1.1.0" % "provided",
    "spark.jobserver" % "job-server-api" % "0.4.0",
    "com.github.nscala-time" %% "nscala-time" % "1.6.0",
    "org.apache.hadoop" % "hadoop-client" % "2.3.0" % "provided"

During debugging, I changed this to:

    "org.apache.spark" %% "spark-core" % "1.2.0" % "provided",
    "org.apache.spark" %% "spark-sql" % "1.2.0" % "provided",
    "spark.jobserver" % "job-server-api" % "0.4.0",
    "com.github.nscala-time" %% "nscala-time" % "1.6.0",
    "org.apache.hadoop" % "hadoop-client" % "2.5.0" % "provided"

This is when the program started getting hung at the first rdd.count(). Now, even after reverting the changes in build.sbt, my program gets hung at the same point. I tried these config changes in addition to setting -Xmx and -Xss in eclipse.ini to 5 MB each, and set the variables below programmatically:

    sparkConf.set("spark.akka.frameSize", "10")
    sparkConf.set("spark.shuffle.spill", "true")
    sparkConf.set("spark.driver.memory", "512m")
    sparkConf.set("spark.executor.memory", "1g")
    sparkConf.set("spark.driver.maxResultSize", "1g")

Please note: in Eclipse as well as sbt, the program kept throwing StackOverflowError. Increasing -Xss to 5 MB eliminated that problem. Could this be something unrelated to memory? The SchemaRDDs have close to 400 columns, hence I am using StructType(StructField) and performing applySchema. My code cannot be shared right now; if required, I will edit it and post.

regards
Sunita