Hi, here is a quick setup (based on the airlines.txt dataset):
--------------------------------------------------------------------------------------------------------------------------------------------------------------
from datetime import datetime, timedelta
from pyspark.sql.types import *
from pyspark.sql.functions import udf, col, rank
from pyspark import SparkContext
from pyspark.sql import HiveContext, Window
import sys

sc = SparkContext()
hc = HiveContext(sc)

# Schema for the airports file
customSchema = StructType([
    StructField("airport_id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("city", StringType(), True),
    StructField("country", StringType(), True),
    StructField("iata", StringType(), True),
    StructField("icao", StringType(), True),
    StructField("latitude", DecimalType(precision=20, scale=10), True),
    StructField("longitude", DecimalType(precision=20, scale=10), True),
    StructField("altitude", IntegerType(), True),
    StructField("timezone", DoubleType(), True),
    StructField("dst", StringType(), True),
    StructField("tz_name", StringType(), True)
])

inFile = sys.argv[1]
df1 = hc.read.format('com.databricks.spark.csv').options(header='false').load(inFile, schema=customSchema)
df1.registerTempTable("airlines")

# Window function via SQL: highest-altitude airport above 100
df2 = hc.sql("select airport_id, altitude, r from (select *, rank() over (order by altitude desc) r from airlines where altitude > 100) rs where r=1")
print df2.take(10)

# Same query via the DataFrame API
w = Window.orderBy(df1['altitude'].desc())
df3 = df1.filter(df1.altitude > 100).select(df1.airport_id, df1.altitude, rank().over(w).alias("r")).filter("r=1")
print df3.take(10)

sc.stop()
--------------------------------------------------------------------------------------------------------------------------------------------------------------

Here is an awesome blog from Databricks on window functions:
https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html

HTH....

Ayan

On Sun, Jul 31, 2016 at 8:58 PM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

> It is true that whatever an analytic function does can be done with
> standard SQL, using joins and sub-queries. But the same routine done with
> an analytic function is always faster than, or at least as fast as, the
> standard SQL equivalent.
>
> I will try to see if I can do analytic functions with Spark FP on
> DataFrames. It is essentially replacing the base table with a DataFrame
> and using Java functions instead of SQL ones on top.
>
> Also, some text-based search functions, say LIKE in SQL, can be replaced
> with CONTAINS in FP.
>
> Thanks
>
> Dr Mich Talebzadeh
>
> LinkedIn
> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
> http://talebzadehmich.wordpress.com
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
> On 31 July 2016 at 10:56, ayan guha <guha.a...@gmail.com> wrote:
>
>> The point is, window functions are designed to be faster by doing the
>> calculation in one pass, instead of the two passes needed with max().
>>
>> DF supports window functions (using sql.Window), so instead of writing
>> SQL you can use them as well.
>>
>> Best
>> Ayan
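
For the Scala side, a minimal sketch of the same single-pass window-function approach on a DataFrame might look like the following (assuming Spark 1.6 with a HiveContext, and ll_18740868 being the transactions DataFrame used elsewhere in this thread):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, rank}

// Rank transactions by date, newest first, and keep only the top-ranked row,
// i.e. the most recent matching transaction, in a single pass.
val w = Window.orderBy(col("transactiondate").desc)

val latest = ll_18740868
  .filter(col("transactiondescription").contains("XYZ"))
  .select(col("transactiondate"), col("transactiondescription"), col("debitamount"),
    rank().over(w).alias("r"))
  .filter(col("r") === 1)

latest.show

This avoids the separate max(transactiondate) pass followed by a second filter.
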
>>
>> On Sun, Jul 31, 2016 at 7:48 PM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>
>>> Yes, reserved word issue, thanks.
>>>
>>> hive> select *
>>>     > from (select transactiondate, transactiondescription, debitamount
>>>     > , rank() over (order by transactiondate desc) r
>>>     > from accounts.ll_18740868 where transactiondescription like '%HARRODS%'
>>>     > ) RS
>>>     > where r=1
>>>     > ;
>>> Query ID = hduser_20160731104724_f8e5f426-770a-49fc-a4a5-f0f645c06e8c
>>> Total jobs = 1
>>> Launching Job 1 out of 1
>>> In order to change the average load for a reducer (in bytes):
>>>   set hive.exec.reducers.bytes.per.reducer=<number>
>>> In order to limit the maximum number of reducers:
>>>   set hive.exec.reducers.max=<number>
>>> In order to set a constant number of reducers:
>>>   set mapreduce.job.reduces=<number>
>>> Starting Spark Job = 7727d5df-ccf9-4f98-8563-1cdec2634d99
>>> Query Hive on Spark job[0] stages:
>>> 0
>>> 1
>>> Status: Running (Hive on Spark job[0])
>>> Job Progress Format
>>> CurrentTime StageId_StageAttemptId: SucceededTasksCount(+RunningTasksCount-FailedTasksCount)/TotalTasksCount [StageCost]
>>> 2016-07-31 10:48:28,726 Stage-0_0: 0/1          Stage-1_0: 0/1
>>> 2016-07-31 10:48:31,750 Stage-0_0: 0/1          Stage-1_0: 0/1
>>> 2016-07-31 10:48:32,758 Stage-0_0: 0(+1)/1      Stage-1_0: 0/1
>>> 2016-07-31 10:48:34,772 Stage-0_0: 1/1 Finished Stage-1_0: 0(+1)/1
>>> 2016-07-31 10:48:35,780 Stage-0_0: 1/1 Finished Stage-1_0: 1/1 Finished
>>> Status: Finished successfully in 10.10 seconds
>>> OK
>>> 2015-12-15      HARRODS LTD CD 4636     10.95   1
>>> Time taken: 46.546 seconds, Fetched: 1 row(s)
>>>
>>> Dr Mich Talebzadeh
>>>
>>> LinkedIn
>>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>> On 31 July 2016 at 10:36, ayan guha <guha.a...@gmail.com> wrote:
>>>
>>>> I think the word "INNER" is reserved in Hive. Please change the alias
>>>> to something else.
>>>>
>>>> Not sure about Scala, but essentially it is string replacement.
>>>>
>>>> On Sun, Jul 31, 2016 at 7:27 PM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>>>
>>>>> Thanks. How about Scala?
>>>>>
>>>>> BTW, the same analytic code fails in Hive itself :(
>>>>>
>>>>> hive> select *
>>>>>     > from (select transactiondate, transactiondescription, debitamount
>>>>>     > from (select transactiondate, transactiondescription, debitamount
>>>>>     > , rank() over (order by transactiondate desc) r
>>>>>     > from ll_18740868 where transactiondescription like '%XYZ%'
>>>>>     > ) inner
>>>>>     > where r=1
>>>>>     > ;
>>>>>
>>>>> FailedPredicateException(identifier,{useSQL11ReservedKeywordsForIdentifier()}?)
>>>>>         at org.apache.hadoop.hive.ql.parse.HiveParser_IdentifiersParser.identifier(HiveParser_IdentifiersParser.java:11833)
>>>>>         at org.apache.hadoop.hive.ql.parse.HiveParser.identifier(HiveParser.java:47987)
>>>>>         at org.apache.hadoop.hive.ql.parse.HiveParser_FromClauseParser.subQuerySource(HiveParser_FromClauseParser.java:5520)
>>>>>         at org.apache.hadoop.hive.ql.parse.HiveParser_FromClauseParser.fromSource0(HiveParser_FromClauseParser.java:3918)
>>>>>         at org.apache.hadoop.hive.ql.parse.HiveParser_FromClauseParser.fromSource(HiveParser_FromClauseParser.java:3818)
>>>>>         at org.apache.hadoop.hive.ql.parse.HiveParser_FromClauseParser.joinSource(HiveParser_FromClauseParser.java:1909)
>>>>>         at org.apache.hadoop.hive.ql.parse.HiveParser_FromClauseParser.fromClause(HiveParser_FromClauseParser.java:1546)
>>>>>         at org.apache.hadoop.hive.ql.parse.HiveParser.fromClause(HiveParser.java:48001)
>>>>>         at org.apache.hadoop.hive.ql.parse.HiveParser.selectStatement(HiveParser.java:42252)
>>>>>         at org.apache.hadoop.hive.ql.parse.HiveParser.regularBody(HiveParser.java:42138)
>>>>>         at org.apache.hadoop.hive.ql.parse.HiveParser.queryStatementExpressionBody(HiveParser.java:41154)
>>>>>         at org.apache.hadoop.hive.ql.parse.HiveParser.queryStatementExpression(HiveParser.java:41024)
>>>>>         at org.apache.hadoop.hive.ql.parse.HiveParser_FromClauseParser.subQuerySource(HiveParser_FromClauseParser.java:5492)
>>>>>         at org.apache.hadoop.hive.ql.parse.HiveParser_FromClauseParser.fromSource0(HiveParser_FromClauseParser.java:3918)
>>>>>         at org.apache.hadoop.hive.ql.parse.HiveParser_FromClauseParser.fromSource(HiveParser_FromClauseParser.java:3818)
>>>>>         at org.apache.hadoop.hive.ql.parse.HiveParser_FromClauseParser.joinSource(HiveParser_FromClauseParser.java:1909)
>>>>>         at org.apache.hadoop.hive.ql.parse.HiveParser_FromClauseParser.fromClause(HiveParser_FromClauseParser.java:1546)
>>>>>         at org.apache.hadoop.hive.ql.parse.HiveParser.fromClause(HiveParser.java:48001)
>>>>>         at org.apache.hadoop.hive.ql.parse.HiveParser.selectStatement(HiveParser.java:42252)
>>>>>         at org.apache.hadoop.hive.ql.parse.HiveParser.regularBody(HiveParser.java:42138)
>>>>>         at org.apache.hadoop.hive.ql.parse.HiveParser.queryStatementExpressionBody(HiveParser.java:41154)
>>>>>         at org.apache.hadoop.hive.ql.parse.HiveParser.queryStatementExpression(HiveParser.java:41024)
>>>>>         at org.apache.hadoop.hive.ql.parse.HiveParser.execStatement(HiveParser.java:1653)
>>>>>         at org.apache.hadoop.hive.ql.parse.HiveParser.statement(HiveParser.java:1137)
>>>>>         at org.apache.hadoop.hive.ql.parse.ParseDriver.parse(ParseDriver.java:204)
>>>>>         at org.apache.hadoop.hive.ql.parse.ParseDriver.parse(ParseDriver.java:166)
>>>>>         at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:446)
>>>>>         at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:319)
>>>>>         at org.apache.hadoop.hive.ql.Driver.compileInternal(Driver.java:1255)
>>>>>         at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:1301)
>>>>>         at org.apache.hadoop.hive.ql.Driver.run(Driver.java:1184)
>>>>>         at org.apache.hadoop.hive.ql.Driver.run(Driver.java:1172)
>>>>>         at org.apache.hadoop.hive.cli.CliDriver.processLocalCmd(CliDriver.java:233)
>>>>>         at org.apache.hadoop.hive.cli.CliDriver.processCmd(CliDriver.java:184)
>>>>>         at org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:400)
>>>>>         at org.apache.hadoop.hive.cli.CliDriver.executeDriver(CliDriver.java:778)
>>>>>         at org.apache.hadoop.hive.cli.CliDriver.run(CliDriver.java:717)
>>>>>         at org.apache.hadoop.hive.cli.CliDriver.main(CliDriver.java:645)
>>>>>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>         at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>         at org.apache.hadoop.util.RunJar.run(RunJar.java:221)
>>>>>         at org.apache.hadoop.util.RunJar.main(RunJar.java:136)
>>>>>
>>>>> FAILED: ParseException line 6:7 Failed to recognize predicate
>>>>> 'inner'. Failed rule: 'identifier' in subquery source
>>>>>
>>>>> Dr Mich Talebzadeh
>>>>>
>>>>> LinkedIn
>>>>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>>
>>>>> http://talebzadehmich.wordpress.com
>>>>>
>>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>>>> any loss, damage or destruction of data or any other property which may
>>>>> arise from relying on this email's technical content is explicitly
>>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>>> arising from such loss, damage or destruction.
>>>>>
>>>>> On 31 July 2016 at 10:21, ayan guha <guha.a...@gmail.com> wrote:
>>>>>
>>>>>> Hi
>>>>>>
>>>>>> This is because Spark does not provide a way to "bind" variables the
>>>>>> way Oracle does.
>>>>>>
>>>>>> So you can build the SQL string yourself, like below (in Python):
>>>>>>
>>>>>> val = 'XYZ'
>>>>>> sqlbase = "select ..... where col = '<val>'".replace('<val>', val)
>>>>>>
>>>>>> On Sun, Jul 31, 2016 at 6:25 PM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>>>>>
>>>>>>> Thanks Ayan.
>>>>>>>
>>>>>>> This is the one I used:
>>>>>>>
>>>>>>> scala> sqltext = """
>>>>>>>      | select *
>>>>>>>      | from (select transactiondate, transactiondescription, debitamount
>>>>>>>      | , rank() over (order by transactiondate desc) r
>>>>>>>      | from ll_18740868 where transactiondescription like '%XYZ%'
>>>>>>>      | ) inner
>>>>>>>      | where r=1
>>>>>>>      | """
>>>>>>>
>>>>>>> scala> HiveContext.sql(sqltext).show
>>>>>>> +---------------+----------------------+-----------+---+
>>>>>>> |transactiondate|transactiondescription|debitamount|  r|
>>>>>>> +---------------+----------------------+-----------+---+
>>>>>>> |     2015-12-15|  XYZ LTD CD 4636     |      10.95|  1|
>>>>>>> +---------------+----------------------+-----------+---+
>>>>>>>
>>>>>>> The issue I see is that in SQL here I cannot pass HASHTAG as a
>>>>>>> variable to SQL. For example, in an RDBMS I can do this:
>>>>>>>
>>>>>>> 1> declare @pattern varchar(50)
>>>>>>> 2> set @pattern = 'Direct'
>>>>>>> 3> select CHANNEL_DESC from CHANNELS where CHANNEL_DESC like '%'||@pattern||'%'
>>>>>>> 4> go
>>>>>>> (1 row affected)
>>>>>>>  CHANNEL_DESC
>>>>>>>  --------------------
>>>>>>>  Direct Sales
>>>>>>>
>>>>>>> but not in Hive or Spark SQL, whereas with FP it does it implicitly:
>>>>>>>
>>>>>>> col("CHANNELS").contains(HASHTAG)
>>>>>>>
>>>>>>> Unless there is a way of doing it?
>>>>>>>
>>>>>>> Thanks
>>>>>>>
>>>>>>> Dr Mich Talebzadeh
>>>>>>>
>>>>>>> LinkedIn
>>>>>>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>>>>
>>>>>>> http://talebzadehmich.wordpress.com
>>>>>>>
>>>>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility
>>>>>>> for any loss, damage or destruction of data or any other property which
>>>>>>> may arise from relying on this email's technical content is explicitly
>>>>>>> disclaimed. The author will in no case be liable for any monetary
>>>>>>> damages arising from such loss, damage or destruction.
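
One way to get the same effect in Scala is plain string interpolation into the SQL text before submitting it. A minimal sketch, assuming HASHTAG holds the search pattern, the ll_18740868 temp table is registered as above, and HiveContext is the HiveContext instance from the spark-shell session:

val HASHTAG = "XYZ"

// Build the SQL string with the variable interpolated, then run it.
// Note: this is plain text substitution, not a real bind variable,
// so it is only suitable for trusted input.
val sqltext =
  s"""
     |select *
     |from (select transactiondate, transactiondescription, debitamount,
     |       rank() over (order by transactiondate desc) r
     |       from ll_18740868
     |       where transactiondescription like '%$HASHTAG%') rs
     |where r = 1
   """.stripMargin

HiveContext.sql(sqltext).show

The subquery alias rs also sidesteps the reserved-word problem with "inner" discussed earlier in the thread.
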
>>>>>>>
>>>>>>> On 31 July 2016 at 01:20, ayan guha <guha.a...@gmail.com> wrote:
>>>>>>>
>>>>>>>> select *
>>>>>>>> from (select *,
>>>>>>>>       rank() over (order by transactiondate) r
>>>>>>>>       from ll_18740868 where transactiondescription='XYZ'
>>>>>>>>      ) inner
>>>>>>>> where r=1
>>>>>>>>
>>>>>>>> Hi Mitch,
>>>>>>>>
>>>>>>>> If using SQL is fine, you can try the code above. You need to
>>>>>>>> register ll_18740868 as a temp table.
>>>>>>>>
>>>>>>>> On Sun, Jul 31, 2016 at 6:49 AM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> I would like to find out when it was the last time I paid a
>>>>>>>>> company with my debit card.
>>>>>>>>>
>>>>>>>>> This is the way I do it:
>>>>>>>>>
>>>>>>>>> 1) Find the date when I paid last
>>>>>>>>> 2) Find the rest of the details from the row(s)
>>>>>>>>>
>>>>>>>>> So
>>>>>>>>>
>>>>>>>>> var HASHTAG = "XYZ"
>>>>>>>>> scala> var maxdate = ll_18740868.filter(col("transactiondescription").contains(HASHTAG)).agg(max("transactiondate")).collect.apply(0)
>>>>>>>>> maxdate: org.apache.spark.sql.Row = [2015-12-15]
>>>>>>>>>
>>>>>>>>> OK, so it was 2015-12-15.
>>>>>>>>>
>>>>>>>>> Now I want to get the rest of the columns. This one works when I
>>>>>>>>> hard-code the maxdate:
>>>>>>>>>
>>>>>>>>> scala> ll_18740868.filter(col("transactiondescription").contains(HASHTAG) && col("transactiondate") === "2015-12-15").select("transactiondate", "transactiondescription", "debitamount").show
>>>>>>>>> +---------------+----------------------+-----------+
>>>>>>>>> |transactiondate|transactiondescription|debitamount|
>>>>>>>>> +---------------+----------------------+-----------+
>>>>>>>>> |     2015-12-15|  XYZ LTD CD 4636     |      10.95|
>>>>>>>>> +---------------+----------------------+-----------+
>>>>>>>>>
>>>>>>>>> Now if I want to use the var maxdate in place of "2015-12-15", how
>>>>>>>>> would I do that?
>>>>>>>>>
>>>>>>>>> I tried lit(maxdate) etc. but they all give me errors:
>>>>>>>>>
>>>>>>>>> java.lang.RuntimeException: Unsupported literal type class
>>>>>>>>> org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema [2015-12-15]
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>
>>>>>>>> --
>>>>>>>> Best Regards,
>>>>>>>> Ayan Guha
>>>>>>
>>>>>> --
>>>>>> Best Regards,
>>>>>> Ayan Guha
>>>>
>>>> --
>>>> Best Regards,
>>>> Ayan Guha
>>
>> --
>> Best Regards,
>> Ayan Guha

-- 
Best Regards,
Ayan Guha
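
On the original question of reusing maxdate without hard-coding it: the error comes from passing the whole Row to lit(). A minimal sketch of one way around it, pulling the value out of the Row first (assuming Spark 1.6 and the ll_18740868 DataFrame from this thread):

import org.apache.spark.sql.functions.{col, max}

val HASHTAG = "XYZ"

// agg(max(...)) returns a one-row DataFrame; extract the single value from
// that Row instead of handing the Row itself to lit().
val maxdate: String = ll_18740868
  .filter(col("transactiondescription").contains(HASHTAG))
  .agg(max("transactiondate"))
  .collect()(0)
  .get(0)
  .toString

// maxdate is now a plain value, so it can be compared against the column directly.
ll_18740868
  .filter(col("transactiondescription").contains(HASHTAG) && col("transactiondate") === maxdate)
  .select("transactiondate", "transactiondescription", "debitamount")
  .show

Depending on the actual column type, getString(0) or getAs[java.sql.Date](0) may be a cleaner choice than going through toString.
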