Re: Subqueries

2017-12-29 Thread Nicholas Hakobian
This sounds like a perfect example of using windowing functions. Have you
tried something like the following:

select ACCT_ID, CR_RVKD_STAT_CD, ACCT_SFX_NUM, SCURT_FRD_STAT_CD, CLSD_REAS_CD
from (select *, max(instnc_id) over () as max_inst_id FROM Stat_hist)
where instnc_id = max_inst_id
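
For reference, a rough DataFrame-API equivalent might look like the following (an
untested sketch; it assumes Stat_hist is accessible through spark.table):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, max}

// Unpartitioned window: max(instnc_id) is computed over the whole table
val w = Window.partitionBy()

val latest = spark.table("Stat_hist")
  .withColumn("max_inst_id", max(col("instnc_id")).over(w))
  .filter(col("instnc_id") === col("max_inst_id"))
  .select("ACCT_ID", "CR_RVKD_STAT_CD", "ACCT_SFX_NUM", "SCURT_FRD_STAT_CD", "CLSD_REAS_CD")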

However, I have seen instances where a window function without a partitioning
clause causes all of the data to be pulled into a single partition and processed
by one task (and Spark usually warns about this condition), which can be very
slow. It might actually be more performant to use the inner join: even though it
scans through the raw data twice, it is more parallelizable.

If your data is stored in a columnar compressed format like Parquet or ORC, the
query on the right side of the join only needs to read a single column, so its
I/O will be significantly less than a full table scan; you might even be able to
squeeze some more performance out of it (depending on the size of the table) by
caching that side beforehand.
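
If you go the join route, a minimal untested sketch (again assuming Stat_hist is
registered as a table in the session) could be:

import org.apache.spark.sql.functions.max

val statHist = spark.table("Stat_hist")

// The right side only touches the instnc_id column, so it stays small;
// caching it may help if it is reused.
val maxInst = statHist.select(max("instnc_id").as("max_inst_id")).cache()

val latest = statHist
  .join(maxInst, statHist("instnc_id") === maxInst("max_inst_id"))
  .select("ACCT_ID", "CR_RVKD_STAT_CD", "ACCT_SFX_NUM", "SCURT_FRD_STAT_CD", "CLSD_REAS_CD")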

Nicholas Szandor Hakobian, Ph.D.
Staff Data Scientist
Rally Health
nicholas.hakob...@rallyhealth.com

On Fri, Dec 29, 2017 at 1:02 PM, Lalwani, Jayesh <
jayesh.lalw...@capitalone.com> wrote:

> I have a table, and I want to find the latest records in the table. The
> table has a column called instnc_id that is incremented every day. So, I
> want to find the records that have the max instnc_id.
>
>
>
> I am trying to do this using subqueries, but it gives me an error. For
> example, when I try this
>
>
>
> select ACCT_ID, CR_RVKD_STAT_CD, ACCT_SFX_NUM, SCURT_FRD_STAT_CD,
> CLSD_REAS_CD from (select *, max(instnc_id) as max_inst_id FROM Stat_hist)
> where instnc_id=max_inst_id
>
>
>
> the error I get is
>
>
>
> Caused by: org.apache.spark.sql.AnalysisException: cannot resolve
> '`max_inst_id`' given input columns: [CR_RVKD_STAT_CD, ACCT_SFX_NUM,
> CLSD_REAS_CD, ACCT_ID, instnc_id, SCURT_FRD_STAT_CD]; line 1 pos 172;
>
> 'Project ['ACCT_ID, 'CR_RVKD_STAT_CD, 'ACCT_SFX_NUM, 'SCURT_FRD_STAT_CD,
> CLSD_REAS_CD, scalar-subquery#298 [] AS max_inst_id#299]
>
> :  +- 'Project [unresolvedalias('max('instnc_id), None)]
>
> : +- 'UnresolvedRelation `Stat_hist`
>
> +- 'Filter (instnc_id#92 = 'max_inst_id)
>
>+- SubqueryAlias stat_hist
>
>   +- Project [ACCT_ID#0, ACCT_SFX_NUM#1, CR_RVKD_STAT_CD#23,
> SCURT_FRD_STAT_CD#34, CLSD_REAS_CD#19, instnc_id#92]
>
>
>
> I have tried various combinations but I keep getting into the same
> problem: It doesn’t recognize max_inst_id as a column.
>
>
>
> The only thing that works is if I get max_inst_id in a dataframe and then
> inner join it with the original table
>


Re: [Structured Streaming] Reuse computation result

2017-12-29 Thread Lalwani, Jayesh
There is no way to solve this within Spark.

One option is to break your application up into multiple applications. The first
application can filter and write the filtered results to a Kafka topic. The second
application can read from the topic and compute the sum, and the third application
can read from the topic and compute the count.
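
A rough sketch of the first two pieces, in case it helps (the topic name, broker
address, checkpoint paths and schema are placeholders; it assumes the
spark-sql-kafka-0-10 connector is on the classpath):

// --- Application 1: run the map()/filter() once and publish the result to Kafka ---
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

val spark = SparkSession.builder.appName("filter-app").getOrCreate()
import spark.implicits._

// Stand-in source; in the real job this is your map().filter() pipeline
val filtered = spark.readStream.format("rate").load()
  .filter($"value" % 2 === 0)

filtered
  .selectExpr("to_json(struct(*)) AS value")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("topic", "filtered-events")
  .option("checkpointLocation", "/tmp/ckpt/filter")
  .start()

// --- Application 2: consume the topic and sum (a third application would do the count) ---
val schema = new StructType()
  .add("timestamp", TimestampType)
  .add("value", LongType)

val parsed = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribe", "filtered-events")
  .load()
  .select(from_json($"value".cast("string"), schema).as("row"))
  .select("row.*")

parsed.groupBy().sum("value")
  .writeStream
  .outputMode("complete")
  .format("console")
  .option("checkpointLocation", "/tmp/ckpt/sum")
  .start()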

From: Shu Li Zheng 
Date: Tuesday, December 26, 2017 at 5:32 AM
To: "user@spark.apache.org" 
Subject: [Structured Streaming] Reuse computation result

Hi all,

I have a scenario like this:

val df = dataframe.map().filter()
// agg 1
val query1 = df.sum.writeStream.start
// agg 2
val query2 = df.count.writeStream.start

With Spark Streaming, we can call persist() on an RDD to reuse the result of a
computation: if we call persist() after the filter(), the map().filter() stage only
runs once. With Structured Streaming we can't call persist() directly on a streaming
DataFrame, so query1 and query2 do not reuse the result after the filter and the
map()/filter() runs twice. Is there a way to solve this?

Regards,

Shu li Zheng






Subqueries

2017-12-29 Thread Lalwani, Jayesh
I have a table, and I want to find the latest records in the table. The table
has a column called instnc_id that is incremented every day. So, I want to find
the records that have the max instnc_id.

I am trying to do this using subqueries, but it gives me an error. For example, 
when I try this

select ACCT_ID, CR_RVKD_STAT_CD, ACCT_SFX_NUM, SCURT_FRD_STAT_CD, CLSD_REAS_CD 
from (select *, max(instnc_id) as max_inst_id FROM Stat_hist) where 
instnc_id=max_inst_id

the error I get is

Caused by: org.apache.spark.sql.AnalysisException: cannot resolve 
'`max_inst_id`' given input columns: [CR_RVKD_STAT_CD, ACCT_SFX_NUM, 
CLSD_REAS_CD, ACCT_ID, instnc_id, SCURT_FRD_STAT_CD]; line 1 pos 172;
'Project ['ACCT_ID, 'CR_RVKD_STAT_CD, 'ACCT_SFX_NUM, 'SCURT_FRD_STAT_CD, 
CLSD_REAS_CD, scalar-subquery#298 [] AS max_inst_id#299]
:  +- 'Project [unresolvedalias('max('instnc_id), None)]
: +- 'UnresolvedRelation `Stat_hist`
+- 'Filter (instnc_id#92 = 'max_inst_id)
   +- SubqueryAlias stat_hist
  +- Project [ACCT_ID#0, ACCT_SFX_NUM#1, CR_RVKD_STAT_CD#23, 
SCURT_FRD_STAT_CD#34, CLSD_REAS_CD#19, instnc_id#92]

I have tried various combinations but I keep getting into the same problem: It 
doesn’t recognize max_inst_id as a column.

The only thing that works is if I get max_inst_id in a dataframe and then inner 
join it with the original table




Re: Spark on EMR suddenly stalling

2017-12-29 Thread Shushant Arora
You may have to recreate your cluster with the configuration below at EMR
creation time:
"Configurations": [
{
"Properties": {
"maximizeResourceAllocation": "false"
},
"Classification": "spark"
}
]

On Fri, Dec 29, 2017 at 11:57 PM, Jeroen Miller wrote:

> On 28 Dec 2017, at 19:25, Patrick Alwell  wrote:
> > Dynamic allocation is great; but sometimes I’ve found explicitly setting
> the num executors, cores per executor, and memory per executor to be a
> better alternative.
>
> No difference with spark.dynamicAllocation.enabled set to false.
>
> JM
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Spark on EMR suddenly stalling

2017-12-29 Thread Jeroen Miller
On 28 Dec 2017, at 19:25, Patrick Alwell  wrote:
> Dynamic allocation is great; but sometimes I’ve found explicitly setting the 
> num executors, cores per executor, and memory per executor to be a better 
> alternative.

No difference with spark.dynamicAllocation.enabled set to false.
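
For what it's worth, a minimal sketch of what those explicit settings look like
when building the session (the executor count, cores and memory below are
placeholder values, not a recommendation):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("emr-job")
  .config("spark.dynamicAllocation.enabled", "false")
  .config("spark.executor.instances", "10")
  .config("spark.executor.cores", "4")
  .config("spark.executor.memory", "8g")
  .getOrCreate()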

JM


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Fwd: Spark on EMR suddenly stalling

2017-12-29 Thread Jeroen Miller
Hello,

Just a quick update, as I have not made much progress yet.

On 28 Dec 2017, at 21:09, Gourav Sengupta  wrote:
> can you try to then use the EMR version 5.10 instead or EMR version 5.11 
> instead? 

Same issue with EMR 5.11.0. Task 0 in one stage never finishes.

> can you please try selecting a subnet which is in a different availability 
> zone?

I have not tried this yet, but why should that make a difference?

> if possible just try to increase the number of task instances and see the 
> difference?

I tried with 512 partitions -- no difference.

> also in case you are using caching,

No caching used.

> Also can you please report the number of containers that your job is creating 
> by looking at the metrics in the EMR console?

8 containers if I trust the directories in j-xxx/containers/application_xxx/.

> Also if you see the spark UI then you can easily see which particular step is 
> taking the longest period of time - you just have to drill in a bit in order 
> to see that. Generally in case shuffling is an issue then it definitely 
> appears in the SPARK UI as I drill into the steps and see which particular 
> one is taking the longest.

I always have issues with the Spark UI on EC2 -- it never seems to be up to 
date.

JM



-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Custom line/record delimiter

2017-12-29 Thread sk skk
Hi,

Do we have an option to write a CSV or text file with a custom record/line
separator through Spark?

I could not find any reference to it in the API. I have an issue while loading
data into a warehouse: one of the columns in the CSV contains a newline
character, and the warehouse does not let me escape that newline character.

Thank you ,
Sk