I ran percentile_approx on 100 GB of compressed data and the job just hangs. Any pointers?
On Wed, Mar 22, 2017 at 6:09 PM, ayan guha <guha.a...@gmail.com> wrote:

> For median, use percentile_approx with 0.5 (50th percentile is the median)
>
> On Thu, Mar 23, 2017 at 11:01 AM, Yong Zhang <java8...@hotmail.com> wrote:
>
>> He is looking for median, not mean/avg.
>>
>> You have to implement the median logic yourself, as there is no
>> direct implementation in Spark. You can use the RDD API if you are using
>> 1.6.x, or the Dataset API if 2.x.
>>
>> The following example gives you an idea of how to calculate the median using
>> the Dataset API. You can even change the code to add additional logic to
>> calculate the diff of every value with the median.
>>
>> scala> spark.version
>> res31: String = 2.1.0
>>
>> scala> val ds = Seq((100,0.43),(100,0.33),(100,0.73),(101,0.29),(101,0.96),(101,0.42),(101,0.01)).toDF("id","value").as[(Int, Double)]
>> ds: org.apache.spark.sql.Dataset[(Int, Double)] = [id: int, value: double]
>>
>> scala> ds.show
>> +---+-----+
>> | id|value|
>> +---+-----+
>> |100| 0.43|
>> |100| 0.33|
>> |100| 0.73|
>> |101| 0.29|
>> |101| 0.96|
>> |101| 0.42|
>> |101| 0.01|
>> +---+-----+
>>
>> scala> def median(seq: Seq[Double]) = {
>>      |   val size = seq.size
>>      |   val sorted = seq.sorted
>>      |   size match {
>>      |     case even if size % 2 == 0 => (sorted((size-2)/2) + sorted(size/2)) / 2
>>      |     case odd => sorted((size-1)/2)
>>      |   }
>>      | }
>> median: (seq: Seq[Double])Double
>>
>> scala> ds.groupByKey(_._1).mapGroups((id, iter) => (id, median(iter.map(_._2).toSeq))).show
>> +---+-----+
>> | _1|   _2|
>> +---+-----+
>> |101|0.355|
>> |100| 0.43|
>> +---+-----+
>>
>> Yong
>>
>> ------------------------------
>> *From:* ayan guha <guha.a...@gmail.com>
>> *Sent:* Wednesday, March 22, 2017 7:23 PM
>> *To:* Craig Ching
>> *Cc:* Yong Zhang; user@spark.apache.org
>> *Subject:* Re: calculate diff of value and median in a group
>>
>> I would suggest using a window function with partitioning.
>>
>> select group1, group2, name, value,
>>        avg(value) over (partition by group1, group2 order by name) m
>> from t
>>
>> On Thu, Mar 23, 2017 at 9:58 AM, Craig Ching <craigch...@gmail.com>
>> wrote:
>>
>>> Is the element count per group big? If not, you can group them and use
>>> the code to calculate the median and diff.
>>>
>>>
>>> They're not big, no. Any pointers on how I might do that? The part I'm
>>> having trouble with is the grouping, I can't seem to see how to do the
>>> median per group. For mean, we have the agg feature, but not for median
>>> (and I understand the reasons for that).
>>>
>>> Yong
>>>
>>> ------------------------------
>>> *From:* Craig Ching <craigch...@gmail.com>
>>> *Sent:* Wednesday, March 22, 2017 3:17 PM
>>> *To:* user@spark.apache.org
>>> *Subject:* calculate diff of value and median in a group
>>>
>>> Hi,
>>>
>>> When using pyspark, I'd like to be able to calculate the difference
>>> between grouped values and their median for the group. Is this possible?
>>> Here is some code I hacked up that does what I want except that it
>>> calculates the grouped diff from mean.
>>> Also, please feel free to comment
>>> on how I could make this better if you feel like being helpful :)
>>>
>>> from pyspark import SparkContext
>>> from pyspark.sql import SparkSession
>>> from pyspark.sql.types import (
>>>     StringType,
>>>     LongType,
>>>     DoubleType,
>>>     StructField,
>>>     StructType
>>> )
>>> from pyspark.sql import functions as F
>>>
>>>
>>> sc = SparkContext(appName='myapp')
>>> spark = SparkSession(sc)
>>>
>>> file_name = 'data.csv'
>>>
>>> fields = [
>>>     StructField(
>>>         'group2',
>>>         LongType(),
>>>         True),
>>>     StructField(
>>>         'name',
>>>         StringType(),
>>>         True),
>>>     StructField(
>>>         'value',
>>>         DoubleType(),
>>>         True),
>>>     StructField(
>>>         'group1',
>>>         LongType(),
>>>         True)
>>> ]
>>> schema = StructType(fields)
>>>
>>> df = spark.read.csv(
>>>     file_name, header=False, mode="DROPMALFORMED", schema=schema
>>> )
>>> df.show()
>>> means = df.select([
>>>     'group1',
>>>     'group2',
>>>     'name',
>>>     'value']).groupBy([
>>>         'group1',
>>>         'group2'
>>>     ]).agg(
>>>         F.mean('value').alias('mean_value')
>>>     ).orderBy('group1', 'group2')
>>>
>>> cond = [df.group1 == means.group1, df.group2 == means.group2]
>>>
>>> means.show()
>>> df = df.select([
>>>     'group1',
>>>     'group2',
>>>     'name',
>>>     'value']).join(
>>>         means,
>>>         cond
>>>     ).drop(
>>>         df.group1
>>>     ).drop(
>>>         df.group2
>>>     ).select('group1',
>>>              'group2',
>>>              'name',
>>>              'value',
>>>              'mean_value')
>>>
>>> final = df.withColumn(
>>>     'diff',
>>>     F.abs(df.value - df.mean_value))
>>> final.show()
>>>
>>> sc.stop()
>>>
>>> And here is an example dataset I'm playing with:
>>>
>>> 100,name1,0.43,0
>>> 100,name2,0.33,0
>>> 100,name3,0.73,0
>>> 101,name1,0.29,0
>>> 101,name2,0.96,0
>>> 101,name3,0.42,0
>>> 102,name1,0.01,0
>>> 102,name2,0.42,0
>>> 102,name3,0.51,0
>>> 103,name1,0.55,0
>>> 103,name2,0.45,0
>>> 103,name3,0.02,0
>>> 104,name1,0.93,0
>>> 104,name2,0.16,0
>>> 104,name3,0.74,0
>>> 105,name1,0.41,0
>>> 105,name2,0.65,0
>>> 105,name3,0.29,0
>>> 100,name1,0.51,1
>>> 100,name2,0.51,1
>>> 100,name3,0.43,1
>>> 101,name1,0.59,1
>>> 101,name2,0.55,1
>>> 101,name3,0.84,1
>>> 102,name1,0.01,1
>>> 102,name2,0.98,1
>>> 102,name3,0.44,1
>>> 103,name1,0.47,1
>>> 103,name2,0.16,1
>>> 103,name3,0.02,1
>>> 104,name1,0.83,1
>>> 104,name2,0.89,1
>>> 104,name3,0.31,1
>>> 105,name1,0.59,1
>>> 105,name2,0.77,1
>>> 105,name3,0.45,1
>>>
>>> and here is what I'm trying to produce:
>>>
>>> group1,group2,name,value,median,diff
>>> 0,100,name1,0.43,0.43,0.0
>>> 0,100,name2,0.33,0.43,0.10
>>> 0,100,name3,0.73,0.43,0.30
>>> 0,101,name1,0.29,0.42,0.13
>>> 0,101,name2,0.96,0.42,0.54
>>> 0,101,name3,0.42,0.42,0.0
>>> 0,102,name1,0.01,0.42,0.41
>>> 0,102,name2,0.42,0.42,0.0
>>> 0,102,name3,0.51,0.42,0.09
>>> 0,103,name1,0.55,0.45,0.10
>>> 0,103,name2,0.45,0.45,0.0
>>> 0,103,name3,0.02,0.45,0.43
>>> 0,104,name1,0.93,0.74,0.19
>>> 0,104,name2,0.16,0.74,0.58
>>> 0,104,name3,0.74,0.74,0.0
>>> 0,105,name1,0.41,0.41,0.0
>>> 0,105,name2,0.65,0.41,0.24
>>> 0,105,name3,0.29,0.41,0.12
>>> 1,100,name1,0.51,0.51,0.0
>>> 1,100,name2,0.51,0.51,0.0
>>> 1,100,name3,0.43,0.51,0.08
>>> 1,101,name1,0.59,0.59,0.0
>>> 1,101,name2,0.55,0.59,0.04
>>> 1,101,name3,0.84,0.59,0.25
>>> 1,102,name1,0.01,0.44,0.43
>>> 1,102,name2,0.98,0.44,0.54
>>> 1,102,name3,0.44,0.44,0.0
>>> 1,103,name1,0.47,0.16,0.31
>>> 1,103,name2,0.16,0.16,0.0
>>> 1,103,name3,0.02,0.16,0.14
>>> 1,104,name1,0.83,0.83,0.0
>>> 1,104,name2,0.89,0.83,0.06
>>> 1,104,name3,0.31,0.83,0.52
>>> 1,105,name1,0.59,0.59,0.0
>>> 1,105,name2,0.77,0.59,0.18
>>> 1,105,name3,0.45,0.59,0.14
>>>
>>> Thanks for any help!
>>>
>>> Cheers,
>>> Craig
>>>
>>
>> --
>> Best Regards,
>> Ayan Guha
>>
>
> --
> Best Regards,
> Ayan Guha
>
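
For anyone who wants to sanity-check a Spark result against the expected table in the original question, here is a minimal plain-Python sketch of the per-group median and diff. It is not Spark code and is only meant for a small sample that fits in memory. It assumes rows are (group2, name, value, group1) tuples in the same column order as the sample CSV; `median_diff` is a hypothetical helper name, and rounding the diff to two decimals is my assumption to match the table.

```python
from collections import defaultdict
from statistics import median

# Rows use the sample CSV column order: (group2, name, value, group1).
rows = [
    (100, "name1", 0.43, 0),
    (100, "name2", 0.33, 0),
    (100, "name3", 0.73, 0),
    (105, "name1", 0.41, 0),
    (105, "name2", 0.65, 0),
    (105, "name3", 0.29, 0),
]


def median_diff(rows):
    """Return (group1, group2, name, value, median, diff) for every row."""
    # Collect the values belonging to each (group1, group2) group.
    groups = defaultdict(list)
    for group2, _name, value, group1 in rows:
        groups[(group1, group2)].append(value)

    # Exact median per group (hypothetical approach for small groups only).
    medians = {key: median(values) for key, values in groups.items()}

    result = []
    for group2, name, value, group1 in rows:
        m = medians[(group1, group2)]
        # Round to two decimals to hide floating-point noise in the diff.
        result.append((group1, group2, name, value, m, round(abs(value - m), 2)))
    return result


for row in median_diff(rows):
    print(",".join(str(field) for field in row))
```

For the three rows with group2=105 and group1=0, the median is 0.41 and the diff for name3 is 0.12.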