Proposed additional function to create fold_column for better integration of Spark data frames with H2O

2022-01-06 Thread Chester Gan
Idea: PySpark function to create fold indices (numbers from 0, ..., N-1,
where N := number of folds needed for k-fold CV during auto ML training) on
train & test datasets

```
# train & test are PySpark dataframes of the train & test datasets respectively
import pyspark.sql.functions as F
from functools import reduce  # For Python 3.x
from pyspark.sql import DataFrame


def unionAll(*dfs):
    return reduce(DataFrame.unionAll, dfs)


def create_fold_column(input_df,
                       list_of_fold_group_columns=list_of_fold_group_columns,
                       fold_column=fold_column, nfolds=nfolds):

    if fold_column:
        # split_into_frikkin_folds... we get a list of Spark dataframes of group ids
        fold_group_ids_list_of_dataframes = (
            input_df.drop_duplicates(subset=list_of_fold_group_columns)[list_of_fold_group_columns]
                    .randomSplit(nfolds * [1 / nfolds], seed=42)
        )

        for index in range(0, len(fold_group_ids_list_of_dataframes)):
            fold_group_ids_list_of_dataframes[index] = (
                fold_group_ids_list_of_dataframes[index].withColumn(fold_column, F.lit(index))
            )

        fold_groups_ids_dataframes_union = unionAll(*fold_group_ids_list_of_dataframes)
        input_df = input_df.join(fold_groups_ids_dataframes_union,
                                 on=list_of_fold_group_columns)

    return input_df


train = train.transform(create_fold_column)

# Dummy fold_column with a single number, nfolds (typically 5), to prevent
# H2O from erroring out on the test set
if fold_column:
    test = test.withColumn(fold_column, F.lit(nfolds))
```
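For context, a minimal usage sketch; the fold count, column name, and grouping key below are illustrative assumptions rather than part of the proposal:

```
# Hypothetical module-level settings that the defaults above refer to.
nfolds = 5                                   # number of CV folds (assumption)
fold_column = "fold"                         # fold-index column name (assumption)
list_of_fold_group_columns = ["subject_id"]  # rows sharing these keys stay in one fold (assumption)

# train gets fold indices 0..nfolds-1; test gets the dummy index nfolds, so
# H2O sees the column without it affecting the cross-validation splits.
train = train.transform(create_fold_column)
test = test.withColumn(fold_column, F.lit(nfolds))
```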


Re: Spark 3.2 - ReusedExchange not present in join execution plan

2022-01-06 Thread Abdeali Kothari
Thanks a lot for the reply Albert.

On looking at it and reading about it further, I do see that
"AdaptiveSparkPlan isFinalPlan=false" is mentioned.

Could you point me to how I can see the final plan? I couldn't find that
in any of the resources I was referring to.
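One way to surface the final adaptive plan, as a sketch (assuming Spark 3.2 with AQE enabled): explain() only reports isFinalPlan=true after an action has actually executed the query, at which point reused exchanges become visible.

```
# Sketch: run an action first, then explain the same DataFrame again.
df4.collect()                  # any action works: count(), a write, ...
df4.explain()                  # now prints "AdaptiveSparkPlan isFinalPlan=true"
df4.explain(mode="formatted")  # optional, more detailed output
```

The SQL tab in the Spark UI also shows the plan after AQE re-optimization for executed queries.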

On Fri, 7 Jan 2022, 07:25 Albert,  wrote:

> I happen to encounter something similar.
>
> It's probably because you are just calling `explain` on it. When you
> actually `run` it, you will get the final Spark plan, in which case the
> exchange will be reused.
> Right, this is different compared with 3.1, probably because of the
> upgraded AQE.
>
> Not sure whether this is expected though.
>
> On Thu, Jan 6, 2022 at 12:11 AM Abdeali Kothari 
> wrote:
>
>> Just thought I'd do a quick bump and add the dev mailing list, in case
>> there is some insight there.
>> Feels like this should be categorized as a bug for Spark 3.2.0.
>>
>> On Wed, Dec 29, 2021 at 5:25 PM Abdeali Kothari 
>> wrote:
>>
>>> Hi,
>>> I am using pyspark for some projects. And one of the things we are doing
>>> is trying to find the tables/columns being used by Spark using the
>>> execution plan.
>>>
>>> When we upgrade to spark 3.2 - the spark plan seems to be different from
>>> previous versions - mainly when we are doing joins.
>>> Below is a reproducible example (you could run the same in versions 2.3
>>> to 3.1 to see the difference)
>>>
>>> My original data frames have the columns: id#0 and id#4
>>> But after doing the joins we are seeing new columns id#34 and id#19
>>> which are not created from the original dataframes I was working with.
>>> In previous versions of spark, this used to use a ReusedExchange step
>>> (shown below)
>>>
>>> I was trying to understand if this is expected in spark 3.2 where the
>>> execution plan seems to be creating a new data source which does not
>>> originate from df1 and df2 which I provided.
>>> NOTE: The same happens even if I read from parquet files
>>>
>>> In spark 3.2:
>>> In [1]: import pyspark
>>>...: spark = pyspark.sql.SparkSession.builder.getOrCreate()
>>>
>>> In [2]: df1 = spark.createDataFrame([[1, 10], [2, 20]], ['id', 'col1'])
>>>...: df2 = spark.createDataFrame([[1, 11], [2, 22], [2, 222]], ['id',
>>> 'col2'])
>>>...: df1.explain()
>>>...: df2.explain()
>>> == Physical Plan ==
>>> *(1) Scan ExistingRDD[id#0L,col1#1L]
>>>
>>> == Physical Plan ==
>>> *(1) Scan ExistingRDD[id#4L,col2#5L]
>>>
>>> In [3]: df3 = df1.join(df2, df1['id'] == df2['id']).drop(df2['id'])
>>>...: df4 = df2.join(df3, df1['id'] == df2['id'])
>>>...: df4.explain()
>>> == Physical Plan ==
>>> AdaptiveSparkPlan isFinalPlan=false
>>> +- SortMergeJoin [id#4L], [id#0L], Inner
>>>:- Sort [id#4L ASC NULLS FIRST], false, 0
>>>:  +- Exchange hashpartitioning(id#4L, 200), ENSURE_REQUIREMENTS,
>>> [id=#53]
>>>: +- Filter isnotnull(id#4L)
>>>:+- Scan ExistingRDD[id#4L,col2#5L]
>>>+- Project [id#0L, col1#1L, col2#20L]
>>>   +- SortMergeJoin [id#0L], [id#19L], Inner
>>>  :- Sort [id#0L ASC NULLS FIRST], false, 0
>>>  :  +- Exchange hashpartitioning(id#0L, 200),
>>> ENSURE_REQUIREMENTS, [id=#45]
>>>  : +- Filter isnotnull(id#0L)
>>>  :+- Scan ExistingRDD[id#0L,col1#1L]
>>>
>>>
>>>
>>>  +- Sort [id#19L ASC NULLS FIRST], false, 0
>>>     +- Exchange hashpartitioning(id#19L, 200), ENSURE_REQUIREMENTS, [id=#46]
>>>        +- Filter isnotnull(id#19L)
>>>           +- Scan ExistingRDD[id#19L,col2#20L]
>>>
>>> In [4]: df1.createOrReplaceTempView('df1')
>>>...: df2.createOrReplaceTempView('df2')
>>>...: df3 = spark.sql("""
>>>...: SELECT df1.id, df1.col1, df2.col2
>>>...: FROM df1 JOIN df2 ON df1.id = df2.id
>>>...: """)
>>>...: df3.createOrReplaceTempView('df3')
>>>...: df4 = spark.sql("""
>>>...: SELECT df2.*, df3.*
>>>...: FROM df2 JOIN df3 ON df2.id = df3.id
>>>...: """)
>>>...: df4.explain()
>>> == Physical Plan ==
>>> AdaptiveSparkPlan isFinalPlan=false
>>> +- SortMergeJoin [id#4L], [id#0L], Inner
>>>:- Sort [id#4L ASC NULLS FIRST], false, 0
>>>:  +- Exchange hashpartitioning(id#4L, 200), ENSURE_REQUIREMENTS,
>>> [id=#110]
>>>: +- Filter isnotnull(id#4L)
>>>:+- Scan ExistingRDD[id#4L,col2#5L]
>>>+- Project [id#0L, col1#1L, col2#35L]
>>>   +- SortMergeJoin [id#0L], [id#34L], Inner
>>>  :- Sort [id#0L ASC NULLS FIRST], false, 0
>>>  :  +- Exchange hashpartitioning(id#0L, 200),
>>> ENSURE_REQUIREMENTS, [id=#102]
>>>  : +- Filter isnotnull(id#0L)
>>>  :+- Scan ExistingRDD[id#0L,col1#1L]
>>>
>>>
>>>
>>>  +- Sort [id#34L ASC NULLS FIRST], false, 0
>>>     +- Exchange hashpartitioning(id#34L, 200), ENSURE_REQUIREMENTS, [id=#103]
>>>        +- Filter isnotnull(id#34L)
>>>           +- Scan ExistingRDD[id#34L,col2#35L]
>>>
>>>
>>> Doing this in spark 3.1.1 - the plan is:
>>>
>>> *(8) SortMergeJoin [id#4L], 

Re: Spark 3.2 - ReusedExchange not present in join execution plan

2022-01-06 Thread Albert
I happen to encounter something similar.

It's probably because you are just calling `explain` on it. When you actually
`run` it, you will get the final Spark plan, in which case the exchange will
be reused.
Right, this is different compared with 3.1, probably because of the upgraded
AQE.

Not sure whether this is expected though.

On Thu, Jan 6, 2022 at 12:11 AM Abdeali Kothari 
wrote:

> Just thought I'd do a quick bump and add the dev mailing list, in case
> there is some insight there.
> Feels like this should be categorized as a bug for Spark 3.2.0.
>
> On Wed, Dec 29, 2021 at 5:25 PM Abdeali Kothari 
> wrote:
>
>> Hi,
>> I am using pyspark for some projects. And one of the things we are doing
>> is trying to find the tables/columns being used by Spark using the
>> execution plan.
>>
>> When we upgrade to spark 3.2 - the spark plan seems to be different from
>> previous versions - mainly when we are doing joins.
>> Below is a reproducible example (you could run the same in versions 2.3
>> to 3.1 to see the difference)
>>
>> My original data frames have the columns: id#0 and id#4
>> But after doing the joins we are seeing new columns id#34 and id#19 which
>> are not created from the original dataframes I was working with.
>> In previous versions of spark, this used to use a ReusedExchange step
>> (shown below)
>>
>> I was trying to understand if this is expected in spark 3.2 where the
>> execution plan seems to be creating a new data source which does not
>> originate from df1 and df2 which I provided.
>> NOTE: The same happens even if I read from parquet files
>>
>> In spark 3.2:
>> In [1]: import pyspark
>>...: spark = pyspark.sql.SparkSession.builder.getOrCreate()
>>
>> In [2]: df1 = spark.createDataFrame([[1, 10], [2, 20]], ['id', 'col1'])
>>...: df2 = spark.createDataFrame([[1, 11], [2, 22], [2, 222]], ['id',
>> 'col2'])
>>...: df1.explain()
>>...: df2.explain()
>> == Physical Plan ==
>> *(1) Scan ExistingRDD[id#0L,col1#1L]
>>
>> == Physical Plan ==
>> *(1) Scan ExistingRDD[id#4L,col2#5L]
>>
>> In [3]: df3 = df1.join(df2, df1['id'] == df2['id']).drop(df2['id'])
>>...: df4 = df2.join(df3, df1['id'] == df2['id'])
>>...: df4.explain()
>> == Physical Plan ==
>> AdaptiveSparkPlan isFinalPlan=false
>> +- SortMergeJoin [id#4L], [id#0L], Inner
>>:- Sort [id#4L ASC NULLS FIRST], false, 0
>>:  +- Exchange hashpartitioning(id#4L, 200), ENSURE_REQUIREMENTS,
>> [id=#53]
>>: +- Filter isnotnull(id#4L)
>>:+- Scan ExistingRDD[id#4L,col2#5L]
>>+- Project [id#0L, col1#1L, col2#20L]
>>   +- SortMergeJoin [id#0L], [id#19L], Inner
>>  :- Sort [id#0L ASC NULLS FIRST], false, 0
>>  :  +- Exchange hashpartitioning(id#0L, 200),
>> ENSURE_REQUIREMENTS, [id=#45]
>>  : +- Filter isnotnull(id#0L)
>>  :+- Scan ExistingRDD[id#0L,col1#1L]
>>
>>
>>
>>  +- Sort [id#19L ASC NULLS FIRST], false, 0
>>     +- Exchange hashpartitioning(id#19L, 200), ENSURE_REQUIREMENTS, [id=#46]
>>        +- Filter isnotnull(id#19L)
>>           +- Scan ExistingRDD[id#19L,col2#20L]
>>
>> In [4]: df1.createOrReplaceTempView('df1')
>>...: df2.createOrReplaceTempView('df2')
>>...: df3 = spark.sql("""
>>...: SELECT df1.id, df1.col1, df2.col2
>>...: FROM df1 JOIN df2 ON df1.id = df2.id
>>...: """)
>>...: df3.createOrReplaceTempView('df3')
>>...: df4 = spark.sql("""
>>...: SELECT df2.*, df3.*
>>...: FROM df2 JOIN df3 ON df2.id = df3.id
>>...: """)
>>...: df4.explain()
>> == Physical Plan ==
>> AdaptiveSparkPlan isFinalPlan=false
>> +- SortMergeJoin [id#4L], [id#0L], Inner
>>:- Sort [id#4L ASC NULLS FIRST], false, 0
>>:  +- Exchange hashpartitioning(id#4L, 200), ENSURE_REQUIREMENTS,
>> [id=#110]
>>: +- Filter isnotnull(id#4L)
>>:+- Scan ExistingRDD[id#4L,col2#5L]
>>+- Project [id#0L, col1#1L, col2#35L]
>>   +- SortMergeJoin [id#0L], [id#34L], Inner
>>  :- Sort [id#0L ASC NULLS FIRST], false, 0
>>  :  +- Exchange hashpartitioning(id#0L, 200),
>> ENSURE_REQUIREMENTS, [id=#102]
>>  : +- Filter isnotnull(id#0L)
>>  :+- Scan ExistingRDD[id#0L,col1#1L]
>>
>>
>>
>>  +- Sort [id#34L ASC NULLS FIRST], false, 0
>>     +- Exchange hashpartitioning(id#34L, 200), ENSURE_REQUIREMENTS, [id=#103]
>>        +- Filter isnotnull(id#34L)
>>           +- Scan ExistingRDD[id#34L,col2#35L]
>>
>>
>> Doing this in spark 3.1.1 - the plan is:
>>
>> *(8) SortMergeJoin [id#4L], [id#0L], Inner
>> :- *(2) Sort [id#4L ASC NULLS FIRST], false, 0
>> :  +- Exchange hashpartitioning(id#4L, 200), ENSURE_REQUIREMENTS, [id=#56]
>> : +- *(1) Filter isnotnull(id#4L)
>> :+- *(1) Scan ExistingRDD[id#4L,col2#5L]
>> +- *(7) Project [id#0L, col1#1L, col2#20L]
>>+- *(7) SortMergeJoin [id#0L], [id#19L], Inner
>>   :- *(4) Sort [id#0L ASC NULLS FIRST], false, 0
>>   :  +- Exchange hashpartitioning(id#0L, 

How to add a row number column without reordering my data frame

2022-01-06 Thread Andrew Davidson
Hi

I am trying to work through an OOM error. I have 10411 files. I want to select a
single column from each file and then join them into a single table.

The files have a unique row id. However, it is a very long string. The data file
with just the name and the column of interest is about 470 MB. The column of
interest alone is about 21 MB; it is a column of over 5 million real numbers.

So I thought I would save a lot of memory if I could join on row numbers.

# create a dummy variable to order by https://www.py4u.net/discuss/1840945
from pyspark.sql import Window
from pyspark.sql.functions import lit, row_number

w = Window.orderBy(lit('A'))
sampleDF = sampleDF.select(["NumReads"]) \
    .withColumnRenamed("NumReads", sampleName) \
    .withColumn("tid", row_number().over(w))


This code seems pretty complicated to someone coming from pandas and R
dataframes. My unit test works, however it generates the following warning.



WARN WindowExec: No Partition Defined for Window operation! Moving all data to 
a single partition, this can cause serious performance degradation.


Is there a better way to create a row number without reordering my data? The
order is important.

Kind regards

Andy
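
One possible alternative (a sketch, not from the original thread): RDD.zipWithIndex assigns consecutive 0-based indices following the DataFrame's existing partition order, so it avoids moving everything to a single partition while preserving the current order, assuming that order is the one that matters. The with_row_number helper and column names below are illustrative.

```
# Sketch: add a consecutive 0-based row index without a global sort.
def with_row_number(df, col_name="tid"):
    # zipWithIndex pairs each row with its index, following the existing
    # partition order, so no shuffle to a single partition is needed.
    indexed = df.rdd.zipWithIndex().map(lambda pair: tuple(pair[0]) + (pair[1],))
    return indexed.toDF(df.columns + [col_name])

# e.g. for one sample column (sampleName is the variable from the post above):
sampleDF = with_row_number(
    sampleDF.select("NumReads").withColumnRenamed("NumReads", sampleName)
)
```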


Fwd: metastore bug when hive update spark table ?

2022-01-06 Thread Mich Talebzadeh
From my experience this is a Spark issue (more code-base divergence of
spark-sql from Hive), but of course there is the workaround as below.



-- Forwarded message -
From: Mich Talebzadeh 
Date: Thu, 6 Jan 2022 at 17:29
Subject: Re: metastore bug when hive update spark table ?
To: user 


Well, I have seen this type of error before.

I tend to create the table in Hive first and alter it in Spark if needed.
This is Spark 3.1.1 with Hive (version 3.1.1).

0: jdbc:hive2://rhes75:10099/default> create table my_table2 (col1 int,
col2 int)
0: jdbc:hive2://rhes75:10099/default> describe my_table2;
+-----------+------------+----------+
| col_name  | data_type  | comment  |
+-----------+------------+----------+
| col1      | int        |          |
| col2      | int        |          |
+-----------+------------+----------+
2 rows selected (0.17 seconds)

in Spark

>>> spark.sql("""ALTER TABLE my_table2 ADD column col3 string""")
DataFrame[]
>>> for c in spark.sql("""describe formatted my_table2 """).collect():
...   print(c)
...
Row(col_name='col1', data_type='int', comment=None)
Row(col_name='col2', data_type='int', comment=None)
Row(col_name='col3', data_type='string', comment=None)
Row(col_name='', data_type='', comment='')
Row(col_name='# Detailed Table Information', data_type='', comment='')
Row(col_name='Database', data_type='default', comment='')
Row(col_name='Table', data_type='my_table2', comment='')
Row(col_name='Owner', data_type='hduser', comment='')
Row(col_name='Created Time', data_type='Thu Jan 06 17:16:37 GMT 2022',
comment='')
Row(col_name='Last Access', data_type='UNKNOWN', comment='')
Row(col_name='Created By', data_type='Spark 2.2 or prior', comment='')
Row(col_name='Type', data_type='MANAGED', comment='')
Row(col_name='Provider', data_type='hive', comment='')
Row(col_name='Table Properties', data_type='[bucketing_version=2,
transient_lastDdlTime=1641489641]', comment='')
Row(col_name='Location',
data_type='hdfs://rhes75:9000/user/hive/warehouse/my_table2', comment='')
Row(col_name='Serde Library',
data_type='org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe', comment='')
Row(col_name='InputFormat',
data_type='org.apache.hadoop.mapred.TextInputFormat', comment='')
Row(col_name='OutputFormat',
data_type='org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat',
comment='')
Row(col_name='Storage Properties', data_type='[serialization.format=1]',
comment='')
Row(col_name='Partition Provider', data_type='Catalog', comment='')


This is my work around

HTH

   view my Linkedin profile




*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Thu, 6 Jan 2022 at 16:17, Nicolas Paris  wrote:

> Hi there.
>
> I also posted this problem in the spark list. I am no sure this is a
> spark or a hive metastore problem. Or if there is some metastore tunning
> configuration as workaround.
>
>
> Spark can't see hive schema updates partly because it stores the schema
> in a weird way in hive metastore.
>
>
> 1. FROM SPARK: create a table
> 
> >>> spark.sql("select 1 col1, 2 col2").write.format("parquet").saveAsTable("my_table")
> >>> spark.table("my_table").printSchema()
> root
> |-- col1: integer (nullable = true)
> |-- col2: integer (nullable = true)
>
>
> 2. FROM HIVE: alter the schema
> ==
> 0: jdbc:hive2://localhost:1> ALTER TABLE my_table REPLACE
> COLUMNS(`col1` int, `col2` int, `col3` string);
> 0: jdbc:hive2://localhost:1> describe my_table;
> +-----------+------------+----------+
> | col_name  | data_type  | comment  |
> +-----------+------------+----------+
> | col1      | int        |          |
> | col2      | int        |          |
> | col3      | string     |          |
> +-----------+------------+----------+
>
>
> 3. FROM SPARK: problem, column does not appear
> ==
> >>> spark.table("my_table").printSchema()
> root
> |-- col1: integer (nullable = true)
> |-- col2: integer (nullable = true)
>
>
> 4. FROM METASTORE DB: two ways of storing the columns
> ==
> metastore=# select * from "COLUMNS_V2";
>  CD_ID | COMMENT | COLUMN_NAME | TYPE_NAME | INTEGER_IDX
> -------+---------+-------------+-----------+-------------
>      2 |         | col1        | int       |           0
>      2 |         | col2        | int       |           1
>      2 |         | col3        | string    |           2
>
>
> metastore=# select * from "TABLE_PARAMS";
>  TBL_ID | PARAM_KEY                       | PARAM_VALUE
> --------+---------------------------------+-------------
>       1 | spark.sql.sources.provider      | parquet
>       1 | spark.sql.sources.schema.part.0 |
>
> 

spark metadata metastore bug ?

2022-01-06 Thread Nicolas Paris
Spark can't see hive schema updates partly because it stores the schema
in a weird way in hive metastore.


1. FROM SPARK: create a table

>>> spark.sql("select 1 col1, 2 col2").write.format("parquet").saveAsTable("my_table")
>>> spark.table("my_table").printSchema()
root
 |-- col1: integer (nullable = true)
 |-- col2: integer (nullable = true)


2. FROM HIVE: alter the schema
==
0: jdbc:hive2://localhost:1> ALTER TABLE my_table REPLACE COLUMNS(`col1` 
int, `col2` int, `col3` string);
0: jdbc:hive2://localhost:1> describe my_table;
+-----------+------------+----------+
| col_name  | data_type  | comment  |
+-----------+------------+----------+
| col1      | int        |          |
| col2      | int        |          |
| col3      | string     |          |
+-----------+------------+----------+


3. FROM SPARK: problem, column does not appear
==
>>> spark.table("my_table").printSchema()
root
 |-- col1: integer (nullable = true)
 |-- col2: integer (nullable = true)


4. FROM METASTORE DB: two ways of storing the columns
==
metastore=# select * from "COLUMNS_V2";
 CD_ID | COMMENT | COLUMN_NAME | TYPE_NAME | INTEGER_IDX
-------+---------+-------------+-----------+-------------
     2 |         | col1        | int       |           0
     2 |         | col2        | int       |           1
     2 |         | col3        | string    |           2


metastore=# select * from "TABLE_PARAMS";
 TBL_ID | PARAM_KEY                         | PARAM_VALUE
--------+-----------------------------------+-------------
      1 | spark.sql.sources.provider        | parquet
      1 | spark.sql.sources.schema.part.0   | {"type":"struct","fields":[{"name":"col1","type":"integer","nullable":true,"metadata":{}},{"name":"col2","type":"integer","nullable":true,"metadata":{}}]}
      1 | spark.sql.create.version          | 2.4.8
      1 | spark.sql.sources.schema.numParts | 1
      1 | last_modified_time                | 1641483180
      1 | transient_lastDdlTime             | 1641483180
      1 | last_modified_by                  | anonymous

metastore=# truncate "TABLE_PARAMS";
TRUNCATE TABLE


5. FROM SPARK: now the column magically appears
==
>>> spark.table("my_table").printSchema()
root
 |-- col1: integer (nullable = true)
 |-- col2: integer (nullable = true)
 |-- col3: string (nullable = true)


Then, is it necessary to store that stuff in TABLE_PARAMS?
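
To make the mechanism concrete, a small illustration (a sketch; the JSON string is the spark.sql.sources.schema.part.0 value shown in step 4): for a table with a datasource provider, Spark resolves the schema from that JSON rather than from COLUMNS_V2, which is why the Hive-side ALTER stays invisible until the parameter is removed.

```
import json
from pyspark.sql.types import StructType

# The schema Spark reports in step 3 is the one serialized in TABLE_PARAMS,
# not the COLUMNS_V2 rows that Hive's ALTER TABLE updated.
schema_json = (
    '{"type":"struct","fields":['
    '{"name":"col1","type":"integer","nullable":true,"metadata":{}},'
    '{"name":"col2","type":"integer","nullable":true,"metadata":{}}]}'
)
spark_side_schema = StructType.fromJson(json.loads(schema_json))
print(spark_side_schema.simpleString())   # struct<col1:int,col2:int> -- no col3
```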


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: JDBCConnectionProvider in Spark

2022-01-06 Thread Sean Owen
They're in core/, under org.apache.spark.sql.execution.datasources.jdbc.connection.
I don't quite understand; it's an abstraction over lots of concrete
implementations, just simple software design here.
You can implement your own provider too, I suppose.
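
For illustration only (a sketch, not from the thread): the providers are not called directly from user code; they are exercised by the JDBC data source itself, for example when Kerberos credentials are passed as reader options. The URL, table name, keytab path, and principal below are assumptions.

```
# Sketch: the connection provider matching the driver handles authentication
# behind the DataFrame reader; user code never invokes a provider explicitly.
df = (spark.read.format("jdbc")
      .option("url", "jdbc:postgresql://db-host:5432/mydb")    # assumption
      .option("dbtable", "public.my_table")                    # assumption
      .option("keytab", "/etc/security/keytabs/spark.keytab")  # assumption
      .option("principal", "spark@EXAMPLE.COM")                # assumption
      .load())
```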

On Thu, Jan 6, 2022 at 8:22 AM Artemis User  wrote:

> The only example I saw in the Spark distribution was
> ExampleJdbcConnectionProvider in the examples directory.  It basically
> just wraps the abstract class with overriding methods.  I guess my question
> was: since Spark embeds the JDBC APIs in the DataFrame reader and writer,
> why is such a provider API still needed?  Are there any use cases for using
> the provider API instead of the dataframe reader/writer when dealing with
> JDBC?  Thanks!
>
> On 1/6/22 9:09 AM, Sean Owen wrote:
>
> There are 8 concrete implementations of it? OracleConnectionProvider, etc
>
> On Wed, Jan 5, 2022 at 9:26 PM Artemis User 
> wrote:
>
>> Could someone provide some insight/examples on the usage of this API?
>>
>> https://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/jdbc/JdbcConnectionProvider.html
>>
>> Why is it needed since this is an abstract class and there isn't any
>> concrete implementation of it?   Thanks a lot in advance.
>>
>> -- ND
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>


Re: JDBCConnectionProvider in Spark

2022-01-06 Thread Artemis User
The only example I saw in the Spark distribution was 
ExampleJdbcConnectionProvider in the examples directory.  It 
basically just wraps the abstract class with overriding methods.  I 
guess my question was: since Spark embeds the JDBC APIs in the DataFrame 
reader and writer, why is such a provider API still needed? Are there any 
use cases for using the provider API instead of the dataframe 
reader/writer when dealing with JDBC?  Thanks!


On 1/6/22 9:09 AM, Sean Owen wrote:

There are 8 concrete implementations of it? OracleConnectionProvider, etc

On Wed, Jan 5, 2022 at 9:26 PM Artemis User  
wrote:


Could someone provide some insight/examples on the usage of this API?

https://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/jdbc/JdbcConnectionProvider.html

Why is it needed since this is an abstract class and there isn't any
concrete implementation of it?   Thanks a lot in advance.

-- ND

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: JDBCConnectionProvider in Spark

2022-01-06 Thread Sean Owen
There are 8 concrete implementations of it? OracleConnectionProvider, etc

On Wed, Jan 5, 2022 at 9:26 PM Artemis User  wrote:

> Could someone provide some insight/examples on the usage of this API?
>
> https://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/jdbc/JdbcConnectionProvider.html
>
> Why is it needed since this is an abstract class and there isn't any
> concrete implementation of it?   Thanks a lot in advance.
>
> -- ND
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: pyspark

2022-01-06 Thread Gourav Sengupta
Hi,

I am not sure at all that we still need to use SQLContext and HiveContext.

Can you please check your JAVA_HOME and SPARK_HOME? I use the findspark
library to set up all the Spark-related environment variables for me, or use
conda to install pyspark from conda-forge.


Regards,
Gourav Sengupta
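
A minimal sketch of the findspark approach mentioned above (assumes pyspark and findspark are installed and SPARK_HOME points at a valid Spark installation; the app name is arbitrary):

```
import findspark
findspark.init()   # locates SPARK_HOME and puts pyspark on sys.path

from pyspark.sql import SparkSession

# SparkSession is the single entry point; the SparkContext comes from it,
# so sc and sc.master are defined once this has run inside the notebook.
spark = SparkSession.builder.appName("jupyter-test").getOrCreate()
sc = spark.sparkContext
print(sc.master)
```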


On Wed, Jan 5, 2022 at 4:08 PM Mich Talebzadeh 
wrote:

> hm,
>
> If I understand correctly
>
> from pyspark.sql import SparkSession
> from pyspark import SparkContext
> from pyspark.sql import SQLContext, HiveContext
> import sys
>
> def spark_session(appName):
>   return SparkSession.builder \
> .appName(appName) \
> .enableHiveSupport() \
> .getOrCreate()
>
> def sparkcontext():
>   return SparkContext.getOrCreate()
>
> def hivecontext():
>   return HiveContext(sparkcontext())
>
>
> HTH
>
>
>view my Linkedin profile
> 
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Wed, 5 Jan 2022 at 16:00, 流年以东” <2538974...@qq.com.invalid> wrote:
>
>>
>> In the process of using pyspark, there is no Spark context when opening
>> Jupyter, and entering sc.master shows that sc is not defined. We want to
>> initialize the Spark context with a script. This is the error.
>> Hope to receive your reply.
>> --
>> Sent from my iPhone
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>