JDBCConnectionProvider in Spark

2022-01-05 Thread Artemis User
Could someone provide some insight/examples on the usage of this API? 
https://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/jdbc/JdbcConnectionProvider.html


Why is it needed, since this is an abstract class and there isn't any
concrete implementation of it? Thanks a lot in advance.


-- ND




Re: Newbie pyspark memory mgmt question

2022-01-05 Thread Andrew Davidson

Thanks Sean

Andy

From: Sean Owen 
Date: Wednesday, January 5, 2022 at 3:38 PM
To: Andrew Davidson , Nicholas Gustafson 

Cc: "user @spark" 
Subject: Re: Newbie pyspark memory mgmt question

There is no memory leak, no. You can .cache() or .persist() DataFrames, and 
that can use memory until you .unpersist(), but you're not doing that and they 
are garbage collected anyway.
Hard to say what's running out of memory without knowing more about your data 
size, partitions, cluster size, etc




Re: Newbie pyspark memory mgmt question

2022-01-05 Thread Sean Owen
There is no memory leak, no. You can .cache() or .persist() DataFrames, and
that can use memory until you .unpersist(), but you're not doing that and
they are garbage collected anyway.
Hard to say what's running out of memory without knowing more about your
data size, partitions, cluster size, etc
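
To illustrate that lifecycle, a minimal sketch (the path and column name are placeholders, not from the original code):

# Only an explicit cache()/persist() pins a DataFrame in executor memory;
# unpersist() releases it. Uncached DataFrames are simply recomputed.
df = spark.read.csv("/tmp/example.csv", header=True)   # placeholder path
df = df.filter(df["a"].isNotNull()).cache()            # mark for caching
print(df.count())     # first action materializes the cache
print(df.count())     # second action is served from the cache
df.unpersist()        # release the cached blocks when done with df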



Newbie pyspark memory mgmt question

2022-01-05 Thread Andrew Davidson
Hi

I am running into OOM problems. My cluster should be much bigger than I need, so
I wonder if the problem has to do with the way I am writing my code. Below are
three style cases. Do any of them leak memory?

Case 1:

df1 = spark.read.load( csv file )
df1 = df1.someTransform()
df1 = df1.anotherTransform()
df1.write( csv file )

I assume lazy evaluation: the first action is the write, so this should not leak memory.



Case 2:

I added actions to make it easier to debug.

df1 = spark.read.load( csv file )
print( df1.count() )

df1 = df1.someTransform()
print( df1.count() )

df1 = df1.anotherTransform()
print( df1.count() )

df1.write( csv file )

Does this leak memory?

Case 3:

If you remove the debug actions, you have the original version of my code.

for i, f in enumerate( listOfFiles ):
    df1 = spark.read.load( csv file )
    df1 = df1.select( ["a", "b"] )
    print( df1.count() )
    df1.createOrReplaceTempView( "df1" )

    sqlStmt = '... \n\
        from \n\
            retDF as rc, \n\
            sample \n\
        where \n\
            rc.Name == df1.Name \n'.format( "a" )

    if i == 0 :
        retDF = df1
    else :
        retDF = self.spark.sql( sqlStmt )

    print( retDF.count() )
    retDF.createOrReplaceTempView( "retDF" )


Does this leak memory? Is there some sort of destroy(), delete(), or similar
function I should be calling?

I wonder if I would be better off using the DataFrame version of join()?
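
For what it's worth, a rough sketch of what the Case 3 loop might look like with the DataFrame join() API instead of temp views and a SQL string. The column names ("Name", "a"), the per-file alias, and the CSV reader options are assumptions, since the original select list was truncated:

import os
from pyspark.sql import functions as F

retDF = None
for f in listOfFiles:
    sampleName = os.path.basename(f).split(".")[0]       # label this file's value column
    df1 = (spark.read.csv(f, header=True)                # assumption: CSV with a header row
                .select("Name", F.col("a").alias(sampleName)))
    print(df1.count())                                   # optional debug action
    if retDF is None:
        retDF = df1
    else:
        retDF = retDF.join(df1, on="Name", how="inner")  # no temp views, no SQL string

print(retDF.count())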

Kind regards

Andy



Re: Spark 3.2 - ReusedExchange not present in join execution plan

2022-01-05 Thread Abdeali Kothari
Just thought I'd do a quick bump and add the dev mailing list, in case
there is some insight there.
Feels like this should be categorized as a bug for Spark 3.2.0.

On Wed, Dec 29, 2021 at 5:25 PM Abdeali Kothari 
wrote:

> Hi,
> I am using pyspark for some projects, and one of the things we are doing
> is trying to find the tables/columns being used by Spark from the
> execution plan.
>
> When we upgraded to Spark 3.2, the plan seems to be different from
> previous versions, mainly when we are doing joins.
> Below is a reproducible example (you can run the same in versions 2.3 to
> 3.1 to see the difference).
>
> My original data frames have the columns id#0 and id#4,
> but after doing the joins we are seeing new columns id#34 and id#19 which
> are not created from the original dataframes I was working with.
> In previous versions of Spark, this used to use a ReusedExchange step
> (shown below).
>
> I was trying to understand whether this is expected in Spark 3.2, where the
> execution plan seems to be creating a new data source which does not
> originate from the df1 and df2 that I provided.
> NOTE: The same happens even if I read from parquet files.
>
> In spark 3.2:
> In [1]: import pyspark
>...: spark = pyspark.sql.SparkSession.builder.getOrCreate()
>
> In [2]: df1 = spark.createDataFrame([[1, 10], [2, 20]], ['id', 'col1'])
>...: df2 = spark.createDataFrame([[1, 11], [2, 22], [2, 222]], ['id',
> 'col2'])
>...: df1.explain()
>...: df2.explain()
> == Physical Plan ==
> *(1) Scan ExistingRDD[id#0L,col1#1L]
>
> == Physical Plan ==
> *(1) Scan ExistingRDD[id#4L,col2#5L]
>
> In [3]: df3 = df1.join(df2, df1['id'] == df2['id']).drop(df2['id'])
>...: df4 = df2.join(df3, df1['id'] == df2['id'])
>...: df4.explain()
> == Physical Plan ==
> AdaptiveSparkPlan isFinalPlan=false
> +- SortMergeJoin [id#4L], [id#0L], Inner
>:- Sort [id#4L ASC NULLS FIRST], false, 0
>:  +- Exchange hashpartitioning(id#4L, 200), ENSURE_REQUIREMENTS,
> [id=#53]
>: +- Filter isnotnull(id#4L)
>:+- Scan ExistingRDD[id#4L,col2#5L]
>+- Project [id#0L, col1#1L, col2#20L]
>   +- SortMergeJoin [id#0L], [id#19L], Inner
>  :- Sort [id#0L ASC NULLS FIRST], false, 0
>  :  +- Exchange hashpartitioning(id#0L, 200), ENSURE_REQUIREMENTS,
> [id=#45]
>  : +- Filter isnotnull(id#0L)
>  :+- Scan ExistingRDD[id#0L,col1#1L]
>
>
>
>  +- Sort [id#19L ASC NULLS FIRST], false, 0
>     +- Exchange hashpartitioning(id#19L, 200), ENSURE_REQUIREMENTS, [id=#46]
>        +- Filter isnotnull(id#19L)
>           +- Scan ExistingRDD[id#19L,col2#20L]
>
> In [4]: df1.createOrReplaceTempView('df1')
>...: df2.createOrReplaceTempView('df2')
>...: df3 = spark.sql("""
>...: SELECT df1.id, df1.col1, df2.col2
>...: FROM df1 JOIN df2 ON df1.id = df2.id
>...: """)
>...: df3.createOrReplaceTempView('df3')
>...: df4 = spark.sql("""
>...: SELECT df2.*, df3.*
>...: FROM df2 JOIN df3 ON df2.id = df3.id
>...: """)
>...: df4.explain()
> == Physical Plan ==
> AdaptiveSparkPlan isFinalPlan=false
> +- SortMergeJoin [id#4L], [id#0L], Inner
>:- Sort [id#4L ASC NULLS FIRST], false, 0
>:  +- Exchange hashpartitioning(id#4L, 200), ENSURE_REQUIREMENTS,
> [id=#110]
>: +- Filter isnotnull(id#4L)
>:+- Scan ExistingRDD[id#4L,col2#5L]
>+- Project [id#0L, col1#1L, col2#35L]
>   +- SortMergeJoin [id#0L], [id#34L], Inner
>  :- Sort [id#0L ASC NULLS FIRST], false, 0
>  :  +- Exchange hashpartitioning(id#0L, 200), ENSURE_REQUIREMENTS,
> [id=#102]
>  : +- Filter isnotnull(id#0L)
>  :+- Scan ExistingRDD[id#0L,col1#1L]
>
>
>
>  +- Sort [id#34L ASC NULLS FIRST], false, 0
>     +- Exchange hashpartitioning(id#34L, 200), ENSURE_REQUIREMENTS, [id=#103]
>        +- Filter isnotnull(id#34L)
>           +- Scan ExistingRDD[id#34L,col2#35L]
>
>
> Doing this in spark 3.1.1 - the plan is:
>
> *(8) SortMergeJoin [id#4L], [id#0L], Inner
> :- *(2) Sort [id#4L ASC NULLS FIRST], false, 0
> :  +- Exchange hashpartitioning(id#4L, 200), ENSURE_REQUIREMENTS, [id=#56]
> : +- *(1) Filter isnotnull(id#4L)
> :+- *(1) Scan ExistingRDD[id#4L,col2#5L]
> +- *(7) Project [id#0L, col1#1L, col2#20L]
>+- *(7) SortMergeJoin [id#0L], [id#19L], Inner
>   :- *(4) Sort [id#0L ASC NULLS FIRST], false, 0
>   :  +- Exchange hashpartitioning(id#0L, 200), ENSURE_REQUIREMENTS,
> [id=#62]
>   : +- *(3) Filter isnotnull(id#0L)
>   :+- *(3) Scan ExistingRDD[id#0L,col1#1L]
>
>   +- *(6) Sort [id#19L ASC NULLS FIRST], false, 0
>      +- ReusedExchange [id#19L, col2#20L], Exchange hashpartitioning(id#4L, 200), ENSURE_REQUIREMENTS, [id=#56]
>
>
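
As an aside, to check a plan programmatically for a ReusedExchange (or to pull table/column identifiers out of it), one option is to capture the text that explain() prints; a minimal sketch against the df4 from the example above, assuming Spark 3.0+ for the mode argument:

import io
from contextlib import redirect_stdout

# DataFrame.explain() prints to stdout, so capture that output into a string.
buf = io.StringIO()
with redirect_stdout(buf):
    df4.explain(mode="formatted")
plan_text = buf.getvalue()

print("ReusedExchange" in plan_text)   # False on 3.2.0, True on 3.1.1 per the plans above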


Re: pyspark

2022-01-05 Thread Mich Talebzadeh
hm,

If I understand correctly

from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql import SQLContext, HiveContext
import sys

def spark_session(appName):
  return SparkSession.builder \
.appName(appName) \
.enableHiveSupport() \
.getOrCreate()

def sparkcontext():
  return SparkContext.getOrCreate()

def hivecontext():
  return HiveContext(sparkcontext())
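
A minimal usage sketch of the helpers above in a notebook cell (the app name and query are just illustrative):

# One Hive-enabled session per notebook; sc comes from the session itself.
spark = spark_session("jupyterTest")
sc = spark.sparkContext
spark.sql("show databases").show()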


HTH






*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Wed, 5 Jan 2022 at 16:00, 流年以东” <2538974...@qq.com.invalid> wrote:

>
> In the process of using pyspark, there is no Spark context when opening
> Jupyter, and entering sc.master shows that sc is not defined. We want to
> initialize the Spark context with a script, but this gives an error.
> Hope to receive your reply.
> --
> Sent from my iPhone
>


Re: pyspark

2022-01-05 Thread Artemis User
Did you install and configure the proper Spark kernel (SparkMagic) on
your JupyterLab or JupyterHub? See
https://github.com/jupyter/jupyter/wiki/Jupyter-kernels for more info...
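
Alternatively, if the notebook runs a stock Python kernel rather than a Spark kernel, a minimal sketch that initializes Spark by hand in the first cell, assuming pyspark is installed in that kernel's environment and a local master for testing:

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .master("local[*]")      # assumption: local test setup; drop for a managed cluster
         .appName("notebook")
         .getOrCreate())
sc = spark.sparkContext           # the sc that was "not defined"
print(sc.master)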



On 1/5/22 4:01 AM, 流年以东” wrote:


In the process of using pyspark, there is no Spark context when opening
Jupyter, and entering sc.master shows that sc is not defined. We want to
initialize the Spark context with a script, but this gives an error.

Hope to receive your reply.

Sent from my iPhone
