Re: Subqueries
This sounds like a perfect example of using windowing functions. Have you tried something like the following: select ACCT_ID, CR_RVKD_STAT_CD, ACCT_SFX_NUM, SCURT_FRD_STAT_CD, CLSD_REAS_CD from (select *, max(instnc_id) *over ()* as max_inst_id FROM Stat_hist) where instnc_id=max_inst_id However, I have seen instances where window functions without partitioning clauses will cause all partitions to be executed on one task (and spark usually warns about this condition) and this will be very slow. It might actually be more performant to use the inner join which, even though it is scanning through the raw data twice, is more parallelizable. If you have your data stored in a columnar compressed data format like parquet or orc, the query on the right side of the join should only have a single column, so I/O on that column would be significantly less than the full table; you might even be able to squeeze some more performance out of it (depending on the size of the table), by caching it beforehand. Nicholas Szandor Hakobian, Ph.D. Staff Data Scientist Rally Health nicholas.hakob...@rallyhealth.com On Fri, Dec 29, 2017 at 1:02 PM, Lalwani, Jayesh < jayesh.lalw...@capitalone.com> wrote: > I have a table, and I want to find the latest records in the table. The > table has a column called instnc_id that is incremented everyday. So, I > want to find the records that have the max instnc_id. > > > > I am trying to do this using subqueries, but it gives me an error. For > example, when I try this > > > > select ACCT_ID, CR_RVKD_STAT_CD, ACCT_SFX_NUM, SCURT_FRD_STAT_CD, > CLSD_REAS_CD from (select *, max(instnc_id) as max_inst_id FROM Stat_hist) > where instnc_id=max_inst_id > > > > the error I get is > > > > Caused by: org.apache.spark.sql.AnalysisException: cannot resolve > '`max_inst_id`' given input columns: [CR_RVKD_STAT_CD, ACCT_SFX_NUM, > CLSD_REAS_CD, ACCT_ID, instnc_id, SCURT_FRD_STAT_CD]; line 1 pos 172; > > 'Project ['ACCT_ID, 'CR_RVKD_STAT_CD, 'ACCT_SFX_NUM, 'SCURT_FRD_STAT_CD, > CLSD_REAS_CD, scalar-subquery#298 [] AS max_inst_id#299] > > : +- 'Project [unresolvedalias('max('instnc_id), None)] > > : +- 'UnresolvedRelation `Stat_hist` > > +- 'Filter (instnc_id#92 = 'max_inst_id) > >+- SubqueryAlias stat_hist > > +- Project [ACCT_ID#0, ACCT_SFX_NUM#1, CR_RVKD_STAT_CD#23, > SCURT_FRD_STAT_CD#34, CLSD_REAS_CD#19, instnc_id#92] > > > > I have tried various combinations but I keep getting into the same > problem: It doesn’t recognize max_inst_id as a column. > > > > The only thing that works is if I get max_inst_id in a dataframe and then > inner join it with the original table > > -- > > The information contained in this e-mail is confidential and/or > proprietary to Capital One and/or its affiliates and may only be used > solely in performance of work or services for Capital One. The information > transmitted herewith is intended only for use by the individual or entity > to which it is addressed. If the reader of this message is not the intended > recipient, you are hereby notified that any review, retransmission, > dissemination, distribution, copying or other use of, or taking of any > action in reliance upon this information is strictly prohibited. If you > have received this communication in error, please contact the sender and > delete the material from your computer. >
Re: [Structured Streaming] Reuse computation result
There is no way to solve this within spark. One option you could do is break up your application into multiple application. First application can filter and write the filtered results into a kafka queue. Second application can read from queue and sum. Third application can read from queue and do count. From: Shu Li ZhengDate: Tuesday, December 26, 2017 at 5:32 AM To: "user@spark.apache.org" Subject: [Structured Streaming] Reuse computation result Hi all, I have a scenario like this: val df = dataframe.map().filter() // agg 1 val query1 = df.sum.writeStream.start // agg 2 val query2 = df.count.writeStream.start With spark streaming, we can apply persist() on rdd to reuse the df computation result, when we call persist() after filter() map().filter() operator only run once. With SS, we can’t apply persist() direct on dataframe. query1 and query2 will not reuse result after filter. map/filter run twice. So is there a way to solve this. Regards, Shu li Zheng The information contained in this e-mail is confidential and/or proprietary to Capital One and/or its affiliates and may only be used solely in performance of work or services for Capital One. The information transmitted herewith is intended only for use by the individual or entity to which it is addressed. If the reader of this message is not the intended recipient, you are hereby notified that any review, retransmission, dissemination, distribution, copying or other use of, or taking of any action in reliance upon this information is strictly prohibited. If you have received this communication in error, please contact the sender and delete the material from your computer.
Subqueries
I have a table, and I want to find the latest records in the table. The table has a column called instnc_id that is incremented everyday. So, I want to find the records that have the max instnc_id. I am trying to do this using subqueries, but it gives me an error. For example, when I try this select ACCT_ID, CR_RVKD_STAT_CD, ACCT_SFX_NUM, SCURT_FRD_STAT_CD, CLSD_REAS_CD from (select *, max(instnc_id) as max_inst_id FROM Stat_hist) where instnc_id=max_inst_id the error I get is Caused by: org.apache.spark.sql.AnalysisException: cannot resolve '`max_inst_id`' given input columns: [CR_RVKD_STAT_CD, ACCT_SFX_NUM, CLSD_REAS_CD, ACCT_ID, instnc_id, SCURT_FRD_STAT_CD]; line 1 pos 172; 'Project ['ACCT_ID, 'CR_RVKD_STAT_CD, 'ACCT_SFX_NUM, 'SCURT_FRD_STAT_CD, CLSD_REAS_CD, scalar-subquery#298 [] AS max_inst_id#299] : +- 'Project [unresolvedalias('max('instnc_id), None)] : +- 'UnresolvedRelation `Stat_hist` +- 'Filter (instnc_id#92 = 'max_inst_id) +- SubqueryAlias stat_hist +- Project [ACCT_ID#0, ACCT_SFX_NUM#1, CR_RVKD_STAT_CD#23, SCURT_FRD_STAT_CD#34, CLSD_REAS_CD#19, instnc_id#92] I have tried various combinations but I keep getting into the same problem: It doesn’t recognize max_inst_id as a column. The only thing that works is if I get max_inst_id in a dataframe and then inner join it with the original table The information contained in this e-mail is confidential and/or proprietary to Capital One and/or its affiliates and may only be used solely in performance of work or services for Capital One. The information transmitted herewith is intended only for use by the individual or entity to which it is addressed. If the reader of this message is not the intended recipient, you are hereby notified that any review, retransmission, dissemination, distribution, copying or other use of, or taking of any action in reliance upon this information is strictly prohibited. If you have received this communication in error, please contact the sender and delete the material from your computer.
Re: Spark on EMR suddenly stalling
you may have to recreate your cluster with below configuration at emr creation "Configurations": [ { "Properties": { "maximizeResourceAllocation": "false" }, "Classification": "spark" } ] On Fri, Dec 29, 2017 at 11:57 PM, Jeroen Millerwrote: > On 28 Dec 2017, at 19:25, Patrick Alwell wrote: > > Dynamic allocation is great; but sometimes I’ve found explicitly setting > the num executors, cores per executor, and memory per executor to be a > better alternative. > > No difference with spark.dynamicAllocation.enabled set to false. > > JM > > > - > To unsubscribe e-mail: user-unsubscr...@spark.apache.org > >
Re: Spark on EMR suddenly stalling
On 28 Dec 2017, at 19:25, Patrick Alwellwrote: > Dynamic allocation is great; but sometimes I’ve found explicitly setting the > num executors, cores per executor, and memory per executor to be a better > alternative. No difference with spark.dynamicAllocation.enabled set to false. JM - To unsubscribe e-mail: user-unsubscr...@spark.apache.org
Fwd: Spark on EMR suddenly stalling
Hello, Just a quick update as I did not made much progress yet. On 28 Dec 2017, at 21:09, Gourav Senguptawrote: > can you try to then use the EMR version 5.10 instead or EMR version 5.11 > instead? Same issue with EMR 5.11.0. Task 0 in one stage never finishes. > can you please try selecting a subnet which is in a different availability > zone? I did not try this yet. But why should that make a difference? > if possible just try to increase the number of task instances and see the > difference? I tried with 512 partitions -- no difference. > also in case you are using caching, No caching used. > Also can you please report the number of containers that your job is creating > by looking at the metrics in the EMR console? 8 containers if I trust the directories in j-xxx/containers/application_xxx/. > Also if you see the spark UI then you can easily see which particular step is > taking the longest period of time - you just have to drill in a bit in order > to see that. Generally in case shuffling is an issue then it definitely > appears in the SPARK UI as I drill into the steps and see which particular > one is taking the longest. I always have issues with the Spark UI on EC2 -- it never seems to be up to date. JM - To unsubscribe e-mail: user-unsubscr...@spark.apache.org
Custom line/record delimiter
Hi, Do we have an option to write a csv or text file with a custom record/line separator through spark ? I could not find any ref on the api. I have a issue while loading data into a warehouse as one of the column on csv have a new line character and the warehouse is not letting to escape that new line character . Thank you , Sk