Jdbc Hook in Spark Batch Application

2020-12-23 Thread lec ssmi
Hi guys,
   I have some Spark programs that perform database connection operations.
I want to capture the connection information, such as the JDBC connection
properties, without being too intrusive to the code.
   Any good ideas? Could a Java agent do it?
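For JVM-only jobs, a java.lang.instrument premain agent that instruments
java.sql.DriverManager (or the specific JDBC driver) is one way to capture
connection properties without touching application code. If the jobs are
PySpark, a lighter option is a small wrapper installed once at startup. A
minimal sketch, assuming the connections go through DataFrameReader.jdbc;
the wrapper name and log format are illustrative, not an existing API:

    from pyspark.sql import DataFrameReader

    # Keep a reference to the original method and delegate to it after logging.
    _original_jdbc = DataFrameReader.jdbc

    def _logging_jdbc(self, url, table, *args, **kwargs):
        # Capture the JDBC URL, table name and any properties passed as a
        # keyword argument before handing off to the real implementation.
        props = kwargs.get("properties") or {}
        print(f"[jdbc-hook] url={url} table={table} property_keys={list(props.keys())}")
        return _original_jdbc(self, url, table, *args, **kwargs)

    # Patch once at application startup; existing call sites stay unchanged.
    DataFrameReader.jdbc = _logging_jdbc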


Re: [Spark Structured Streaming] Not working while worker node is on different machine

2020-12-23 Thread lec ssmi
Any more details about it?

bannya wrote on Friday, Dec 18, 2020 at 11:25 AM:

> Hi,
>
> I have a Spark Structured Streaming application that reads data from a
> Kafka topic (16 partitions). I am using standalone mode. I have two worker
> nodes: one is on the same machine as the master and the other is on a
> different machine. Both worker nodes have 8 cores and 16 GB RAM, with one
> executor each.
>
> When I run the streaming application with only the worker node that is on
> the same machine as the master, the application works fine. But when I run
> it with both worker nodes, the 8 tasks scheduled on worker node 1 (the one
> on the same machine as the master) complete successfully, while the other
> 8 tasks scheduled on the second worker node get stuck in the RUNNING state
> and the application hangs.
>
> A normal (non-streaming) Spark application runs fine with this setup.
>
> Can anyone help me with this?
>
>
>
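One thing worth checking when tasks stall in RUNNING only on the remote
worker is whether its executors can open connections back to the driver
(and out to the Kafka brokers) across machines; this is a guess at the
cause, not a diagnosis. A minimal sketch of pinning the driver address and
ports so they can be verified and opened in a firewall; the host name and
port numbers are illustrative:

    from pyspark.sql import SparkSession

    spark = (SparkSession.builder
             .master("spark://master-host:7077")            # illustrative master URL
             .config("spark.driver.host", "master-host")    # must resolve from the remote worker
             .config("spark.driver.port", "40000")          # fixed ports are easier to firewall
             .config("spark.blockManager.port", "40001")
             .getOrCreate())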


Re: Spark 3.0.1 Structured streaming - checkpoints fail

2020-12-23 Thread David Morin
Thanks Jungtaek.
OK, got it. I'll test it and check whether the loss of efficiency is
acceptable.


On Wed, Dec 23, 2020 at 23:29, Jungtaek Lim wrote:

> Please refer to my previous answer -
> https://lists.apache.org/thread.html/r7dfc9e47cd9651fb974f97dde756013fd0b90e49d4f6382d7a3d68f7%40%3Cuser.spark.apache.org%3E
> We should probably add this to the Structured Streaming guide doc. We
> didn't need it before, because checkpointing simply didn't work with an
> eventually consistent model; now it works, but it is very inefficient.
>
>
> On Thu, Dec 24, 2020 at 6:16 AM David Morin 
> wrote:
>
>> Does it work with the standard AWS S3 solution and its new consistency
>> model?
>>
>> On Wed, Dec 23, 2020 at 18:48, David Morin wrote:
>>
>>> Thanks.
>>> My Spark applications run on nodes based on Docker images, but in
>>> standalone mode (1 driver - n workers).
>>> Can we use S3 directly with a consistency add-on like S3Guard (s3a) or
>>> AWS consistent view?
>>>
>>> On Wed, Dec 23, 2020 at 17:48, Lalwani, Jayesh wrote:
>>>
 Yes. It is necessary to have a distributed file system because all the
 workers need to read/write to the checkpoint. The distributed file system
 has to be immediately consistent: When one node writes to it, the other
 nodes should be able to read it immediately

 The solutions/workarounds depend on where you are hosting your Spark
 application.



 From: David Morin 
 Date: Wednesday, December 23, 2020 at 11:08 AM
 To: "user@spark.apache.org" 
 Subject: [EXTERNAL] Spark 3.0.1 Structured streaming - checkpoints fail






 Hello,



 I have an issue with my Pyspark job related to checkpoint.



 Caused by: org.apache.spark.SparkException: Job aborted due to stage
 failure: Task 3 in stage 16997.0 failed 4 times, most recent failure: Lost
 task 3.3 in stage 16997.0 (TID 206609, 10.XXX, executor 4):
 java.lang.IllegalStateException: Error reading delta file
 file:/opt/spark/workdir/query6/checkpointlocation/state/0/3/1.delta of
 HDFSStateStoreProvider[id = (op=0,part=3),dir =
 file:/opt/spark/workdir/query6/checkpointlocation/state/0/3]: 
 file:/opt/spark/workdir/query6/checkpointlocation/state/0/3/1.delta
 does not exist



 This job is based on Spark 3.0.1 and Structured Streaming

 This Spark cluster (1 driver and 6 executors) works without hdfs. And
 we don't want to manage an hdfs cluster if possible.

 Is it necessary to have a distributed filesystem ? What are the
 different solutions/workarounds ?



 Thanks in advance

 David

>>>
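For readers following the thread: a minimal sketch of what pointing the
checkpoint at S3 could look like now that S3 is strongly consistent. The
bucket, topic and broker names are illustrative, the spark-sql-kafka and
hadoop-aws (S3A) packages plus AWS credentials are assumed to be
configured, and, as noted above, state-store writes to S3 will be slower
than on HDFS:

    from pyspark.sql import SparkSession

    spark = (SparkSession.builder
             .appName("structured-streaming-s3-checkpoint")
             .config("spark.hadoop.fs.s3a.impl",
                     "org.apache.hadoop.fs.s3a.S3AFileSystem")
             .getOrCreate())

    events = (spark.readStream
              .format("kafka")
              .option("kafka.bootstrap.servers", "broker:9092")   # illustrative
              .option("subscribe", "events")                      # illustrative topic
              .load())

    query = (events.writeStream
             .format("parquet")
             .option("path", "s3a://my-bucket/output/")           # illustrative bucket
             # Checkpoint on shared storage instead of a local file:/ path.
             .option("checkpointLocation", "s3a://my-bucket/checkpoints/query6/")
             .start())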


Re: Using UDF based on Numpy functions in Spark SQL

2020-12-23 Thread Sean Owen
Why do you want to use this function instead of the built-in stddev
function?

On Wed, Dec 23, 2020 at 2:52 PM Mich Talebzadeh 
wrote:

> Hi,
>
>
> This is a shot in the dark so to speak.
>
>
> I would like to use the standard deviation std offered by numpy in
> PySpark. I am using SQL for now
>
>
> The code as below
>
>
>   sqltext = f"""
>   SELECT
>         rs.Customer_ID
>       , rs.Number_of_orders
>       , rs.Total_customer_amount
>       , rs.Average_order
>       , rs.Standard_deviation
>   FROM
>   (
>       SELECT cust_id AS Customer_ID,
>              COUNT(amount_sold) AS Number_of_orders,
>              SUM(amount_sold) AS Total_customer_amount,
>              AVG(amount_sold) AS Average_order,
>              STDDEV(amount_sold) AS Standard_deviation
>       FROM {DB}.{table}
>       GROUP BY cust_id
>       HAVING SUM(amount_sold) > 94000
>          AND AVG(amount_sold) < STDDEV(amount_sold)
>   ) rs
>   ORDER BY 3 DESC
>   """
>
>   spark.sql(sqltext)
>
> Now if I wanted to use UDF based on numpy STD function, I can do
>
> import numpy as np
> from pyspark.sql.functions import UserDefinedFunction
> from pyspark.sql.types import DoubleType
> udf = UserDefinedFunction(np.std, DoubleType())
>
> How can I use that udf with spark SQL? I gather this is only possible
> through functional programming?
>
> Thanks,
>
> Mich
>
>
>
>
>
>
>


Re: Spark 3.0.1 Structured streaming - checkpoints fail

2020-12-23 Thread Jungtaek Lim
Please refer to my previous answer -
https://lists.apache.org/thread.html/r7dfc9e47cd9651fb974f97dde756013fd0b90e49d4f6382d7a3d68f7%40%3Cuser.spark.apache.org%3E
We should probably add this to the Structured Streaming guide doc. We didn't
need it before, because checkpointing simply didn't work with an eventually
consistent model; now it works, but it is very inefficient.


On Thu, Dec 24, 2020 at 6:16 AM David Morin 
wrote:

> Does it work with the standard AWS S3 solution and its new consistency
> model?
>
> On Wed, Dec 23, 2020 at 18:48, David Morin wrote:
>
>> Thanks.
>> My Spark applications run on nodes based on Docker images, but in
>> standalone mode (1 driver - n workers).
>> Can we use S3 directly with a consistency add-on like S3Guard (s3a) or
>> AWS consistent view?
>>
>> On Wed, Dec 23, 2020 at 17:48, Lalwani, Jayesh wrote:
>>
>>> Yes. It is necessary to have a distributed file system because all the
>>> workers need to read/write to the checkpoint. The distributed file system
>>> has to be immediately consistent: When one node writes to it, the other
>>> nodes should be able to read it immediately
>>>
>>> The solutions/workarounds depend on where you are hosting your Spark
>>> application.
>>>
>>>
>>>
>>> From: David Morin 
>>> Date: Wednesday, December 23, 2020 at 11:08 AM
>>> To: "user@spark.apache.org" 
>>> Subject: [EXTERNAL] Spark 3.0.1 Structured streaming - checkpoints fail
>>>
>>>
>>>
>>>
>>>
>>>
>>> Hello,
>>>
>>>
>>>
>>> I have an issue with my Pyspark job related to checkpoint.
>>>
>>>
>>>
>>> Caused by: org.apache.spark.SparkException: Job aborted due to stage
>>> failure: Task 3 in stage 16997.0 failed 4 times, most recent failure: Lost
>>> task 3.3 in stage 16997.0 (TID 206609, 10.XXX, executor 4):
>>> java.lang.IllegalStateException: Error reading delta file
>>> file:/opt/spark/workdir/query6/checkpointlocation/state/0/3/1.delta of
>>> HDFSStateStoreProvider[id = (op=0,part=3),dir =
>>> file:/opt/spark/workdir/query6/checkpointlocation/state/0/3]: 
>>> file:/opt/spark/workdir/query6/checkpointlocation/state/0/3/1.delta
>>> does not exist
>>>
>>>
>>>
>>> This job is based on Spark 3.0.1 and Structured Streaming
>>>
>>> This Spark cluster (1 driver and 6 executors) works without hdfs. And we
>>> don't want to manage an hdfs cluster if possible.
>>>
>>> Is it necessary to have a distributed filesystem ? What are the
>>> different solutions/workarounds ?
>>>
>>>
>>>
>>> Thanks in advance
>>>
>>> David
>>>
>>


Re: Using UDF based on Numpy functions in Spark SQL

2020-12-23 Thread Mich Talebzadeh
OK, thanks for the tip.

I found this link on Python UDFs from Databricks useful:

User-defined functions - Python — Databricks Documentation






LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

Disclaimer: Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Wed, 23 Dec 2020 at 21:31, Peyman Mohajerian  wrote:

>
> https://stackoverflow.com/questions/43484269/how-to-register-udf-to-use-in-sql-and-dataframe
>
> On Wed, Dec 23, 2020 at 12:52 PM Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> Hi,
>>
>>
>> This is a shot in the dark so to speak.
>>
>>
>> I would like to use the standard deviation std offered by numpy in
>> PySpark. I am using SQL for now
>>
>>
>> The code as below
>>
>>
>>   sqltext = f"""
>>   SELECT
>>         rs.Customer_ID
>>       , rs.Number_of_orders
>>       , rs.Total_customer_amount
>>       , rs.Average_order
>>       , rs.Standard_deviation
>>   FROM
>>   (
>>       SELECT cust_id AS Customer_ID,
>>              COUNT(amount_sold) AS Number_of_orders,
>>              SUM(amount_sold) AS Total_customer_amount,
>>              AVG(amount_sold) AS Average_order,
>>              STDDEV(amount_sold) AS Standard_deviation
>>       FROM {DB}.{table}
>>       GROUP BY cust_id
>>       HAVING SUM(amount_sold) > 94000
>>          AND AVG(amount_sold) < STDDEV(amount_sold)
>>   ) rs
>>   ORDER BY 3 DESC
>>   """
>>
>>   spark.sql(sqltext)
>>
>> Now if I wanted to use UDF based on numpy STD function, I can do
>>
>> import numpy as np
>> from pyspark.sql.functions import UserDefinedFunction
>> from pyspark.sql.types import DoubleType
>> udf = UserDefinedFunction(np.std, DoubleType())
>>
>> How can I use that udf with spark SQL? I gather this is only possible
>> through functional programming?
>>
>> Thanks,
>>
>> Mich
>>
>>
>>
>>
>>
>>
>>
>


Re: Using UDF based on Numpy functions in Spark SQL

2020-12-23 Thread Peyman Mohajerian
https://stackoverflow.com/questions/43484269/how-to-register-udf-to-use-in-sql-and-dataframe
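To make the registration concrete, a minimal sketch under a few assumptions:
a hypothetical "sales" table with the cust_id and amount_sold columns from
the query above, and PyArrow installed for the pandas UDF. A row-wise NumPy
function can be registered for SQL directly, but np.std is an aggregate, so
it has to become a grouped-aggregate pandas UDF applied with groupBy().agg()
rather than a plain scalar UDF:

    import numpy as np
    import pandas as pd
    from pyspark.sql.functions import pandas_udf, PandasUDFType

    # Row-wise function: usable from SQL once registered (np.log1p is only an example).
    spark.udf.register("np_log1p", lambda x: float(np.log1p(x)), "double")
    spark.sql("SELECT cust_id, np_log1p(amount_sold) AS log_amount FROM sales").show()

    # Aggregate: a grouped-aggregate pandas UDF receives all of a group's values
    # as a pandas Series and returns a single double, like a built-in aggregate.
    @pandas_udf("double", PandasUDFType.GROUPED_AGG)
    def np_std(v: pd.Series) -> float:
        return float(np.std(v))

    (spark.table("sales")
          .groupBy("cust_id")
          .agg(np_std("amount_sold").alias("Standard_deviation"))
          .show())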

On Wed, Dec 23, 2020 at 12:52 PM Mich Talebzadeh 
wrote:

> Hi,
>
>
> This is a shot in the dark so to speak.
>
>
> I would like to use the standard deviation std offered by numpy in
> PySpark. I am using SQL for now
>
>
> The code as below
>
>
>   sqltext = f"""
>   SELECT
>         rs.Customer_ID
>       , rs.Number_of_orders
>       , rs.Total_customer_amount
>       , rs.Average_order
>       , rs.Standard_deviation
>   FROM
>   (
>       SELECT cust_id AS Customer_ID,
>              COUNT(amount_sold) AS Number_of_orders,
>              SUM(amount_sold) AS Total_customer_amount,
>              AVG(amount_sold) AS Average_order,
>              STDDEV(amount_sold) AS Standard_deviation
>       FROM {DB}.{table}
>       GROUP BY cust_id
>       HAVING SUM(amount_sold) > 94000
>          AND AVG(amount_sold) < STDDEV(amount_sold)
>   ) rs
>   ORDER BY 3 DESC
>   """
>
>   spark.sql(sqltext)
>
> Now if I wanted to use UDF based on numpy STD function, I can do
>
> import numpy as np
> from pyspark.sql.functions import UserDefinedFunction
> from pyspark.sql.types import DoubleType
> udf = UserDefinedFunction(np.std, DoubleType())
>
> How can I use that udf with spark SQL? I gather this is only possible
> through functional programming?
>
> Thanks,
>
> Mich
>
>
>
>
>
>
>


Re: Spark 3.0.1 Structured streaming - checkpoints fail

2020-12-23 Thread David Morin
Does it work with the standard AWS S3 solution and its new consistency model?

On Wed, Dec 23, 2020 at 18:48, David Morin wrote:

> Thanks.
> My Spark applications run on nodes based on Docker images, but in
> standalone mode (1 driver - n workers).
> Can we use S3 directly with a consistency add-on like S3Guard (s3a) or
> AWS consistent view?
>
> On Wed, Dec 23, 2020 at 17:48, Lalwani, Jayesh wrote:
>
>> Yes. It is necessary to have a distributed file system because all the
>> workers need to read/write to the checkpoint. The distributed file system
>> has to be immediately consistent: When one node writes to it, the other
>> nodes should be able to read it immediately
>>
>> The solutions/workarounds depend on where you are hosting your Spark
>> application.
>>
>>
>>
>> From: David Morin 
>> Date: Wednesday, December 23, 2020 at 11:08 AM
>> To: "user@spark.apache.org" 
>> Subject: [EXTERNAL] Spark 3.0.1 Structured streaming - checkpoints fail
>>
>>
>>
>>
>>
>>
>> Hello,
>>
>>
>>
>> I have an issue with my Pyspark job related to checkpoint.
>>
>>
>>
>> Caused by: org.apache.spark.SparkException: Job aborted due to stage
>> failure: Task 3 in stage 16997.0 failed 4 times, most recent failure: Lost
>> task 3.3 in stage 16997.0 (TID 206609, 10.XXX, executor 4):
>> java.lang.IllegalStateException: Error reading delta file
>> file:/opt/spark/workdir/query6/checkpointlocation/state/0/3/1.delta of
>> HDFSStateStoreProvider[id = (op=0,part=3),dir =
>> file:/opt/spark/workdir/query6/checkpointlocation/state/0/3]: 
>> file:/opt/spark/workdir/query6/checkpointlocation/state/0/3/1.delta
>> does not exist
>>
>>
>>
>> This job is based on Spark 3.0.1 and Structured Streaming
>>
>> This Spark cluster (1 driver and 6 executors) works without hdfs. And we
>> don't want to manage an hdfs cluster if possible.
>>
>> Is it necessary to have a distributed filesystem ? What are the different
>> solutions/workarounds ?
>>
>>
>>
>> Thanks in advance
>>
>> David
>>
>


Using UDF based on Numpy functions in Spark SQL

2020-12-23 Thread Mich Talebzadeh
Hi,


This is a shot in the dark so to speak.


I would like to use the standard deviation (std) function offered by NumPy
in PySpark. I am using SQL for now.


The code is as below:


  sqltext = f"""
  SELECT
        rs.Customer_ID
      , rs.Number_of_orders
      , rs.Total_customer_amount
      , rs.Average_order
      , rs.Standard_deviation
  FROM
  (
      SELECT cust_id AS Customer_ID,
             COUNT(amount_sold) AS Number_of_orders,
             SUM(amount_sold) AS Total_customer_amount,
             AVG(amount_sold) AS Average_order,
             STDDEV(amount_sold) AS Standard_deviation
      FROM {DB}.{table}
      GROUP BY cust_id
      HAVING SUM(amount_sold) > 94000
         AND AVG(amount_sold) < STDDEV(amount_sold)
  ) rs
  ORDER BY 3 DESC
  """

  spark.sql(sqltext)

Now, if I wanted to use a UDF based on the NumPy std function, I could do:

import numpy as np
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import DoubleType
udf = UserDefinedFunction(np.std, DoubleType())

How can I use that udf with spark SQL? I gather this is only possible
through functional programming?

Thanks,

Mich




LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

Disclaimer: Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.


Re: Spark 3.0.1 Structured streaming - checkpoints fail

2020-12-23 Thread David Morin
Thanks.
My Spark applications run on nodes based on Docker images, but in
standalone mode (1 driver - n workers).
Can we use S3 directly with a consistency add-on like S3Guard (s3a) or
AWS consistent view?

On Wed, Dec 23, 2020 at 17:48, Lalwani, Jayesh wrote:

> Yes. It is necessary to have a distributed file system because all the
> workers need to read/write to the checkpoint. The distributed file system
> has to be immediately consistent: When one node writes to it, the other
> nodes should be able to read it immediately
>
> The solutions/workarounds depend on where you are hosting your Spark
> application.
>
>
>
> From: David Morin 
> Date: Wednesday, December 23, 2020 at 11:08 AM
> To: "user@spark.apache.org" 
> Subject: [EXTERNAL] Spark 3.0.1 Structured streaming - checkpoints fail
>
>
>
>
>
>
> Hello,
>
>
>
> I have an issue with my Pyspark job related to checkpoint.
>
>
>
> Caused by: org.apache.spark.SparkException: Job aborted due to stage
> failure: Task 3 in stage 16997.0 failed 4 times, most recent failure: Lost
> task 3.3 in stage 16997.0 (TID 206609, 10.XXX, executor 4):
> java.lang.IllegalStateException: Error reading delta file
> file:/opt/spark/workdir/query6/checkpointlocation/state/0/3/1.delta of
> HDFSStateStoreProvider[id = (op=0,part=3),dir =
> file:/opt/spark/workdir/query6/checkpointlocation/state/0/3]: 
> file:/opt/spark/workdir/query6/checkpointlocation/state/0/3/1.delta
> does not exist
>
>
>
> This job is based on Spark 3.0.1 and Structured Streaming
>
> This Spark cluster (1 driver and 6 executors) works without hdfs. And we
> don't want to manage an hdfs cluster if possible.
>
> Is it necessary to have a distributed filesystem ? What are the different
> solutions/workarounds ?
>
>
>
> Thanks in advance
>
> David
>


Re: Spark 3.0.1 Structured streaming - checkpoints fail

2020-12-23 Thread Lalwani, Jayesh
Yes. It is necessary to have a distributed file system because all the workers 
need to read/write to the checkpoint. The distributed file system has to be 
immediately consistent: When one node writes to it, the other nodes should be 
able to read it immediately
The solutions/workarounds depend on where you are hosting your Spark 
application.

From: David Morin 
Date: Wednesday, December 23, 2020 at 11:08 AM
To: "user@spark.apache.org" 
Subject: [EXTERNAL] Spark 3.0.1 Structured streaming - checkpoints fail




Hello,

I have an issue with my Pyspark job related to checkpoint.

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: 
Task 3 in stage 16997.0 failed 4 times, most recent failure: Lost task 3.3 in 
stage 16997.0 (TID 206609, 10.XXX, executor 4): 
java.lang.IllegalStateException: Error reading delta file 
file:/opt/spark/workdir/query6/checkpointlocation/state/0/3/1.delta of 
HDFSStateStoreProvider[id = (op=0,part=3),dir = 
file:/opt/spark/workdir/query6/checkpointlocation/state/0/3]: 
file:/opt/spark/workdir/query6/checkpointlocation/state/0/3/1.delta does not 
exist

This job is based on Spark 3.0.1 and Structured Streaming
This Spark cluster (1 driver and 6 executors) works without hdfs. And we don't 
want to manage an hdfs cluster if possible.
Is it necessary to have a distributed filesystem ? What are the different 
solutions/workarounds ?

Thanks in advance
David


Spark 3.0.1 Structured streaming - checkpoints fail

2020-12-23 Thread David Morin
Hello,

I have an issue with my PySpark job related to checkpointing.

Caused by: org.apache.spark.SparkException: Job aborted due to stage
failure: Task 3 in stage 16997.0 failed 4 times, most recent failure: Lost
task 3.3 in stage 16997.0 (TID 206609, 10.XXX, executor 4):
java.lang.IllegalStateException: Error reading delta file
file:/opt/spark/workdir/query6/checkpointlocation/state/0/3/1.delta of
HDFSStateStoreProvider[id = (op=0,part=3),dir =
file:/opt/spark/workdir/query6/checkpointlocation/state/0/3]:
file:/opt/spark/workdir/query6/checkpointlocation/state/0/3/1.delta
does not exist

This job is based on Spark 3.0.1 and Structured Streaming.
This Spark cluster (1 driver and 6 executors) works without HDFS, and we
don't want to manage an HDFS cluster if possible.
Is it necessary to have a distributed filesystem? What are the different
solutions/workarounds?

Thanks in advance
David