@Andres I need the latest income, but only where it is less than 10 months old
based on the income_age column, and I don't want to use SQL here.
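
To make the rule concrete (drop records that are 10 or more months old, then keep the most recent remaining record per employee), here is a minimal sketch in plain Python, with no Spark and no SQL. The names `months_between` and `latest_recent_income`, the tuple layout, and the `as_of` reference date of 2017-08-30 are all assumptions made for this illustration and do not come from the thread.

```python
from datetime import date


def months_between(later, earlier):
    """Whole calendar months elapsed from `earlier` to `later`."""
    months = (later.year - earlier.year) * 12 + (later.month - earlier.month)
    if later.day < earlier.day:
        months -= 1  # the last month is not complete yet
    return months


def latest_recent_income(rows, as_of, max_age_months=10):
    """Return {employee_id: row} with the newest row per employee,
    considering only rows less than `max_age_months` old."""
    latest = {}
    for row in rows:
        emp_id, income_date = row[0], row[2]
        if months_between(as_of, income_date) >= max_age_months:
            continue  # too old, skip it
        best = latest.get(emp_id)
        if best is None or income_date > best[2]:
            latest[emp_id] = row
    return latest


# (EmployeeID, INCOME, INCOME AGE TS, Dept), taken from the sample data
rows = [
    (101, 19000, date(2017, 4, 20), "ES"),
    (101, 10000, date(2015, 10, 3), "OS"),
    (102, 13000, date(2017, 5, 9), "DS"),
    (102, 12000, date(2017, 5, 8), "CS"),
    (103, 10000, date(2017, 5, 9), "EQ"),
    (103, 9000, date(2015, 5, 8), "MD"),
]

# Assumed reference date for "age"; in practice this would be the run date.
result = latest_recent_income(rows, as_of=date(2017, 8, 30))
for emp_id in sorted(result):
    print(result[emp_id])
```

The same two steps (filter, then keep the maximum per key) should map onto a Dataset filter followed by a per-key reduction, though that mapping is untested here.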

On Wed, Aug 30, 2017 at 8:08 AM Andrés Ivaldi <iaiva...@gmail.com> wrote:

> Hi, if you need the last value of income in a window function, you can use
> last_value.
> Not tested, but maybe with @ayan's SQL:
>
> spark.sql("select *, row_number() over (partition by id order by
> income_age_ts desc) r, last_value(income) over (partition by id order by
> income_age_ts desc) last_income from t")
>
>
> On Tue, Aug 29, 2017 at 11:30 PM, purna pradeep <purna2prad...@gmail.com>
> wrote:
>
>> @ayan,
>>
>> Thanks for your response
>>
>> I would like to keep this logic in a function, calculateIncome, so that I
>> can reuse it in other parts of the application. That is why I'm planning to
>> use mapGroups with a function argument that takes a row iterator, but I'm
>> not sure this is the best way to implement it, as my initial DataFrame is
>> very large.
>>
>> On Tue, Aug 29, 2017 at 10:24 PM ayan guha <guha.a...@gmail.com> wrote:
>>
>>> Hi
>>>
>>> The tool you are looking for is a window function. Example:
>>>
>>> >>> df.show()
>>> +--------+----+---+------+-------------+
>>> |JoinDate|dept| id|income|income_age_ts|
>>> +--------+----+---+------+-------------+
>>> | 4/20/13|  ES|101| 19000|      4/20/17|
>>> | 4/20/13|  OS|101| 10000|      10/3/15|
>>> | 4/20/12|  DS|102| 13000|       5/9/17|
>>> | 4/20/12|  CS|102| 12000|       5/8/17|
>>> | 4/20/10|  EQ|103| 10000|       5/9/17|
>>> | 4/20/10|  MD|103|  9000|       5/8/17|
>>> +--------+----+---+------+-------------+
>>>
>>> >>> res = spark.sql("select *, row_number() over (partition by id order
>>> by income_age_ts desc) r from t")
>>> >>> res.show()
>>> +--------+----+---+------+-------------+---+
>>> |JoinDate|dept| id|income|income_age_ts|  r|
>>> +--------+----+---+------+-------------+---+
>>> | 4/20/10|  EQ|103| 10000|       5/9/17|  1|
>>> | 4/20/10|  MD|103|  9000|       5/8/17|  2|
>>> | 4/20/13|  ES|101| 19000|      4/20/17|  1|
>>> | 4/20/13|  OS|101| 10000|      10/3/15|  2|
>>> | 4/20/12|  DS|102| 13000|       5/9/17|  1|
>>> | 4/20/12|  CS|102| 12000|       5/8/17|  2|
>>> +--------+----+---+------+-------------+---+
>>>
>>> >>> res = spark.sql("select * from (select *, row_number() over
>>> (partition by id order by income_age_ts desc) r from t) x where r=1")
>>> >>> res.show()
>>> +--------+----+---+------+-------------+---+
>>> |JoinDate|dept| id|income|income_age_ts|  r|
>>> +--------+----+---+------+-------------+---+
>>> | 4/20/10|  EQ|103| 10000|       5/9/17|  1|
>>> | 4/20/13|  ES|101| 19000|      4/20/17|  1|
>>> | 4/20/12|  DS|102| 13000|       5/9/17|  1|
>>> +--------+----+---+------+-------------+---+
>>>
>>> This should perform better because it uses all of Spark's built-in
>>> optimizations.
>>>
>>> Best
>>> Ayan
>>>
>>> On Wed, Aug 30, 2017 at 11:06 AM, purna pradeep <purna2prad...@gmail.com
>>> > wrote:
>>>
>>>> Please click on unnamed text/html  link for better view
>>>>
>>>> On Tue, Aug 29, 2017 at 8:11 PM purna pradeep <purna2prad...@gmail.com>
>>>> wrote:
>>>>
>>>>>
>>>>> ---------- Forwarded message ---------
>>>>> From: Mamillapalli, Purna Pradeep <
>>>>> purnapradeep.mamillapa...@capitalone.com>
>>>>> Date: Tue, Aug 29, 2017 at 8:08 PM
>>>>> Subject: Spark question
>>>>> To: purna pradeep <purna2prad...@gmail.com>
>>>>>
>>>>> Below is the input DataFrame (in reality it is very large):
>>>>>
>>>>>
>>>>>
>>>>> +----------+------+-------------+--------+----+
>>>>> |EmployeeID|INCOME|INCOME AGE TS|JoinDate|Dept|
>>>>> +----------+------+-------------+--------+----+
>>>>> |       101| 19000|      4/20/17| 4/20/13|  ES|
>>>>> |       101| 10000|      10/3/15| 4/20/13|  OS|
>>>>> |       102| 13000|       5/9/17| 4/20/12|  DS|
>>>>> |       102| 12000|       5/8/17| 4/20/12|  CS|
>>>>> |       103| 10000|       5/9/17| 4/20/10|  EQ|
>>>>> |       103|  9000|       5/8/15| 4/20/10|  MD|
>>>>> +----------+------+-------------+--------+----+
>>>>>
>>>>> Goal: get the latest income of each employee whose INCOME AGE TS is
>>>>> less than 10 months old.
>>>>>
>>>>> Expected output Dataframe
>>>>>
>>>>> +----------+------+-------------+--------+----+
>>>>> |EmployeeID|INCOME|INCOME AGE TS|JoinDate|Dept|
>>>>> +----------+------+-------------+--------+----+
>>>>> |       101| 19000|      4/20/17| 4/20/13|  ES|
>>>>> |       102| 13000|       5/9/17| 4/20/12|  DS|
>>>>> |       103| 10000|       5/9/17| 4/20/10|  EQ|
>>>>> +----------+------+-------------+--------+----+
>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>> Below is what I'm planning to implement:
>>>>>
>>>>>
>>>>>
>>>>> import java.sql.Date
>>>>> import org.apache.spark.sql.types.StructType
>>>>>
>>>>> // The date fields are typed as Date so they match the schema below
>>>>> case class employee(EmployeeID: Int, INCOME: Int, INCOMEAGE: Date,
>>>>>                     JOINDATE: Date, DEPT: String)
>>>>>
>>>>> val empSchema = new StructType()
>>>>>   .add("EmployeeID", "Int")
>>>>>   .add("INCOME", "Int")
>>>>>   .add("INCOMEAGE", "Date")
>>>>>   .add("JOINDATE", "Date")
>>>>>   .add("DEPT", "String")
>>>>>
>>>>> // Reading from the file
>>>>> import sparkSession.implicits._
>>>>>
>>>>> val readEmpFile = sparkSession.read
>>>>>   .option("sep", ",")
>>>>>   .schema(empSchema)
>>>>>   .csv(INPUT_DIRECTORY)
>>>>>
>>>>> // Create the employee Dataset
>>>>> val custDf = readEmpFile.as[employee]
>>>>>
>>>>> // Adding salary column: group the records by employee
>>>>> val groupByDf = custDf.groupByKey(a => a.EmployeeID)
>>>>>
>>>>> val k = groupByDf.mapGroups((key, value) => performETL(value))
>>>>>
>>>>> def performETL(empData: Iterator[employee]): employee = {
>>>>>   val empList = empData.toList
>>>>>
>>>>>   // calculateIncome (not shown) has the logic to figure out the latest
>>>>>   // income for an account; it is assumed here to return an Int
>>>>>   val income = calculateIncome(empList)
>>>>>
>>>>>   // Return the first record of the group with the calculated income
>>>>>   empList.head.copy(INCOME = income)
>>>>> }
>>>>>
>>>>>
>>>>>
>>>>> Is this a good approach, or even the right approach, to implement this?
>>>>> If not, please suggest a better way.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>
>>>
>>>
>>> --
>>> Best Regards,
>>> Ayan Guha
>>>
>>
>
>
> --
> Ing. Ivaldi Andres
>
