@Andres I need the latest income, but only where it is less than 10 months old based on the income_age column, and I don't want to use SQL here.
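For concreteness, a rough, untested sketch of the non-SQL version I'm after
(it assumes the column names from @ayan's example below, that income_age_ts
holds M/d/yy strings, and that the 10 months are measured against the
current date; to_date with a format string needs Spark 2.2+):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// parse the timestamp column and keep only incomes younger than 10 months
val recent = df
  .withColumn("ts", to_date(col("income_age_ts"), "M/d/yy"))
  .filter(col("ts") >= add_months(current_date(), -10))

// latest remaining income per id via a window, no SQL strings involved
val w = Window.partitionBy("id").orderBy(col("ts").desc)

val latest = recent
  .withColumn("r", row_number().over(w))
  .filter(col("r") === 1)
  .drop("r", "ts")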
On Wed, Aug 30, 2017 at 8:08 AM Andrés Ivaldi <iaiva...@gmail.com> wrote:

> Hi, if you need the last value from income in a window function you can
> use last_value.
> Not tested, but maybe building on @ayan's sql:
>
> spark.sql("select *, last_value(income) over (partition by id order by
>   income_age_ts desc) r from t")
>
> On Tue, Aug 29, 2017 at 11:30 PM, purna pradeep <purna2prad...@gmail.com> wrote:
>
>> @ayan,
>>
>> Thanks for your response.
>>
>> I would like to have functions in this case, e.g. calculateIncome, and
>> the reason I need a function is to reuse it in other parts of the
>> application. That's why I'm planning on mapGroups with a function as the
>> argument which takes a row iterator, but I'm not sure this is the best
>> way to implement it as my initial dataframe is very large.
>>
>> On Tue, Aug 29, 2017 at 10:24 PM ayan guha <guha.a...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> The tool you are looking for is a window function. Example:
>>>
>>> df.show()
>>> +--------+----+---+------+-------------+
>>> |JoinDate|dept| id|income|income_age_ts|
>>> +--------+----+---+------+-------------+
>>> | 4/20/13|  ES|101| 19000|      4/20/17|
>>> | 4/20/13|  OS|101| 10000|      10/3/15|
>>> | 4/20/12|  DS|102| 13000|       5/9/17|
>>> | 4/20/12|  CS|102| 12000|       5/8/17|
>>> | 4/20/10|  EQ|103| 10000|       5/9/17|
>>> | 4/20/10|  MD|103|  9000|       5/8/17|
>>> +--------+----+---+------+-------------+
>>>
>>> res = spark.sql("select *, row_number() over (partition by id
>>>   order by income_age_ts desc) r from t")
>>>
>>> res.show()
>>> +--------+----+---+------+-------------+---+
>>> |JoinDate|dept| id|income|income_age_ts|  r|
>>> +--------+----+---+------+-------------+---+
>>> | 4/20/10|  EQ|103| 10000|       5/9/17|  1|
>>> | 4/20/10|  MD|103|  9000|       5/8/17|  2|
>>> | 4/20/13|  ES|101| 19000|      4/20/17|  1|
>>> | 4/20/13|  OS|101| 10000|      10/3/15|  2|
>>> | 4/20/12|  DS|102| 13000|       5/9/17|  1|
>>> | 4/20/12|  CS|102| 12000|       5/8/17|  2|
>>> +--------+----+---+------+-------------+---+
>>>
>>> res = spark.sql("select * from (select *, row_number() over
>>>   (partition by id order by income_age_ts desc) r from t) x where r=1")
>>>
>>> res.show()
>>> +--------+----+---+------+-------------+---+
>>> |JoinDate|dept| id|income|income_age_ts|  r|
>>> +--------+----+---+------+-------------+---+
>>> | 4/20/10|  EQ|103| 10000|       5/9/17|  1|
>>> | 4/20/13|  ES|101| 19000|      4/20/17|  1|
>>> | 4/20/12|  DS|102| 13000|       5/9/17|  1|
>>> +--------+----+---+------+-------------+---+
>>>
>>> This should be better because it uses all the built-in optimizations
>>> in Spark.
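>>>
>>> One small assumption in the snippets above: the spark.sql("... from t")
>>> calls require the DataFrame to already be registered as a temp view
>>> named t. In Spark 2.x that would look something like:
>>>
>>> df.createOrReplaceTempView("t")
>>> val res = spark.sql("select * from (select *, row_number() over " +
>>>   "(partition by id order by income_age_ts desc) r from t) x where r = 1")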
>>>
>>> Best
>>> Ayan
>>>
>>> On Wed, Aug 30, 2017 at 11:06 AM, purna pradeep <purna2prad...@gmail.com> wrote:
>>>
>>>> Please click on the unnamed text/html link for a better view.
>>>>
>>>> On Tue, Aug 29, 2017 at 8:11 PM purna pradeep <purna2prad...@gmail.com> wrote:
>>>>
>>>>> ---------- Forwarded message ---------
>>>>> From: Mamillapalli, Purna Pradeep <purnapradeep.mamillapa...@capitalone.com>
>>>>> Date: Tue, Aug 29, 2017 at 8:08 PM
>>>>> Subject: Spark question
>>>>> To: purna pradeep <purna2prad...@gmail.com>
>>>>>
>>>>> Below is the input Dataframe (in reality this is a very large Dataframe):
>>>>>
>>>>> +----------+------+-------------+--------+----+
>>>>> |EmployeeID|INCOME|INCOME AGE TS|JoinDate|Dept|
>>>>> +----------+------+-------------+--------+----+
>>>>> |       101| 19000|      4/20/17| 4/20/13|  ES|
>>>>> |       101| 10000|      10/3/15| 4/20/13|  OS|
>>>>> |       102| 13000|       5/9/17| 4/20/12|  DS|
>>>>> |       102| 12000|       5/8/17| 4/20/12|  CS|
>>>>> |       103| 10000|       5/9/17| 4/20/10|  EQ|
>>>>> |       103|  9000|       5/8/15| 4/20/10|  MD|
>>>>> +----------+------+-------------+--------+----+
>>>>>
>>>>> Get the latest income of each employee whose income_age_ts is
>>>>> < 10 months old.
>>>>>
>>>>> Expected output Dataframe:
>>>>>
>>>>> +----------+------+-------------+--------+----+
>>>>> |EmployeeID|INCOME|INCOME AGE TS|JoinDate|Dept|
>>>>> +----------+------+-------------+--------+----+
>>>>> |       101| 19000|      4/20/17| 4/20/13|  ES|
>>>>> |       102| 13000|       5/9/17| 4/20/12|  DS|
>>>>> |       103| 10000|       5/9/17| 4/20/10|  EQ|
>>>>> +----------+------+-------------+--------+----+
>>>>>
>>>>> Below is what I'm planning to implement:
>>>>>
>>>>> import java.sql.Date
>>>>> import org.apache.spark.sql.types.StructType
>>>>>
>>>>> case class employee(EmployeeID: Int, INCOME: Int, INCOMEAGE: Date,
>>>>>   JOINDATE: Date, DEPT: String)
>>>>>
>>>>> val empSchema = new StructType()
>>>>>   .add("EmployeeID", "int")
>>>>>   .add("INCOME", "int")
>>>>>   .add("INCOMEAGE", "date")
>>>>>   .add("JOINDATE", "date")
>>>>>   .add("DEPT", "string")
>>>>>
>>>>> // Reading from the file
>>>>> import sparkSession.implicits._
>>>>>
>>>>> val readEmpFile = sparkSession.read
>>>>>   .option("sep", ",")
>>>>>   .option("dateFormat", "M/d/yy") // matches dates like 4/20/17
>>>>>   .schema(empSchema)
>>>>>   .csv(INPUT_DIRECTORY)
>>>>>
>>>>> // Create employee Dataset
>>>>> val custDf = readEmpFile.as[employee]
>>>>>
>>>>> // Group records by employee id
>>>>> val groupByDf = custDf.groupByKey(a => a.EmployeeID)
>>>>>
>>>>> // Collapse each group to a single record via performETL
>>>>> val k = groupByDf.mapGroups((key, value) => performETL(value))
>>>>>
>>>>> def performETL(empData: Iterator[employee]): employee = {
>>>>>   val empList = empData.toList
>>>>>
>>>>>   // calculateIncome has the logic to figure out the latest income
>>>>>   // for an employee and returns that income
>>>>>   val income = calculateIncome(empList)
>>>>>
>>>>>   // return one record per employee, carrying the calculated income
>>>>>   val row = empList.head
>>>>>   employee(row.EmployeeID, income, row.INCOMEAGE, row.JOINDATE, row.DEPT)
>>>>> }
>>>>>
>>>>> Is this a good approach, or even the right approach, to implement
>>>>> this? If not, please suggest a better way to implement the same.
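>>>>>
>>>>> For concreteness, calculateIncome could look roughly like this. This
>>>>> is purely illustrative since the real helper isn't shown here; it
>>>>> assumes the 10-month cutoff is measured against the current date and
>>>>> that each group has at least one qualifying record:
>>>>>
>>>>> import java.sql.Date
>>>>> import java.time.LocalDate
>>>>>
>>>>> // hypothetical sketch: latest income among records < 10 months old
>>>>> def calculateIncome(empList: List[employee]): Int = {
>>>>>   val cutoff = Date.valueOf(LocalDate.now().minusMonths(10))
>>>>>   empList
>>>>>     .filter(e => e.INCOMEAGE.after(cutoff))  // drop stale incomes
>>>>>     .maxBy(e => e.INCOMEAGE.getTime)         // newest income_age_ts wins
>>>>>     .INCOME
>>>>> }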
>>> --
>>> Best Regards,
>>> Ayan Guha

> --
> Ing. Ivaldi Andres