Re: Spark Sql group by less performant

2018-12-10 Thread Georg Heiler
See
https://databricks.com/blog/2016/05/19/approximate-algorithms-in-apache-spark-hyperloglog-and-quantiles.html
Most probably you do not require exact counts.
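
For instance, if a distinct count per group is all you actually need, an
approximate count can cut state size substantially. A minimal sketch, assuming
the temp_view and column names from the query quoted below (the 5% relative
error is just an illustrative setting):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

// approx_count_distinct keeps a fixed-size HyperLogLog sketch per group
// instead of an exact set, at the cost of a bounded relative error.
val approx = spark.sql(
  """SELECT column1, column2,
    |       count(*) AS totalRows,
    |       approx_count_distinct(column8, 0.05) AS approxDistinctCol8
    |FROM temp_view
    |GROUP BY column1, column2""".stripMargin)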

On Tue, 11 Dec 2018 at 02:09, 15313776907 <15313776...@163.com> wrote:

> I think you can add executor memory.
>
> 15313776907
> Email: 15313776...@163.com
>
> Signature customized by NetEase Mail Master
>
> On 12/11/2018 08:28, lsn24  wrote:
> Hello,
>
> I have a requirement where I need to get total count of rows and total
> count of failedRows based on a grouping.
>
> The code looks like below:
>
> myDataset.createOrReplaceTempView("temp_view");
>
> Dataset<Row> countDataset = sparkSession.sql("Select
> column1,column2,column3,column4,column5,column6,column7,column8,
> count(*) as totalRows,
> sum(CASE WHEN (column8 is NULL) THEN 1 ELSE 0 END) as failedRows
> from temp_view
> group by column1,column2,column3,column4,column5,column6,column7,column8");
>
>
> Up to around 50 million records, the query performance was OK. After that
> it gave up, mostly resulting in an out-of-memory exception.
>
> I read documentation and blogs; most of them give examples of
> RDD.reduceByKey, but here I have a Dataset and Spark SQL.
>
> What am I missing here?
>
> Any help will be appreciated.
>
> Thanks!
>
>
>
>
>


Re: Why does join use rows that were sent after watermark of 20 seconds?

2018-12-10 Thread Jungtaek Lim
Please refer to the Structured Streaming programming guide, which is very clear
about when the query will have unbounded state.

http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#inner-joins-with-optional-watermarking

Quoting the doc:



In other words, you will have to do the following additional steps in the
join.

   1. Define watermark delays on both inputs such that the engine knows how
      delayed the input can be (similar to streaming aggregations)

   2. Define a constraint on event-time across the two inputs such that the
      engine can figure out when old rows of one input is not going to be
      required (i.e. will not satisfy the time constraint) for matches with the
      other input. This constraint can be defined in one of the two ways.

      1. Time range join conditions (e.g. ...JOIN ON leftTime BETWEEN
         rightTime AND rightTime + INTERVAL 1 HOUR),

      2. Join on event-time windows (e.g. ...JOIN ON leftTimeWindow =
         rightTimeWindow).



So yes, the join condition should directly deal with the timestamp columns;
otherwise state will grow infinitely.
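
Applied to the query in your example, a minimal sketch of option 1 (the column
renames and the 20-second bound are assumptions for illustration, not something
taken from your code):

import org.apache.spark.sql.functions.expr

// Rename the event-time columns so the range condition is unambiguous,
// then add watermarks plus an event-time constraint inside the join itself.
val order_wm = order_details
  .withColumnRenamed("tstamp_trans", "order_time")
  .withWatermark("order_time", "20 seconds")

val invoice_wm = invoice_details
  .withColumnRenamed("tstamp_trans", "invoice_time")
  .withWatermark("invoice_time", "20 seconds")

val join_df = order_wm.join(
  invoice_wm,
  expr("""
    s_order_id = order_id AND
    invoice_time >= order_time AND
    invoice_time <= order_time + interval 20 seconds"""))

With both the watermarks and the range condition in place, the engine can drop
rows from state once they fall behind the watermark plus the 20-second range.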

Thanks,
Jungtaek Lim (HeartSaVioR)

On Tue, 11 Dec 2018 at 2:52 PM, Abhijeet Kumar wrote:

> You mean to say that Spark will store all the data in memory forever :)
>
>
> On 10-Dec-2018, at 6:16 PM, Sandeep Katta <
> sandeep0102.opensou...@gmail.com> wrote:
>
> Hi Abhijeet,
>
> You are using an inner join with unbounded state, which means every record in
> one stream will match with the other stream indefinitely.
> If you want the intended behaviour, you should add timestamp conditions
> or a window operator to the join condition.
>
>
>
> On Mon, 10 Dec 2018 at 5:23 PM, Abhijeet Kumar <
> abhijeet.ku...@sentienz.com> wrote:
>
>> Hello,
>>
>> I’m using watermarks to join two streams, as you can see below:
>>
>> val order_wm = order_details.withWatermark("tstamp_trans", "20 seconds")
>> val invoice_wm = invoice_details.withWatermark("tstamp_trans", "20 seconds")
>> val join_df = order_wm
>>   .join(invoice_wm, order_wm.col("s_order_id") === invoice_wm.col("order_id"))
>>
>> My understanding of the above code is that it will keep each stream for
>> 20 seconds after it arrives. But when I send one stream now and the other
>> after 20 seconds, they still get joined. It seems that even after the
>> watermark has passed, Spark is holding the data in memory. I even tried
>> after 45 seconds and that got joined too.
>> I’m sending the streams from two Kafka queues, and I’m creating tstamp_trans
>> with current timestamp values.
>>
>> This is creating confusion in my mind regarding watermarks.
>>
>> Thank you,
>> Abhijeet Kumar
>>
>
>


Re: Why does join use rows that were sent after watermark of 20 seconds?

2018-12-10 Thread Abhijeet Kumar
You mean to say that Spark will store all the data in memory forever :)

> On 10-Dec-2018, at 6:16 PM, Sandeep Katta  
> wrote:
> 
> Hi Abhijeet,
> 
> You are using an inner join with unbounded state, which means every record in 
> one stream will match with the other stream indefinitely.
> If you want the intended behaviour, you should add timestamp conditions or 
> a window operator to the join condition
> 
> 
> 
> On Mon, 10 Dec 2018 at 5:23 PM, Abhijeet Kumar wrote:
> Hello,
> 
> I’m using watermarks to join two streams, as you can see below:
> 
> val order_wm = order_details.withWatermark("tstamp_trans", "20 seconds")
> val invoice_wm = invoice_details.withWatermark("tstamp_trans", "20 seconds")
> val join_df = order_wm
>   .join(invoice_wm, order_wm.col("s_order_id") === invoice_wm.col("order_id"))
> My understanding of the above code is that it will keep each stream for 20 
> seconds after it arrives. But when I send one stream now and the other 
> after 20 seconds, they still get joined. It seems that even after the 
> watermark has passed, Spark is holding the data in memory. I even tried 
> after 45 seconds and that got joined too.
> 
> I’m sending the streams from two Kafka queues, and I’m creating tstamp_trans with 
> current timestamp values.
> This is creating confusion in my mind regarding watermarks.
> 
> 
> Thank you,
> Abhijeet Kumar



Re: Spark Sql group by less performant

2018-12-10 Thread 15313776907
I think you can add executor memory.
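
If memory really is the bottleneck, a minimal sketch (the values are only
illustrative; executor memory must be set before the executors are launched,
for example via spark-submit --executor-memory):

import org.apache.spark.sql.SparkSession

// Illustrative settings only; equivalent to passing --executor-memory 8g
// and --conf spark.sql.shuffle.partitions=400 to spark-submit.
val spark = SparkSession.builder()
  .config("spark.executor.memory", "8g")
  .config("spark.sql.shuffle.partitions", "400")
  .getOrCreate()

More, smaller shuffle partitions (spark.sql.shuffle.partitions, default 200)
can also help a wide group-by fit in memory.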


15313776907
Email: 15313776...@163.com

Signature customized by NetEase Mail Master

On 12/11/2018 08:28, lsn24 wrote:
Hello,

I have a requirement where I need to get total count of rows and total
count of failedRows based on a grouping.

The code looks like below:

myDataset.createOrReplaceTempView("temp_view");

Dataset<Row> countDataset = sparkSession.sql("Select
column1,column2,column3,column4,column5,column6,column7,column8, count(*) as
totalRows, sum(CASE WHEN (column8 is NULL) THEN 1 ELSE 0 END) as failedRows
from temp_view group by
column1,column2,column3,column4,column5,column6,column7,column8");


Up to around 50 million records, the query performance was OK. After that it
gave up, mostly resulting in an out-of-memory exception.

I read documentation and blogs; most of them give examples of
RDD.reduceByKey, but here I have a Dataset and Spark SQL.

What am I missing here?

Any help will be appreciated.

Thanks!








Spark Sql group by less performant

2018-12-10 Thread lsn24
Hello,

 I have a requirement where I need to get total count of rows and total
count of failedRows based on a grouping.

The code looks like below:

 myDataset.createOrReplaceTempView("temp_view");

Dataset<Row> countDataset = sparkSession.sql("Select
column1,column2,column3,column4,column5,column6,column7,column8, count(*) as
totalRows, sum(CASE WHEN (column8 is NULL) THEN 1 ELSE 0 END) as failedRows 
from temp_view group by
column1,column2,column3,column4,column5,column6,column7,column8");


Up to around 50 million records, the query performance was OK. After that it
gave up, mostly resulting in an out-of-memory exception.

I read documentation and blogs; most of them give examples of
RDD.reduceByKey, but here I have a Dataset and Spark SQL.

What am I missing here?
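
For reference, the Dataset-style version of the same aggregation that I would
expect to write is sketched below (in Scala, with the same column names; as far
as I understand it goes through the same planner as the SQL form):

import org.apache.spark.sql.functions.{col, count, lit, sum, when}

// Same aggregation as the SQL above, expressed with the Dataset API.
val counts = myDataset
  .groupBy("column1", "column2", "column3", "column4",
           "column5", "column6", "column7", "column8")
  .agg(
    count(lit(1)).as("totalRows"),
    sum(when(col("column8").isNull, 1).otherwise(0)).as("failedRows"))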

Any help will be appreciated.

Thanks!









Why does join use rows that were sent after watermark of 20 seconds?

2018-12-10 Thread Abhijeet Kumar
Hello,

I’m using watermarks to join two streams, as you can see below:

val order_wm = order_details.withWatermark("tstamp_trans", "20 seconds")
val invoice_wm = invoice_details.withWatermark("tstamp_trans", "20 seconds")
val join_df = order_wm
  .join(invoice_wm, order_wm.col("s_order_id") === invoice_wm.col("order_id"))
My understanding of the above code is that it will keep each stream for 20 
seconds after it arrives. But when I send one stream now and the other after 
20 seconds, they still get joined. It seems that even after the watermark has 
passed, Spark is holding the data in memory. I even tried after 45 seconds and 
that got joined too.

I’m sending the streams from two Kafka queues, and I’m creating tstamp_trans 
with current timestamp values.
This is creating confusion in my mind regarding watermarks.


Thank you,
Abhijeet Kumar