Hello Vadim -

Alternatively, you can achieve this with *window functions*, which are
available from Spark 1.4.0: partition the rows by code, order each partition
by seq descending, and keep only the first row, i.e. the latest seq per code.

*code_value.txt (Input)*
=====================
1000,200,Descr-200,01
1000,200,Descr-200-new,02
1000,201,Descr-201,01
1000,202,Descr-202-new,03
1000,202,Descr-202,01
1000,202,Descr-202-old,02

*Expected Output(DataFrame):*
==========================
1000,200,Descr-200-new,02
1000,201,Descr-201,01
1000,202,Descr-202-new,03
======================================================================================
*Code (Spark-Shell)*
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Window functions need a HiveContext in 1.4.x/1.5.x
val sqlSC = new HiveContext(sc)
import sqlSC.implicits._

case class Data(batch_id: Int, code: String, descr: String, seq: Int)

val input_RDD = sc.textFile("Data/Projects/Spark/Input/code_value.txt")

val data_DF = input_RDD
  .map(_.split(","))
  .map(x => Data(x(0).toInt, x(1), x(2), x(3).toInt))
  .toDF()

// Rank the rows within each code, highest seq first
val winSpec = Window.partitionBy($"code").orderBy($"seq".desc)

data_DF
  .select($"batch_id", $"code", $"descr", $"seq",
          rowNumber().over(winSpec).alias("rn"))
  .filter($"rn" === 1)   // keep only the latest seq per code
  .select($"batch_id", $"code", $"descr", $"seq")
  .show()
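
If you prefer to express it in SQL (closer to your original query), the same
thing can be done through the HiveContext. A rough equivalent, assuming we
register the DataFrame under a temp table name of my choosing ("d"):

data_DF.registerTempTable("d")
sqlSC.sql("""
  SELECT batch_id, code, descr, seq
  FROM (SELECT *,
               ROW_NUMBER() OVER (PARTITION BY code ORDER BY seq DESC) AS rn
        FROM d) t
  WHERE rn = 1""").show()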
======================================================================================
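
Note: if you move to 1.6 (as Davies suggests below), rowNumber() in
org.apache.spark.sql.functions is, as far as I know, deprecated in favor of
row_number(), so the ranking line above would become:

row_number().over(winSpec).alias("rn")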



Thanks & Regards,
Gokula Krishnan *(Gokul)*

On Wed, Dec 30, 2015 at 11:35 AM, Vadim Tkachenko <apache...@gmail.com>
wrote:

> Davies,
>
> Thank you, I will wait on 1.6 release.
>
>
> http://apache-spark-user-list.1001560.n3.nabble.com/Problem-with-WINDOW-functions-tt25833.html
> ?
>
> On Wed, Dec 30, 2015 at 12:06 AM, Davies Liu <dav...@databricks.com>
> wrote:
>
>> Window functions are improved in the 1.6 release; could you try 1.6-RC4
>> (or wait until next week for the final release)?
>>
>> Even in 1.6, the buffer of rows for a window function does not support
>> spilling (and does not use memory efficiently); there is a JIRA for
>> it: https://issues.apache.org/jira/browse/SPARK-12295
>>
>> On Tue, Dec 29, 2015 at 5:28 PM, vadimtk <apache...@gmail.com> wrote:
>> > Hi,
>> >
>> > I can't successfully execute a query with a WINDOW function.
>> >
>> > The statements are as follows:
>> >
>> > val orcFile = sqlContext.read.parquet("/data/flash/spark/dat14sn")
>> >   .filter("upper(project)='EN'")
>> > orcFile.registerTempTable("d1")
>> > sqlContext.sql("SELECT day,page,dense_rank() OVER (PARTITION BY day
>> >   ORDER BY pageviews DESC) as rank FROM d1")
>> >   .filter("rank <= 20").sort($"day",$"rank").collect().foreach(println)
>> >
>> > with the default spark.driver.memory
>> >
>> > I am getting java.lang.OutOfMemoryError: Java heap space.
>> > The same happens if I set spark.driver.memory=10g.
>> >
>> > When I set spark.driver.memory=45g (the box has 256GB of RAM), the
>> > execution fails with a different error:
>> >
>> > 15/12/29 23:03:19 WARN HeartbeatReceiver: Removing executor 0 with no
>> > recent heartbeats: 129324 ms exceeds timeout 120000 ms
>> >
>> > And I see that GC takes a lot of time.
>> >
>> > What is the proper way to execute the statements above?
>> >
>> > I see similar problems reported:
>> >
>> > http://stackoverflow.com/questions/32196859/org-apache-spark-shuffle-fetchfailedexception
>> >
>> > http://stackoverflow.com/questions/32544478/spark-memory-settings-for-count-action-in-a-big-table