@ayan,

Thanks for your response.

I would like to keep this logic in a function (calculateIncome in this
case) so that I can reuse it in other parts of the application. That is why
I am planning to use mapGroups with a function argument that takes the row
iterator. However, I am not sure this is the best way to implement it, as my
initial DataFrame is very large.
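
For illustration, here is a minimal sketch of what such a reusable per-group function might look like. It is written in plain Python (the language of the REPL example quoted below) so it runs without a cluster. The names calculate_income, months_between, as_of and max_age_months are hypothetical, and the reference date of 29 Aug 2017 is an assumption based on when this question was posted. Dates are parsed before comparison rather than compared as strings.

```python
from datetime import date, datetime
from typing import Iterable, Mapping, Optional


def months_between(earlier: date, later: date) -> int:
    """Whole calendar months elapsed from `earlier` to `later`."""
    months = (later.year - earlier.year) * 12 + (later.month - earlier.month)
    # Not a full month yet if the day-of-month has not been reached.
    if later.day < earlier.day:
        months -= 1
    return months


def calculate_income(
    rows: Iterable[Mapping], as_of: date, max_age_months: int = 10
) -> Optional[Mapping]:
    """Return the most recent row younger than `max_age_months`, or None."""
    latest = None
    for row in rows:  # single pass; the group is never materialised as a list
        ts = datetime.strptime(row["income_age_ts"], "%m/%d/%y").date()
        if months_between(ts, as_of) >= max_age_months:
            continue  # too old to count
        if latest is None or ts > latest[0]:
            latest = (ts, row)
    return latest[1] if latest else None


# Rows for employee 101, copied from the sample data in this thread.
rows_101 = [
    {"id": 101, "income": 19000, "income_age_ts": "4/20/17", "dept": "ES"},
    {"id": 101, "income": 10000, "income_age_ts": "10/3/15", "dept": "OS"},
]

# as_of is an assumed reference date, not something given in the thread.
result_101 = calculate_income(iter(rows_101), as_of=date(2017, 8, 29))
print(result_101["income"])
```

Because the function consumes an iterator in one pass and keeps only the best row seen so far, the same idea could be handed to a per-group API such as mapGroups without calling toList on each group, which may matter when groups are large.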

On Tue, Aug 29, 2017 at 10:24 PM ayan guha <guha.a...@gmail.com> wrote:

> Hi
>
> The tool you are looking for is a window function. Example:
>
> >>> df.show()
> +--------+----+---+------+-------------+
> |JoinDate|dept| id|income|income_age_ts|
> +--------+----+---+------+-------------+
> | 4/20/13|  ES|101| 19000|      4/20/17|
> | 4/20/13|  OS|101| 10000|      10/3/15|
> | 4/20/12|  DS|102| 13000|       5/9/17|
> | 4/20/12|  CS|102| 12000|       5/8/17|
> | 4/20/10|  EQ|103| 10000|       5/9/17|
> | 4/20/10|  MD|103|  9000|       5/8/17|
> +--------+----+---+------+-------------+
>
> >>> res = spark.sql("select *, row_number() over (partition by id order by income_age_ts desc) r from t")
> >>> res.show()
> +--------+----+---+------+-------------+---+
> |JoinDate|dept| id|income|income_age_ts|  r|
> +--------+----+---+------+-------------+---+
> | 4/20/10|  EQ|103| 10000|       5/9/17|  1|
> | 4/20/10|  MD|103|  9000|       5/8/17|  2|
> | 4/20/13|  ES|101| 19000|      4/20/17|  1|
> | 4/20/13|  OS|101| 10000|      10/3/15|  2|
> | 4/20/12|  DS|102| 13000|       5/9/17|  1|
> | 4/20/12|  CS|102| 12000|       5/8/17|  2|
> +--------+----+---+------+-------------+---+
>
> >>> res = spark.sql("select * from (select *, row_number() over (partition by id order by income_age_ts desc) r from t) x where r=1")
> >>> res.show()
> +--------+----+---+------+-------------+---+
> |JoinDate|dept| id|income|income_age_ts|  r|
> +--------+----+---+------+-------------+---+
> | 4/20/10|  EQ|103| 10000|       5/9/17|  1|
> | 4/20/13|  ES|101| 19000|      4/20/17|  1|
> | 4/20/12|  DS|102| 13000|       5/9/17|  1|
> +--------+----+---+------+-------------+---+
>
> This should perform better because it uses Spark's built-in optimizations.
>
> Best
> Ayan
>
> On Wed, Aug 30, 2017 at 11:06 AM, purna pradeep <purna2prad...@gmail.com>
> wrote:
>
>> Please click on the unnamed text/html link for a better view.
>>
>> On Tue, Aug 29, 2017 at 8:11 PM purna pradeep <purna2prad...@gmail.com>
>> wrote:
>>
>>>
>>> ---------- Forwarded message ---------
>>> From: Mamillapalli, Purna Pradeep <
>>> purnapradeep.mamillapa...@capitalone.com>
>>> Date: Tue, Aug 29, 2017 at 8:08 PM
>>> Subject: Spark question
>>> To: purna pradeep <purna2prad...@gmail.com>
>>>
>>> Below is the input DataFrame (in reality this is a very large DataFrame):
>>>
>>>
>>>
>>> EmployeeID | INCOME | INCOME AGE TS | JoinDate | Dept
>>> -----------+--------+---------------+----------+-----
>>> 101        | 19000  | 4/20/17       | 4/20/13  | ES
>>> 101        | 10000  | 10/3/15       | 4/20/13  | OS
>>> 102        | 13000  | 5/9/17        | 4/20/12  | DS
>>> 102        | 12000  | 5/8/17        | 4/20/12  | CS
>>> 103        | 10000  | 5/9/17        | 4/20/10  | EQ
>>> 103        | 9000   | 5/8/15        | 4/20/10  | MD
>>>
>>> Requirement: for each employee, get the latest income whose INCOME AGE TS
>>> is less than 10 months old.
>>>
>>> Expected output Dataframe
>>>
>>> EmployeeID | INCOME | INCOME AGE TS | JoinDate | Dept
>>> -----------+--------+---------------+----------+-----
>>> 101        | 19000  | 4/20/17       | 4/20/13  | ES
>>> 102        | 13000  | 5/9/17        | 4/20/12  | DS
>>> 103        | 10000  | 5/9/17        | 4/20/10  | EQ
>>>
>>>
>>>
>>
>>
>>
>>
>>
>> Below is what I'm planning to implement:
>>>
>>>
>>>
>>> import java.sql.Date
>>> import org.apache.spark.sql.types._
>>> import sparkSession.implicits._
>>>
>>> // Field names and types match the schema so that .as[employee] can bind them
>>> case class employee(EmployeeID: Int, INCOME: Int, INCOMEAGE: Date,
>>>                     JOINDATE: Date, DEPT: String)
>>>
>>> // calculateIncome (defined elsewhere, assumed here to return an Int) has the
>>> // logic to figure out the latest income for an account
>>> def performETL(empData: Iterator[employee]): employee = {
>>>   val empList = empData.toList
>>>   val income = calculateIncome(empList)
>>>   // Groups are never empty, so head is safe; return the first row of the
>>>   // group with the computed income
>>>   empList.head.copy(INCOME = income)
>>> }
>>>
>>> val empSchema = new StructType()
>>>   .add("EmployeeID", IntegerType)
>>>   .add("INCOME", IntegerType)
>>>   .add("INCOMEAGE", DateType)
>>>   .add("JOINDATE", DateType)
>>>   .add("DEPT", StringType)
>>>
>>> // Reading from the file
>>> val readEmpFile = sparkSession.read
>>>   .option("sep", ",")
>>>   .option("dateFormat", "M/d/yy") // assumes dates look like 4/20/17
>>>   .schema(empSchema)
>>>   .csv(INPUT_DIRECTORY)
>>>
>>> // Create employee Dataset
>>> val custDf = readEmpFile.as[employee]
>>>
>>> // Adding salary column
>>> val groupByDf = custDf.groupByKey(a => a.EmployeeID)
>>> val k = groupByDf.mapGroups((key, value) => performETL(value))
>>>
>>>
>>>
>>> Is this a good approach, or even the right one, for this requirement?
>>> If not, please suggest a better way to implement it.
>>>
>>>
>>>
>>>
>>
>
>
> --
> Best Regards,
> Ayan Guha
>
