Hello Vadim - Alternatively, you can achieve this by using *window functions*, which are available from Spark 1.4.0.
*code_value.txt (Input)*
=====================
1000,200,Descr-200,01
1000,200,Descr-200-new,02
1000,201,Descr-201,01
1000,202,Descr-202-new,03
1000,202,Descr-202,01
1000,202,Descr-202-old,02

*Expected Output (DataFrame):*
==========================
1000,200,Descr-200-new,02
1000,201,Descr-201,01
1000,202,Descr-202-new,03
======================================================================================
*Code (Spark-Shell)*

import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._   // needed so rowNumber resolves below

val sqlSC = new org.apache.spark.sql.hive.HiveContext(sc)
import sqlSC.implicits._

case class data(batch_id: Int, code: String, descr: String, seq: Int)

val input_RDD = sc.textFile("Data/Projects/Spark/Input/code_Value.txt")
val data_RDD = input_RDD.map(line => line.split(","))
                        .map(x => data(x(0).toInt, x(1), x(2), x(3).toInt))
val data_DF = data_RDD.toDF()

// Partition by code, order by seq descending, and keep the first row per code.
val winSpec = Window.partitionBy(data_DF("code")).orderBy(data_DF("seq").desc)

data_DF.select($"batch_id", $"code", $"descr", $"seq",
               rowNumber.over(winSpec).alias("rn"))
       .filter($"rn" === 1)
       .select($"batch_id", $"code", $"descr", $"seq")
       .show()
======================================================================================
Thanks & Regards,
Gokula Krishnan *(Gokul)*

On Wed, Dec 30, 2015 at 11:35 AM, Vadim Tkachenko <apache...@gmail.com> wrote:

> Davies,
>
> Thank you, I will wait on the 1.6 release.
>
> http://apache-spark-user-list.1001560.n3.nabble.com/Problem-with-WINDOW-functions-tt25833.html
> ?
>
> On Wed, Dec 30, 2015 at 12:06 AM, Davies Liu <dav...@databricks.com> wrote:
>
>> Window functions are improved in the 1.6 release, could you try 1.6-RC4
>> (or wait until next week for the final release)?
>> Even in 1.6, the buffer of rows for a window function does not support
>> spilling (and also does not use memory efficiently); there is a JIRA for
>> it: https://issues.apache.org/jira/browse/SPARK-12295
>>
>> On Tue, Dec 29, 2015 at 5:28 PM, vadimtk <apache...@gmail.com> wrote:
>> > Hi,
>> >
>> > I can't successfully execute a query with a WINDOW function.
>> >
>> > The statements are the following:
>> >
>> > val orcFile = sqlContext.read.parquet("/data/flash/spark/dat14sn")
>> >                              .filter("upper(project)='EN'")
>> > orcFile.registerTempTable("d1")
>> > sqlContext.sql("SELECT day,page,dense_rank() OVER (PARTITION BY day ORDER
>> > BY pageviews DESC) as rank FROM d1").filter("rank <= 20")
>> >   .sort($"day",$"rank").collect().foreach(println)
>> >
>> > With the default
>> > spark.driver.memory
>> >
>> > I am getting java.lang.OutOfMemoryError: Java heap space.
>> > The same happens if I set spark.driver.memory=10g.
>> >
>> > When I set spark.driver.memory=45g (the box has 256GB of RAM), the
>> > execution fails with a different error:
>> >
>> > 15/12/29 23:03:19 WARN HeartbeatReceiver: Removing executor 0 with no
>> > recent heartbeats: 129324 ms exceeds timeout 120000 ms
>> >
>> > And I see that GC takes a lot of time.
>> >
>> > What is the proper way to execute the statements above?
>> >
>> > I see similar problems reported:
>> > http://stackoverflow.com/questions/32196859/org-apache-spark-shuffle-fetchfailedexception
>> > http://stackoverflow.com/questions/32544478/spark-memory-settings-for-count-action-in-a-big-table
>> >
>> > --
>> > View this message in context:
>> > http://apache-spark-user-list.1001560.n3.nabble.com/Problem-with-WINDOW-functions-tp25833.html
>> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
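One contributor to the OutOfMemoryError above is that `.collect()` pulls the entire result set to the driver; only the top 20 rows per day are actually wanted. As a plain-Scala illustration of what the `dense_rank() ... PARTITION BY day ORDER BY pageviews DESC` filter selects (hypothetical sample data, not the real pageview dataset; ties, which `dense_rank` handles differently from a plain take-N, are ignored here):

```scala
// Plain-Scala sketch of "top N rows per group by a descending metric",
// i.e. the rows that rank <= N would keep after the window function.
case class PageView(day: String, page: String, views: Long)

def topNPerDay(rows: Seq[PageView], n: Int): Seq[PageView] =
  rows.groupBy(_.day)                     // PARTITION BY day
      .values
      .flatMap(_.sortBy(-_.views).take(n)) // ORDER BY pageviews DESC, keep top n
      .toSeq
      .sortBy(r => (r.day, -r.views))      // mimic the final sort($"day", $"rank")

val sample = Seq(
  PageView("2015-12-29", "Main_Page", 500),
  PageView("2015-12-29", "Spark", 300),
  PageView("2015-12-29", "Scala", 100),
  PageView("2015-12-30", "Spark", 400)
)

val top2 = topNPerDay(sample, 2)
top2.foreach(println)
```

In Spark, applying the `rank <= 20` filter and then writing the result out (or calling `show`) instead of `collect()` keeps the driver's memory requirement small, although the executor-side window buffer issue from SPARK-12295 remains.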
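The deduplication in the first message ("keep the row with the highest seq per code") can also be expressed as a reduce-by-key, which avoids buffering whole partitions in a window. A plain-Scala sketch of that logic on the sample input (illustration only; names are local to this sketch):

```scala
// Keep, for each `code`, the row with the highest `seq` -- the same result the
// rowNumber/filter combination produces, expressed as a keep-max-per-key reduce.
case class CodeRow(batchId: Int, code: String, descr: String, seq: Int)

val input = Seq(
  CodeRow(1000, "200", "Descr-200", 1),
  CodeRow(1000, "200", "Descr-200-new", 2),
  CodeRow(1000, "201", "Descr-201", 1),
  CodeRow(1000, "202", "Descr-202-new", 3),
  CodeRow(1000, "202", "Descr-202", 1),
  CodeRow(1000, "202", "Descr-202-old", 2)
)

val latest = input
  .groupBy(_.code)          // analogous to partitionBy(code)
  .values
  .map(_.maxBy(_.seq))      // keep only the max-seq row per code
  .toSeq
  .sortBy(_.code)

latest.foreach(println)
```

On an RDD this roughly corresponds to `data_RDD.keyBy(_.code).reduceByKey((a, b) => if (a.seq >= b.seq) a else b)`, which shuffles at most one row per key instead of materializing each partition's full row buffer.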