Re: Select entire row based on a logic applied on 2 columns across multiple rows
I see, as @ayan said, it's valid, but, why don't use API or SQL, the build-in options are optimized I understand that SQL API is hard when trying to build an api over that, but Spark API doesn't, and you can do a lot with that. regards, On Wed, Aug 30, 2017 at 10:31 AM, ayan guhawrote: > Well, using raw sql is a valid option, but if you do not want you can > always implement the concept using apis. All these constructs have api > counterparts, such as filter, window, over, row number etc. > > On Wed, 30 Aug 2017 at 10:49 pm, purna pradeep > wrote: > >> @Andres I need latest but it should less than 10 months based income_age >> column and don't want to use sql here >> >> On Wed, Aug 30, 2017 at 8:08 AM Andrés Ivaldi wrote: >> >>> Hi, if you need the last value from income in window function you can >>> use last_value. >>> No tested but meaby with @ayan sql >>> >>> spark.sql("select *, row_number(), last_value(income) over (partition by >>> id order by income_age_ts desc) r from t") >>> >>> >>> On Tue, Aug 29, 2017 at 11:30 PM, purna pradeep >> > wrote: >>> @ayan, Thanks for your response I would like to have functions in this case calculateIncome and the reason why I need function is to reuse in other parts of the application ..that's the reason I'm planning for mapgroups with function as argument which takes rowiterator ..but not sure if this is the best to implement as my initial dataframe is very large On Tue, Aug 29, 2017 at 10:24 PM ayan guha wrote: > Hi > > the tool you are looking for is window function. Example: > > >>> df.show() > +++---+--+-+ > |JoinDate|dept| id|income|income_age_ts| > +++---+--+-+ > | 4/20/13| ES|101| 19000| 4/20/17| > | 4/20/13| OS|101| 1| 10/3/15| > | 4/20/12| DS|102| 13000| 5/9/17| > | 4/20/12| CS|102| 12000| 5/8/17| > | 4/20/10| EQ|103| 1| 5/9/17| > | 4/20/10| MD|103| 9000| 5/8/17| > +++---+--+-+ > > >>> res = spark.sql("select *, row_number() over (partition by id > order by income_age_ts desc) r from t") > >>> res.show() > +++---+--+-+---+ > |JoinDate|dept| id|income|income_age_ts| r| > +++---+--+-+---+ > | 4/20/10| EQ|103| 1| 5/9/17| 1| > | 4/20/10| MD|103| 9000| 5/8/17| 2| > | 4/20/13| ES|101| 19000| 4/20/17| 1| > | 4/20/13| OS|101| 1| 10/3/15| 2| > | 4/20/12| DS|102| 13000| 5/9/17| 1| > | 4/20/12| CS|102| 12000| 5/8/17| 2| > +++---+--+-+---+ > > >>> res = spark.sql("select * from (select *, row_number() over > (partition by id order by income_age_ts desc) r from t) x where r=1") > >>> res.show() > +++---+--+-+---+ > |JoinDate|dept| id|income|income_age_ts| r| > +++---+--+-+---+ > | 4/20/10| EQ|103| 1| 5/9/17| 1| > | 4/20/13| ES|101| 19000| 4/20/17| 1| > | 4/20/12| DS|102| 13000| 5/9/17| 1| > +++---+--+-+---+ > > This should be better because it uses all in-built optimizations in > Spark. > > Best > Ayan > > On Wed, Aug 30, 2017 at 11:06 AM, purna pradeep < > purna2prad...@gmail.com> wrote: > >> Please click on unnamed text/html link for better view >> >> On Tue, Aug 29, 2017 at 8:11 PM purna pradeep < >> purna2prad...@gmail.com> wrote: >> >>> >>> -- Forwarded message - >>> From: Mamillapalli, Purna Pradeep >> capitalone.com> >>> Date: Tue, Aug 29, 2017 at 8:08 PM >>> Subject: Spark question >>> To: purna pradeep >>> >>> Below is the input Dataframe(In real this is a very large Dataframe) >>> >>> >>> >>> EmployeeID >>> >>> INCOME >>> >>> INCOME AGE TS >>> >>> JoinDate >>> >>> Dept >>> >>> 101 >>> >>> 19000 >>> >>> 4/20/17 >>> >>> 4/20/13 >>> >>> ES >>> >>> 101 >>> >>> 1 >>> >>> 10/3/15 >>> >>> 4/20/13 >>> >>> OS >>> >>> 102 >>> >>> 13000 >>> >>> 5/9/17 >>> >>> 4/20/12 >>> >>> DS >>> >>> 102 >>> >>> 12000 >>> >>> 5/8/17 >>> >>> 4/20/12 >>> >>> CS >>> >>> 103 >>> >>> 1 >>> >>> 5/9/17 >>> >>> 4/20/10 >>> >>> EQ >>> >>> 103 >>> >>> 9000 >>> >>> 5/8/15 >>> >>> 4/20/10 >>> >>> MD >>> >>> Get the
Re: Select entire row based on a logic applied on 2 columns across multiple rows
Well, using raw sql is a valid option, but if you do not want you can always implement the concept using apis. All these constructs have api counterparts, such as filter, window, over, row number etc. On Wed, 30 Aug 2017 at 10:49 pm, purna pradeepwrote: > @Andres I need latest but it should less than 10 months based income_age > column and don't want to use sql here > > On Wed, Aug 30, 2017 at 8:08 AM Andrés Ivaldi wrote: > >> Hi, if you need the last value from income in window function you can use >> last_value. >> No tested but meaby with @ayan sql >> >> spark.sql("select *, row_number(), last_value(income) over (partition by >> id order by income_age_ts desc) r from t") >> >> >> On Tue, Aug 29, 2017 at 11:30 PM, purna pradeep >> wrote: >> >>> @ayan, >>> >>> Thanks for your response >>> >>> I would like to have functions in this case calculateIncome and the >>> reason why I need function is to reuse in other parts of the application >>> ..that's the reason I'm planning for mapgroups with function as argument >>> which takes rowiterator ..but not sure if this is the best to implement as >>> my initial dataframe is very large >>> >>> On Tue, Aug 29, 2017 at 10:24 PM ayan guha wrote: >>> Hi the tool you are looking for is window function. Example: >>> df.show() +++---+--+-+ |JoinDate|dept| id|income|income_age_ts| +++---+--+-+ | 4/20/13| ES|101| 19000| 4/20/17| | 4/20/13| OS|101| 1| 10/3/15| | 4/20/12| DS|102| 13000| 5/9/17| | 4/20/12| CS|102| 12000| 5/8/17| | 4/20/10| EQ|103| 1| 5/9/17| | 4/20/10| MD|103| 9000| 5/8/17| +++---+--+-+ >>> res = spark.sql("select *, row_number() over (partition by id order by income_age_ts desc) r from t") >>> res.show() +++---+--+-+---+ |JoinDate|dept| id|income|income_age_ts| r| +++---+--+-+---+ | 4/20/10| EQ|103| 1| 5/9/17| 1| | 4/20/10| MD|103| 9000| 5/8/17| 2| | 4/20/13| ES|101| 19000| 4/20/17| 1| | 4/20/13| OS|101| 1| 10/3/15| 2| | 4/20/12| DS|102| 13000| 5/9/17| 1| | 4/20/12| CS|102| 12000| 5/8/17| 2| +++---+--+-+---+ >>> res = spark.sql("select * from (select *, row_number() over (partition by id order by income_age_ts desc) r from t) x where r=1") >>> res.show() +++---+--+-+---+ |JoinDate|dept| id|income|income_age_ts| r| +++---+--+-+---+ | 4/20/10| EQ|103| 1| 5/9/17| 1| | 4/20/13| ES|101| 19000| 4/20/17| 1| | 4/20/12| DS|102| 13000| 5/9/17| 1| +++---+--+-+---+ This should be better because it uses all in-built optimizations in Spark. Best Ayan On Wed, Aug 30, 2017 at 11:06 AM, purna pradeep < purna2prad...@gmail.com> wrote: > Please click on unnamed text/html link for better view > > On Tue, Aug 29, 2017 at 8:11 PM purna pradeep > wrote: > >> >> -- Forwarded message - >> From: Mamillapalli, Purna Pradeep < >> purnapradeep.mamillapa...@capitalone.com> >> Date: Tue, Aug 29, 2017 at 8:08 PM >> Subject: Spark question >> To: purna pradeep >> >> Below is the input Dataframe(In real this is a very large Dataframe) >> >> >> >> EmployeeID >> >> INCOME >> >> INCOME AGE TS >> >> JoinDate >> >> Dept >> >> 101 >> >> 19000 >> >> 4/20/17 >> >> 4/20/13 >> >> ES >> >> 101 >> >> 1 >> >> 10/3/15 >> >> 4/20/13 >> >> OS >> >> 102 >> >> 13000 >> >> 5/9/17 >> >> 4/20/12 >> >> DS >> >> 102 >> >> 12000 >> >> 5/8/17 >> >> 4/20/12 >> >> CS >> >> 103 >> >> 1 >> >> 5/9/17 >> >> 4/20/10 >> >> EQ >> >> 103 >> >> 9000 >> >> 5/8/15 >> >> 4/20/10 >> >> MD >> >> Get the latest income of an employee which has Income_age ts <10 >> months >> >> Expected output Dataframe >> >> EmployeeID >> >> INCOME >> >> INCOME AGE TS >> >> JoinDate >> >> Dept >> >> 101 >> >> 19000 >> >> 4/20/17 >> >> 4/20/13 >> >> ES >> >> 102 >> >> 13000 >> >> 5/9/17 >> >> 4/20/12 >> >> DS >> >> 103 >> >> 1 >> >> 5/9/17 >>
Re: Select entire row based on a logic applied on 2 columns across multiple rows
@Andres I need latest but it should less than 10 months based income_age column and don't want to use sql here On Wed, Aug 30, 2017 at 8:08 AM Andrés Ivaldiwrote: > Hi, if you need the last value from income in window function you can use > last_value. > No tested but meaby with @ayan sql > > spark.sql("select *, row_number(), last_value(income) over (partition by > id order by income_age_ts desc) r from t") > > > On Tue, Aug 29, 2017 at 11:30 PM, purna pradeep > wrote: > >> @ayan, >> >> Thanks for your response >> >> I would like to have functions in this case calculateIncome and the >> reason why I need function is to reuse in other parts of the application >> ..that's the reason I'm planning for mapgroups with function as argument >> which takes rowiterator ..but not sure if this is the best to implement as >> my initial dataframe is very large >> >> On Tue, Aug 29, 2017 at 10:24 PM ayan guha wrote: >> >>> Hi >>> >>> the tool you are looking for is window function. Example: >>> >>> >>> df.show() >>> +++---+--+-+ >>> |JoinDate|dept| id|income|income_age_ts| >>> +++---+--+-+ >>> | 4/20/13| ES|101| 19000| 4/20/17| >>> | 4/20/13| OS|101| 1| 10/3/15| >>> | 4/20/12| DS|102| 13000| 5/9/17| >>> | 4/20/12| CS|102| 12000| 5/8/17| >>> | 4/20/10| EQ|103| 1| 5/9/17| >>> | 4/20/10| MD|103| 9000| 5/8/17| >>> +++---+--+-+ >>> >>> >>> res = spark.sql("select *, row_number() over (partition by id order >>> by income_age_ts desc) r from t") >>> >>> res.show() >>> +++---+--+-+---+ >>> |JoinDate|dept| id|income|income_age_ts| r| >>> +++---+--+-+---+ >>> | 4/20/10| EQ|103| 1| 5/9/17| 1| >>> | 4/20/10| MD|103| 9000| 5/8/17| 2| >>> | 4/20/13| ES|101| 19000| 4/20/17| 1| >>> | 4/20/13| OS|101| 1| 10/3/15| 2| >>> | 4/20/12| DS|102| 13000| 5/9/17| 1| >>> | 4/20/12| CS|102| 12000| 5/8/17| 2| >>> +++---+--+-+---+ >>> >>> >>> res = spark.sql("select * from (select *, row_number() over >>> (partition by id order by income_age_ts desc) r from t) x where r=1") >>> >>> res.show() >>> +++---+--+-+---+ >>> |JoinDate|dept| id|income|income_age_ts| r| >>> +++---+--+-+---+ >>> | 4/20/10| EQ|103| 1| 5/9/17| 1| >>> | 4/20/13| ES|101| 19000| 4/20/17| 1| >>> | 4/20/12| DS|102| 13000| 5/9/17| 1| >>> +++---+--+-+---+ >>> >>> This should be better because it uses all in-built optimizations in >>> Spark. >>> >>> Best >>> Ayan >>> >>> On Wed, Aug 30, 2017 at 11:06 AM, purna pradeep >> > wrote: >>> Please click on unnamed text/html link for better view On Tue, Aug 29, 2017 at 8:11 PM purna pradeep wrote: > > -- Forwarded message - > From: Mamillapalli, Purna Pradeep < > purnapradeep.mamillapa...@capitalone.com> > Date: Tue, Aug 29, 2017 at 8:08 PM > Subject: Spark question > To: purna pradeep > > Below is the input Dataframe(In real this is a very large Dataframe) > > > > EmployeeID > > INCOME > > INCOME AGE TS > > JoinDate > > Dept > > 101 > > 19000 > > 4/20/17 > > 4/20/13 > > ES > > 101 > > 1 > > 10/3/15 > > 4/20/13 > > OS > > 102 > > 13000 > > 5/9/17 > > 4/20/12 > > DS > > 102 > > 12000 > > 5/8/17 > > 4/20/12 > > CS > > 103 > > 1 > > 5/9/17 > > 4/20/10 > > EQ > > 103 > > 9000 > > 5/8/15 > > 4/20/10 > > MD > > Get the latest income of an employee which has Income_age ts <10 > months > > Expected output Dataframe > > EmployeeID > > INCOME > > INCOME AGE TS > > JoinDate > > Dept > > 101 > > 19000 > > 4/20/17 > > 4/20/13 > > ES > > 102 > > 13000 > > 5/9/17 > > 4/20/12 > > DS > > 103 > > 1 > > 5/9/17 > > 4/20/10 > > EQ > > > Below is what im planning to implement > > > > case class employee (*EmployeeID*: Int, *INCOME*: Int, INCOMEAGE: > Int, *JOINDATE*: Int,DEPT:String) > > > > *val *empSchema = *new *StructType().add(*"EmployeeID"*,*"Int"*).add( > *"INCOME"*, *"Int"*).add(*"INCOMEAGE"*,*"Date"*) . add(*"JOINDATE"*, > *"Date"*). add(*"DEPT"*,*"String"*) > > > > *//Reading from
Re: Select entire row based on a logic applied on 2 columns across multiple rows
Hi, if you need the last value from income in window function you can use last_value. No tested but meaby with @ayan sql spark.sql("select *, row_number(), last_value(income) over (partition by id order by income_age_ts desc) r from t") On Tue, Aug 29, 2017 at 11:30 PM, purna pradeepwrote: > @ayan, > > Thanks for your response > > I would like to have functions in this case calculateIncome and the > reason why I need function is to reuse in other parts of the application > ..that's the reason I'm planning for mapgroups with function as argument > which takes rowiterator ..but not sure if this is the best to implement as > my initial dataframe is very large > > On Tue, Aug 29, 2017 at 10:24 PM ayan guha wrote: > >> Hi >> >> the tool you are looking for is window function. Example: >> >> >>> df.show() >> +++---+--+-+ >> |JoinDate|dept| id|income|income_age_ts| >> +++---+--+-+ >> | 4/20/13| ES|101| 19000| 4/20/17| >> | 4/20/13| OS|101| 1| 10/3/15| >> | 4/20/12| DS|102| 13000| 5/9/17| >> | 4/20/12| CS|102| 12000| 5/8/17| >> | 4/20/10| EQ|103| 1| 5/9/17| >> | 4/20/10| MD|103| 9000| 5/8/17| >> +++---+--+-+ >> >> >>> res = spark.sql("select *, row_number() over (partition by id order >> by income_age_ts desc) r from t") >> >>> res.show() >> +++---+--+-+---+ >> |JoinDate|dept| id|income|income_age_ts| r| >> +++---+--+-+---+ >> | 4/20/10| EQ|103| 1| 5/9/17| 1| >> | 4/20/10| MD|103| 9000| 5/8/17| 2| >> | 4/20/13| ES|101| 19000| 4/20/17| 1| >> | 4/20/13| OS|101| 1| 10/3/15| 2| >> | 4/20/12| DS|102| 13000| 5/9/17| 1| >> | 4/20/12| CS|102| 12000| 5/8/17| 2| >> +++---+--+-+---+ >> >> >>> res = spark.sql("select * from (select *, row_number() over >> (partition by id order by income_age_ts desc) r from t) x where r=1") >> >>> res.show() >> +++---+--+-+---+ >> |JoinDate|dept| id|income|income_age_ts| r| >> +++---+--+-+---+ >> | 4/20/10| EQ|103| 1| 5/9/17| 1| >> | 4/20/13| ES|101| 19000| 4/20/17| 1| >> | 4/20/12| DS|102| 13000| 5/9/17| 1| >> +++---+--+-+---+ >> >> This should be better because it uses all in-built optimizations in Spark. >> >> Best >> Ayan >> >> On Wed, Aug 30, 2017 at 11:06 AM, purna pradeep >> wrote: >> >>> Please click on unnamed text/html link for better view >>> >>> On Tue, Aug 29, 2017 at 8:11 PM purna pradeep >>> wrote: >>> -- Forwarded message - From: Mamillapalli, Purna Pradeep Date: Tue, Aug 29, 2017 at 8:08 PM Subject: Spark question To: purna pradeep Below is the input Dataframe(In real this is a very large Dataframe) EmployeeID INCOME INCOME AGE TS JoinDate Dept 101 19000 4/20/17 4/20/13 ES 101 1 10/3/15 4/20/13 OS 102 13000 5/9/17 4/20/12 DS 102 12000 5/8/17 4/20/12 CS 103 1 5/9/17 4/20/10 EQ 103 9000 5/8/15 4/20/10 MD Get the latest income of an employee which has Income_age ts <10 months Expected output Dataframe EmployeeID INCOME INCOME AGE TS JoinDate Dept 101 19000 4/20/17 4/20/13 ES 102 13000 5/9/17 4/20/12 DS 103 1 5/9/17 4/20/10 EQ >>> >>> >>> >>> >>> >>> Below is what im planning to implement case class employee (*EmployeeID*: Int, *INCOME*: Int, INCOMEAGE: Int, *JOINDATE*: Int,DEPT:String) *val *empSchema = *new *StructType().add(*"EmployeeID"*,*"Int"*).add( *"INCOME"*, *"Int"*).add(*"INCOMEAGE"*,*"Date"*) . add(*"JOINDATE"*, *"Date"*). add(*"DEPT"*,*"String"*) *//Reading from the File **import *sparkSession.implicits._ *val *readEmpFile = sparkSession.read .option(*"sep"*, *","*) .schema(empSchema) .csv(INPUT_DIRECTORY) *//Create employee DataFrame **val *custDf = readEmpFile.as[employee] *//Adding Salary Column **val *groupByDf = custDf.groupByKey(a => a.* EmployeeID*) *val *k = groupByDf.mapGroups((key,value) =>
Re: Select entire row based on a logic applied on 2 columns across multiple rows
@ayan, Thanks for your response I would like to have functions in this case calculateIncome and the reason why I need function is to reuse in other parts of the application ..that's the reason I'm planning for mapgroups with function as argument which takes rowiterator ..but not sure if this is the best to implement as my initial dataframe is very large On Tue, Aug 29, 2017 at 10:24 PM ayan guhawrote: > Hi > > the tool you are looking for is window function. Example: > > >>> df.show() > +++---+--+-+ > |JoinDate|dept| id|income|income_age_ts| > +++---+--+-+ > | 4/20/13| ES|101| 19000| 4/20/17| > | 4/20/13| OS|101| 1| 10/3/15| > | 4/20/12| DS|102| 13000| 5/9/17| > | 4/20/12| CS|102| 12000| 5/8/17| > | 4/20/10| EQ|103| 1| 5/9/17| > | 4/20/10| MD|103| 9000| 5/8/17| > +++---+--+-+ > > >>> res = spark.sql("select *, row_number() over (partition by id order by > income_age_ts desc) r from t") > >>> res.show() > +++---+--+-+---+ > |JoinDate|dept| id|income|income_age_ts| r| > +++---+--+-+---+ > | 4/20/10| EQ|103| 1| 5/9/17| 1| > | 4/20/10| MD|103| 9000| 5/8/17| 2| > | 4/20/13| ES|101| 19000| 4/20/17| 1| > | 4/20/13| OS|101| 1| 10/3/15| 2| > | 4/20/12| DS|102| 13000| 5/9/17| 1| > | 4/20/12| CS|102| 12000| 5/8/17| 2| > +++---+--+-+---+ > > >>> res = spark.sql("select * from (select *, row_number() over (partition > by id order by income_age_ts desc) r from t) x where r=1") > >>> res.show() > +++---+--+-+---+ > |JoinDate|dept| id|income|income_age_ts| r| > +++---+--+-+---+ > | 4/20/10| EQ|103| 1| 5/9/17| 1| > | 4/20/13| ES|101| 19000| 4/20/17| 1| > | 4/20/12| DS|102| 13000| 5/9/17| 1| > +++---+--+-+---+ > > This should be better because it uses all in-built optimizations in Spark. > > Best > Ayan > > On Wed, Aug 30, 2017 at 11:06 AM, purna pradeep > wrote: > >> Please click on unnamed text/html link for better view >> >> On Tue, Aug 29, 2017 at 8:11 PM purna pradeep >> wrote: >> >>> >>> -- Forwarded message - >>> From: Mamillapalli, Purna Pradeep < >>> purnapradeep.mamillapa...@capitalone.com> >>> Date: Tue, Aug 29, 2017 at 8:08 PM >>> Subject: Spark question >>> To: purna pradeep >>> >>> Below is the input Dataframe(In real this is a very large Dataframe) >>> >>> >>> >>> EmployeeID >>> >>> INCOME >>> >>> INCOME AGE TS >>> >>> JoinDate >>> >>> Dept >>> >>> 101 >>> >>> 19000 >>> >>> 4/20/17 >>> >>> 4/20/13 >>> >>> ES >>> >>> 101 >>> >>> 1 >>> >>> 10/3/15 >>> >>> 4/20/13 >>> >>> OS >>> >>> 102 >>> >>> 13000 >>> >>> 5/9/17 >>> >>> 4/20/12 >>> >>> DS >>> >>> 102 >>> >>> 12000 >>> >>> 5/8/17 >>> >>> 4/20/12 >>> >>> CS >>> >>> 103 >>> >>> 1 >>> >>> 5/9/17 >>> >>> 4/20/10 >>> >>> EQ >>> >>> 103 >>> >>> 9000 >>> >>> 5/8/15 >>> >>> 4/20/10 >>> >>> MD >>> >>> Get the latest income of an employee which has Income_age ts <10 months >>> >>> Expected output Dataframe >>> >>> EmployeeID >>> >>> INCOME >>> >>> INCOME AGE TS >>> >>> JoinDate >>> >>> Dept >>> >>> 101 >>> >>> 19000 >>> >>> 4/20/17 >>> >>> 4/20/13 >>> >>> ES >>> >>> 102 >>> >>> 13000 >>> >>> 5/9/17 >>> >>> 4/20/12 >>> >>> DS >>> >>> 103 >>> >>> 1 >>> >>> 5/9/17 >>> >>> 4/20/10 >>> >>> EQ >>> >>> >>> >> >> >> >> >> >> Below is what im planning to implement >>> >>> >>> >>> case class employee (*EmployeeID*: Int, *INCOME*: Int, INCOMEAGE: Int, >>> *JOINDATE*: Int,DEPT:String) >>> >>> >>> >>> *val *empSchema = *new *StructType().add(*"EmployeeID"*,*"Int"*).add( >>> *"INCOME"*, *"Int"*).add(*"INCOMEAGE"*,*"Date"*) . add(*"JOINDATE"*, >>> *"Date"*). add(*"DEPT"*,*"String"*) >>> >>> >>> >>> *//Reading from the File **import *sparkSession.implicits._ >>> >>> *val *readEmpFile = sparkSession.read >>> .option(*"sep"*, *","*) >>> .schema(empSchema) >>> .csv(INPUT_DIRECTORY) >>> >>> >>> *//Create employee DataFrame **val *custDf = readEmpFile.as[employee] >>> >>> >>> *//Adding Salary Column **val *groupByDf = custDf.groupByKey(a => a.* >>> EmployeeID*) >>> >>> >>> *val *k = groupByDf.mapGroups((key,value) => performETL(value)) >>> >>> >>> >>> >>> >>> *def *performETL(empData: Iterator[employee]) : new employee = { >>> >>> *val *empList = empData.toList >>> >>> >>> *//calculate income has Logic to figureout latest income for an account >>> and returns latest income val *income = calculateIncome(empList) >>> >>> >>> *for *(i <- empList) { >>> >>> *val *row = i >>> >>> *return new *employee(row.EmployeeID, row.INCOMEAGE , income) >>> } >>> *return "Done"* >>> >>> >>> >>> } >>> >>> >>> >>> Is this a better approach or even the right approach to
Re: Select entire row based on a logic applied on 2 columns across multiple rows
Hi the tool you are looking for is window function. Example: >>> df.show() +++---+--+-+ |JoinDate|dept| id|income|income_age_ts| +++---+--+-+ | 4/20/13| ES|101| 19000| 4/20/17| | 4/20/13| OS|101| 1| 10/3/15| | 4/20/12| DS|102| 13000| 5/9/17| | 4/20/12| CS|102| 12000| 5/8/17| | 4/20/10| EQ|103| 1| 5/9/17| | 4/20/10| MD|103| 9000| 5/8/17| +++---+--+-+ >>> res = spark.sql("select *, row_number() over (partition by id order by income_age_ts desc) r from t") >>> res.show() +++---+--+-+---+ |JoinDate|dept| id|income|income_age_ts| r| +++---+--+-+---+ | 4/20/10| EQ|103| 1| 5/9/17| 1| | 4/20/10| MD|103| 9000| 5/8/17| 2| | 4/20/13| ES|101| 19000| 4/20/17| 1| | 4/20/13| OS|101| 1| 10/3/15| 2| | 4/20/12| DS|102| 13000| 5/9/17| 1| | 4/20/12| CS|102| 12000| 5/8/17| 2| +++---+--+-+---+ >>> res = spark.sql("select * from (select *, row_number() over (partition by id order by income_age_ts desc) r from t) x where r=1") >>> res.show() +++---+--+-+---+ |JoinDate|dept| id|income|income_age_ts| r| +++---+--+-+---+ | 4/20/10| EQ|103| 1| 5/9/17| 1| | 4/20/13| ES|101| 19000| 4/20/17| 1| | 4/20/12| DS|102| 13000| 5/9/17| 1| +++---+--+-+---+ This should be better because it uses all in-built optimizations in Spark. Best Ayan On Wed, Aug 30, 2017 at 11:06 AM, purna pradeepwrote: > Please click on unnamed text/html link for better view > > On Tue, Aug 29, 2017 at 8:11 PM purna pradeep > wrote: > >> >> -- Forwarded message - >> From: Mamillapalli, Purna Pradeep > capitalone.com> >> Date: Tue, Aug 29, 2017 at 8:08 PM >> Subject: Spark question >> To: purna pradeep >> >> Below is the input Dataframe(In real this is a very large Dataframe) >> >> >> >> EmployeeID >> >> INCOME >> >> INCOME AGE TS >> >> JoinDate >> >> Dept >> >> 101 >> >> 19000 >> >> 4/20/17 >> >> 4/20/13 >> >> ES >> >> 101 >> >> 1 >> >> 10/3/15 >> >> 4/20/13 >> >> OS >> >> 102 >> >> 13000 >> >> 5/9/17 >> >> 4/20/12 >> >> DS >> >> 102 >> >> 12000 >> >> 5/8/17 >> >> 4/20/12 >> >> CS >> >> 103 >> >> 1 >> >> 5/9/17 >> >> 4/20/10 >> >> EQ >> >> 103 >> >> 9000 >> >> 5/8/15 >> >> 4/20/10 >> >> MD >> >> Get the latest income of an employee which has Income_age ts <10 months >> >> Expected output Dataframe >> >> EmployeeID >> >> INCOME >> >> INCOME AGE TS >> >> JoinDate >> >> Dept >> >> 101 >> >> 19000 >> >> 4/20/17 >> >> 4/20/13 >> >> ES >> >> 102 >> >> 13000 >> >> 5/9/17 >> >> 4/20/12 >> >> DS >> >> 103 >> >> 1 >> >> 5/9/17 >> >> 4/20/10 >> >> EQ >> >> >> > > > > > > Below is what im planning to implement >> >> >> >> case class employee (*EmployeeID*: Int, *INCOME*: Int, INCOMEAGE: Int, >> *JOINDATE*: Int,DEPT:String) >> >> >> >> *val *empSchema = *new *StructType().add(*"EmployeeID"*,*"Int"*).add( >> *"INCOME"*, *"Int"*).add(*"INCOMEAGE"*,*"Date"*) . add(*"JOINDATE"*, >> *"Date"*). add(*"DEPT"*,*"String"*) >> >> >> >> *//Reading from the File **import *sparkSession.implicits._ >> >> *val *readEmpFile = sparkSession.read >> .option(*"sep"*, *","*) >> .schema(empSchema) >> .csv(INPUT_DIRECTORY) >> >> >> *//Create employee DataFrame **val *custDf = readEmpFile.as[employee] >> >> >> *//Adding Salary Column **val *groupByDf = custDf.groupByKey(a => a.* >> EmployeeID*) >> >> >> *val *k = groupByDf.mapGroups((key,value) => performETL(value)) >> >> >> >> >> >> *def *performETL(empData: Iterator[employee]) : new employee = { >> >> *val *empList = empData.toList >> >> >> *//calculate income has Logic to figureout latest income for an account >> and returns latest income val *income = calculateIncome(empList) >> >> >> *for *(i <- empList) { >> >> *val *row = i >> >> *return new *employee(row.EmployeeID, row.INCOMEAGE , income) >> } >> *return "Done"* >> >> >> >> } >> >> >> >> Is this a better approach or even the right approach to implement the >> same.If not please suggest a better way to implement the same? >> >> >> >> -- >> >> The information contained in this e-mail is confidential and/or >> proprietary to Capital One and/or its affiliates and may only be used >> solely in performance of work or services for Capital One. The information >> transmitted herewith is intended only for use by the individual or entity >> to which it is addressed. If the reader of this message is not the intended >> recipient, you are hereby notified that any review, retransmission, >> dissemination, distribution, copying or other use of, or taking of any >> action in reliance upon this information is strictly prohibited. If you >>
Re: Select entire row based on a logic applied on 2 columns across multiple rows
Please click on unnamed text/html link for better view On Tue, Aug 29, 2017 at 8:11 PM purna pradeepwrote: > > -- Forwarded message - > From: Mamillapalli, Purna Pradeep < > purnapradeep.mamillapa...@capitalone.com> > Date: Tue, Aug 29, 2017 at 8:08 PM > Subject: Spark question > To: purna pradeep > > Below is the input Dataframe(In real this is a very large Dataframe) > > > > EmployeeID > > INCOME > > INCOME AGE TS > > JoinDate > > Dept > > 101 > > 19000 > > 4/20/17 > > 4/20/13 > > ES > > 101 > > 1 > > 10/3/15 > > 4/20/13 > > OS > > 102 > > 13000 > > 5/9/17 > > 4/20/12 > > DS > > 102 > > 12000 > > 5/8/17 > > 4/20/12 > > CS > > 103 > > 1 > > 5/9/17 > > 4/20/10 > > EQ > > 103 > > 9000 > > 5/8/15 > > 4/20/10 > > MD > > Get the latest income of an employee which has Income_age ts <10 months > > Expected output Dataframe > > EmployeeID > > INCOME > > INCOME AGE TS > > JoinDate > > Dept > > 101 > > 19000 > > 4/20/17 > > 4/20/13 > > ES > > 102 > > 13000 > > 5/9/17 > > 4/20/12 > > DS > > 103 > > 1 > > 5/9/17 > > 4/20/10 > > EQ > > > Below is what im planning to implement > > > > case class employee (*EmployeeID*: Int, *INCOME*: Int, INCOMEAGE: Int, > *JOINDATE*: Int,DEPT:String) > > > > *val *empSchema = *new *StructType().add(*"EmployeeID"*,*"Int"*).add( > *"INCOME"*, *"Int"*).add(*"INCOMEAGE"*,*"Date"*) . add(*"JOINDATE"*, > *"Date"*). add(*"DEPT"*,*"String"*) > > > > *//Reading from the File **import *sparkSession.implicits._ > > *val *readEmpFile = sparkSession.read > .option(*"sep"*, *","*) > .schema(empSchema) > .csv(INPUT_DIRECTORY) > > > *//Create employee DataFrame **val *custDf = readEmpFile.as[employee] > > > *//Adding Salary Column **val *groupByDf = custDf.groupByKey(a => a.* > EmployeeID*) > > > *val *k = groupByDf.mapGroups((key,value) => performETL(value)) > > > > > > *def *performETL(empData: Iterator[employee]) : new employee = { > > *val *empList = empData.toList > > > *//calculate income has Logic to figureout latest income for an account > and returns latest income val *income = calculateIncome(empList) > > > *for *(i <- empList) { > > *val *row = i > > *return new *employee(row.EmployeeID, row.INCOMEAGE , income) > } > *return "Done"* > > > > } > > > > Is this a better approach or even the right approach to implement the > same.If not please suggest a better way to implement the same? > > > > -- > > The information contained in this e-mail is confidential and/or > proprietary to Capital One and/or its affiliates and may only be used > solely in performance of work or services for Capital One. The information > transmitted herewith is intended only for use by the individual or entity > to which it is addressed. If the reader of this message is not the intended > recipient, you are hereby notified that any review, retransmission, > dissemination, distribution, copying or other use of, or taking of any > action in reliance upon this information is strictly prohibited. If you > have received this communication in error, please contact the sender and > delete the material from your computer. >
Select entire row based on a logic applied on 2 columns across multiple rows
-- Forwarded message - From: Mamillapalli, Purna PradeepDate: Tue, Aug 29, 2017 at 8:08 PM Subject: Spark question To: purna pradeep Below is the input Dataframe(In real this is a very large Dataframe) EmployeeID INCOME INCOME AGE TS JoinDate Dept 101 19000 4/20/17 4/20/13 ES 101 1 10/3/15 4/20/13 OS 102 13000 5/9/17 4/20/12 DS 102 12000 5/8/17 4/20/12 CS 103 1 5/9/17 4/20/10 EQ 103 9000 5/8/15 4/20/10 MD Get the latest income of an employee which has Income_age ts <10 months Expected output Dataframe EmployeeID INCOME INCOME AGE TS JoinDate Dept 101 19000 4/20/17 4/20/13 ES 102 13000 5/9/17 4/20/12 DS 103 1 5/9/17 4/20/10 EQ Below is what im planning to implement case class employee (*EmployeeID*: Int, *INCOME*: Int, INCOMEAGE: Int, *JOINDATE*: Int,DEPT:String) *val *empSchema = *new *StructType().add(*"EmployeeID"*,*"Int"*).add( *"INCOME"*, *"Int"*).add(*"INCOMEAGE"*,*"Date"*) . add(*"JOINDATE"*,*"Date"*). add(*"DEPT"*,*"String"*) *//Reading from the File **import *sparkSession.implicits._ *val *readEmpFile = sparkSession.read .option(*"sep"*, *","*) .schema(empSchema) .csv(INPUT_DIRECTORY) *//Create employee DataFrame **val *custDf = readEmpFile.as[employee] *//Adding Salary Column **val *groupByDf = custDf.groupByKey(a => a.* EmployeeID*) *val *k = groupByDf.mapGroups((key,value) => performETL(value)) *def *performETL(empData: Iterator[employee]) : new employee = { *val *empList = empData.toList *//calculate income has Logic to figureout latest income for an account and returns latest income val *income = calculateIncome(empList) *for *(i <- empList) { *val *row = i *return new *employee(row.EmployeeID, row.INCOMEAGE , income) } *return "Done"* } Is this a better approach or even the right approach to implement the same.If not please suggest a better way to implement the same? -- The information contained in this e-mail is confidential and/or proprietary to Capital One and/or its affiliates and may only be used solely in performance of work or services for Capital One. The information transmitted herewith is intended only for use by the individual or entity to which it is addressed. If the reader of this message is not the intended recipient, you are hereby notified that any review, retransmission, dissemination, distribution, copying or other use of, or taking of any action in reliance upon this information is strictly prohibited. If you have received this communication in error, please contact the sender and delete the material from your computer.