Hi,

The tool you are looking for is a window function. Example:
>>> df.show()
+--------+----+---+------+-------------+
|JoinDate|dept| id|income|income_age_ts|
+--------+----+---+------+-------------+
| 4/20/13|  ES|101| 19000|      4/20/17|
| 4/20/13|  OS|101| 10000|      10/3/15|
| 4/20/12|  DS|102| 13000|       5/9/17|
| 4/20/12|  CS|102| 12000|       5/8/17|
| 4/20/10|  EQ|103| 10000|       5/9/17|
| 4/20/10|  MD|103|  9000|       5/8/17|
+--------+----+---+------+-------------+
>>> df.createOrReplaceTempView("t")
>>> res = spark.sql("select *, row_number() over (partition by id order by to_date(income_age_ts, 'M/d/yy') desc) r from t")
>>> res.show()
+--------+----+---+------+-------------+---+
|JoinDate|dept| id|income|income_age_ts|  r|
+--------+----+---+------+-------------+---+
| 4/20/10|  EQ|103| 10000|       5/9/17|  1|
| 4/20/10|  MD|103|  9000|       5/8/17|  2|
| 4/20/13|  ES|101| 19000|      4/20/17|  1|
| 4/20/13|  OS|101| 10000|      10/3/15|  2|
| 4/20/12|  DS|102| 13000|       5/9/17|  1|
| 4/20/12|  CS|102| 12000|       5/8/17|  2|
+--------+----+---+------+-------------+---+
>>> res = spark.sql("select * from (select *, row_number() over (partition by id order by to_date(income_age_ts, 'M/d/yy') desc) r from t) x where r = 1")
>>> res.show()
+--------+----+---+------+-------------+---+
|JoinDate|dept| id|income|income_age_ts|  r|
+--------+----+---+------+-------------+---+
| 4/20/10|  EQ|103| 10000|       5/9/17|  1|
| 4/20/13|  ES|101| 19000|      4/20/17|  1|
| 4/20/12|  DS|102| 13000|       5/9/17|  1|
+--------+----+---+------+-------------+---+
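This should perform better than groupByKey/mapGroups. The whole computation stays inside Spark SQL, where the Catalyst optimizer and whole-stage code generation apply. The typed version instead deserializes every row into an employee object and runs opaque Scala code per group.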
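The query above ranks the rows but does not yet apply the "< 10 months" condition from the question. Below is a minimal sketch of the full logic: filter first, then keep row_number() = 1 per id. It uses SQLite's window functions from Python's standard library as a stand-in for Spark SQL, under these assumptions:

- The sample rows are taken from the question.
- SQLite is 3.25 or newer, which ROW_NUMBER requires.
- "10 months" is measured back from a fixed as-of date.
- The M/D/YY strings are converted to ISO dates first so they sort chronologically.

The table and function names are illustrative only.

```python
import sqlite3
from datetime import datetime

# Sample rows from the question: (id, dept, income, income_age_ts, join_date)
SAMPLE = [
    (101, "ES", 19000, "4/20/17", "4/20/13"),
    (101, "OS", 10000, "10/3/15", "4/20/13"),
    (102, "DS", 13000, "5/9/17", "4/20/12"),
    (102, "CS", 12000, "5/8/17", "4/20/12"),
    (103, "EQ", 10000, "5/9/17", "4/20/10"),
    (103, "MD", 9000, "5/8/15", "4/20/10"),
]


def to_iso(mdy: str) -> str:
    """Convert an M/D/YY string such as '4/20/17' to ISO 'YYYY-MM-DD'."""
    return datetime.strptime(mdy, "%m/%d/%y").date().isoformat()


def latest_recent_income(rows, as_of: str):
    """Latest row per id, considering only rows less than 10 months old at as_of."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE t (id INT, dept TEXT, income INT, income_age_ts TEXT, join_date TEXT)"
    )
    # Store ISO dates so that string comparison and ORDER BY are chronological
    conn.executemany(
        "INSERT INTO t VALUES (?, ?, ?, ?, ?)",
        [(i, d, inc, to_iso(ts), to_iso(jd)) for i, d, inc, ts, jd in rows],
    )
    query = """
        SELECT id, dept, income, income_age_ts
        FROM (
            SELECT *, ROW_NUMBER() OVER (
                       PARTITION BY id ORDER BY income_age_ts DESC) AS r
            FROM t
            WHERE income_age_ts > date(?, '-10 months')  -- the 10-month filter
        ) x
        WHERE r = 1
        ORDER BY id
    """
    result = conn.execute(query, (as_of,)).fetchall()
    conn.close()
    return result


# As-of date assumed to be the date of the original question
print(latest_recent_income(SAMPLE, "2017-08-29"))
# [(101, 'ES', 19000, '2017-04-20'), (102, 'DS', 13000, '2017-05-09'), (103, 'EQ', 10000, '2017-05-09')]
```

In Spark SQL the analogous filter would be something like to_date(income_age_ts, 'M/d/yy') > add_months(current_date(), -10), placed in the where clause of the inner query.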
Best
Ayan
On Wed, Aug 30, 2017 at 11:06 AM, purna pradeep <[email protected]> wrote:
>
> On Tue, Aug 29, 2017 at 8:11 PM purna pradeep <[email protected]> wrote:
>
>>
>> ---------- Forwarded message ---------
>> From: Mamillapalli, Purna Pradeep <PurnaPradeep.Mamillapalli@capitalone.com>
>> Date: Tue, Aug 29, 2017 at 8:08 PM
>> Subject: Spark question
>> To: purna pradeep <[email protected]>
>>
>> Below is the input DataFrame (in reality this is a very large DataFrame):
>>
>> +----------+------+-------------+--------+----+
>> |EmployeeID|INCOME|INCOME AGE TS|JoinDate|Dept|
>> +----------+------+-------------+--------+----+
>> |       101| 19000|      4/20/17| 4/20/13|  ES|
>> |       101| 10000|      10/3/15| 4/20/13|  OS|
>> |       102| 13000|       5/9/17| 4/20/12|  DS|
>> |       102| 12000|       5/8/17| 4/20/12|  CS|
>> |       103| 10000|       5/9/17| 4/20/10|  EQ|
>> |       103|  9000|       5/8/15| 4/20/10|  MD|
>> +----------+------+-------------+--------+----+
>>
>> Get the latest income of each employee whose INCOME AGE TS is less than 10 months old.
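The requirement does not say what the 10 months are measured against. Below is a minimal sketch of the date arithmetic, under two assumptions:

- There is an explicit as-of date.
- The day of month is clamped when the target month is shorter.

The function names are hypothetical.

```python
from calendar import monthrange
from datetime import date, datetime


def months_before(d: date, months: int) -> date:
    """Date `months` calendar months before d, clamping the day to the target
    month's length (assumption: e.g. 10 months before 2017-12-31 is 2017-02-28)."""
    total = d.year * 12 + (d.month - 1) - months
    year, month0 = divmod(total, 12)
    month = month0 + 1
    day = min(d.day, monthrange(year, month)[1])
    return date(year, month, day)


def is_recent(income_age_ts: str, as_of: date, months: int = 10) -> bool:
    """True if an M/D/YY timestamp is strictly newer than `months` before as_of."""
    ts = datetime.strptime(income_age_ts, "%m/%d/%y").date()
    return ts > months_before(as_of, months)


as_of = date(2017, 8, 29)  # assumed reference: the date of the question
for ts in ["4/20/17", "10/3/15", "5/9/17", "5/8/17", "5/9/17", "5/8/15"]:
    print(ts, is_recent(ts, as_of))
```

With that reference date the cutoff is 2016-10-29. So 10/3/15 and 5/8/15 are excluded, and every other sample timestamp passes.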
>>
>> Expected output DataFrame:
>>
>> +----------+------+-------------+--------+----+
>> |EmployeeID|INCOME|INCOME AGE TS|JoinDate|Dept|
>> +----------+------+-------------+--------+----+
>> |       101| 19000|      4/20/17| 4/20/13|  ES|
>> |       102| 13000|       5/9/17| 4/20/12|  DS|
>> |       103| 10000|       5/9/17| 4/20/10|  EQ|
>> +----------+------+-------------+--------+----+
>>
>> Below is what I'm planning to implement:
>>
>> import org.apache.spark.sql.types.StructType
>>
>> // Field types match the schema below (the dates are java.sql.Date)
>> case class Employee(EmployeeID: Int, INCOME: Int, INCOMEAGE: java.sql.Date,
>>                     JOINDATE: java.sql.Date, DEPT: String)
>>
>> val empSchema = new StructType()
>>   .add("EmployeeID", "int")
>>   .add("INCOME", "int")
>>   .add("INCOMEAGE", "date")
>>   .add("JOINDATE", "date")
>>   .add("DEPT", "string")
>>
>> // Reading from the file
>> import sparkSession.implicits._
>>
>> val readEmpFile = sparkSession.read
>>   .option("sep", ",")
>>   .option("dateFormat", "M/d/yy") // dates in the file look like 4/20/17
>>   .schema(empSchema)
>>   .csv(INPUT_DIRECTORY)
>>
>> // Create a typed Dataset[Employee]
>> val custDf = readEmpFile.as[Employee]
>>
>> // Return the row with the most recent INCOMEAGE for one employee; maxBy
>> // walks the group's iterator once instead of copying it into a List
>> def performETL(empData: Iterator[Employee]): Employee =
>>   empData.maxBy(_.INCOMEAGE.getTime)
>>
>> // Group the rows of each employee and reduce every group to one row
>> val groupByDf = custDf.groupByKey(a => a.EmployeeID)
>> val k = groupByDf.mapGroups((key, value) => performETL(value))
>>
>> Is this a good approach, or even the right one? If not, please suggest a
>> better way to implement it.
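For reference, the per-group work in the groupByKey/mapGroups version is simply "keep the row with the newest INCOMEAGE". Below is a small local model in plain Python that makes this explicit; it involves no Spark, and Employee and the function names are hypothetical stand-ins for the Scala case class. It shows that the work is a plain reduction per key, which a window function can express without handing whole groups to user code.

```python
from collections import namedtuple
from datetime import date, datetime
from itertools import groupby
from operator import attrgetter

# Hypothetical Python stand-in for the Scala `employee` case class
Employee = namedtuple("Employee", "EmployeeID INCOME INCOMEAGE JOINDATE DEPT")


def parse_mdy(s: str) -> date:
    """Parse an M/D/YY string such as '10/3/15'."""
    return datetime.strptime(s, "%m/%d/%y").date()


def latest_per_employee(rows):
    """Local model of groupByKey(EmployeeID) + mapGroups: one row per employee,
    namely the one with the newest INCOMEAGE."""
    by_id = attrgetter("EmployeeID")
    return [
        # max() consumes each group's iterator once; nothing is copied into a list
        max(group, key=attrgetter("INCOMEAGE"))
        for _, group in groupby(sorted(rows, key=by_id), key=by_id)
    ]


rows = [
    Employee(101, 19000, parse_mdy("4/20/17"), parse_mdy("4/20/13"), "ES"),
    Employee(101, 10000, parse_mdy("10/3/15"), parse_mdy("4/20/13"), "OS"),
    Employee(102, 13000, parse_mdy("5/9/17"), parse_mdy("4/20/12"), "DS"),
    Employee(102, 12000, parse_mdy("5/8/17"), parse_mdy("4/20/12"), "CS"),
    Employee(103, 10000, parse_mdy("5/9/17"), parse_mdy("4/20/10"), "EQ"),
    Employee(103, 9000, parse_mdy("5/8/15"), parse_mdy("4/20/10"), "MD"),
]

for e in latest_per_employee(rows):
    print(e.EmployeeID, e.INCOME, e.INCOMEAGE, e.DEPT)
```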
>>
>
--
Best Regards,
Ayan Guha