Re: Unsubscribe

2022-03-11 Thread Bitfox
Please send an empty email to:
user-unsubscr...@spark.apache.org
to unsubscribe yourself from the list.


On Sat, Mar 12, 2022 at 2:42 PM Aziret Satybaldiev <
satybaldiev.azi...@gmail.com> wrote:



Unsubscribe

2022-03-11 Thread Aziret Satybaldiev



unsubscribe

2022-03-11 Thread Basavaraj





StructuredStreaming - processing data based on new events in Kafka topic

2022-03-11 Thread karan alang
Hello All,

I have a Structured Streaming program which reads data from a Kafka topic,
does some processing, and finally writes the data to a target Kafka topic.

Note: the processing is done in the function convertToDictForEachBatch(),
which is invoked via foreachBatch(convertToDictForEachBatch).

As part of the processing, it reads another Kafka topic (events_topic), and
if there are new records since the last read, it does some additional
processing - it reloads data from a BigQuery table and persists it.

Here is the code:

```

df_stream = spark.readStream.format('kafka') \
    .option("kafka.security.protocol", "SSL") \
    .option("kafka.ssl.truststore.location", ssl_truststore_location) \
    .option("kafka.ssl.truststore.password", ssl_truststore_password) \
    .option("kafka.ssl.keystore.location", ssl_keystore_location) \
    .option("kafka.ssl.keystore.password", ssl_keystore_password) \
    .option("kafka.bootstrap.servers", kafkaBrokers) \
    .option("subscribe", topic) \
    .option("kafka.group.id", consumerGroupId) \
    .option("startingOffsets", "latest") \
    .option("failOnDataLoss", "false") \
    .option("maxOffsetsPerTrigger", 1) \
    .load()

print(" df_stream -> ", df_stream)

query = df_stream.selectExpr("CAST(value AS STRING)", "timestamp").writeStream \
    .outputMode("append") \
    .trigger(processingTime='4 minutes') \
    .option("numRows", 1) \
    .option("truncate", "false") \
    .option("checkpointLocation", checkpoint) \
    .foreachBatch(convertToDictForEachBatch) \
    .start()

query.awaitTermination()


# called from foreachBatch
def convertToDictForEachBatch(df, batchId):

    # checks for events in events_topic; further processing takes place
    # only if there is new data in the topic
    events = spark.read.format('kafka') \
        .option("kafka.bootstrap.servers", kafkaBrokers) \
        .option("kafka.security.protocol", "SSL") \
        .option("kafka.ssl.truststore.location", ssl_truststore_location) \
        .option("kafka.ssl.truststore.password", ssl_truststore_password) \
        .option("kafka.ssl.keystore.location", ssl_keystore_location_reloadpred) \
        .option("kafka.ssl.keystore.password", ssl_keystore_password_reloadpred) \
        .option("subscribe", topic_reloadpred) \
        .option("kafka.group.id", consumerGroupId_reloadpred) \
        .load()

    # events is passed to a function, and processing is done if new
    # events are generated
```

What is the best way to achieve this? The current code reads all of the data
in the Kafka topic on every batch; I need it to read only the new data.
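
One common approach is to track, per micro-batch, the offsets already consumed from the
events topic and pass them to the batch Kafka read via the startingOffsets/endingOffsets
options (for batch queries these accept either "earliest"/"latest" or a JSON string of
per-partition offsets). Below is a minimal sketch of that idea, not from the original post:
the function name read_new_events and the module-level last_offsets variable are
illustrative, the SSL options are omitted for brevity, and the offset state should be
persisted somewhere durable if the driver can restart.

```
import json
from pyspark.sql import functions as F

# Offsets to resume from, e.g. {"events_topic": {"0": 1234, "1": 987}}.
# Keep this somewhere durable (a file, a small table) if the driver may restart.
last_offsets = None

def read_new_events(spark):
    global last_offsets
    starting = json.dumps(last_offsets) if last_offsets else "earliest"
    events = spark.read.format("kafka") \
        .option("kafka.bootstrap.servers", kafkaBrokers) \
        .option("subscribe", topic_reloadpred) \
        .option("startingOffsets", starting) \
        .option("endingOffsets", "latest") \
        .load()

    if events.rdd.isEmpty():
        return events  # nothing new since the last batch

    # Record the next offset to resume from for each partition (last offset + 1).
    rows = events.groupBy("partition").agg(F.max("offset").alias("max_offset")).collect()
    last_offsets = {topic_reloadpred: {str(r["partition"]): r["max_offset"] + 1 for r in rows}}
    return events
```

Inside convertToDictForEachBatch() the additional processing (the BigQuery reload) would
then run only when the returned DataFrame is non-empty.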

Additional details on Stack Overflow:

https://stackoverflow.com/questions/71446023/structuredstreaming-processing-data-based-on-new-events-in-kafka-topic


tia!


Re: Show create table on a Hive Table in Spark SQL - Treats CHAR, VARCHAR as STRING

2022-03-11 Thread Venkatesan Muniappan
OK. I work for an org where such upgrades take a few months, so it's not an
immediate task.

Thanks,
Venkat
2016173438


On Fri, Mar 11, 2022 at 4:38 PM Mich Talebzadeh 
wrote:

> yes in spark 3.1.1. Best to upgrade it to spark 3+.
>
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Fri, 11 Mar 2022 at 21:35, Venkatesan Muniappan <
> m.venkatbe...@gmail.com> wrote:
>
>> Thank you. I am trying to get the table definition for the existing
>> tables. BTW, the create and show command that you executed, was it on Spark
>> 3.x ? .
>>
>> Thanks,
>> Venkat
>> 2016173438
>>
>>
>> On Fri, Mar 11, 2022 at 4:28 PM Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> Well I do not know what has changed. However, this should not affect
>>> your work.
>>>
>>>
>>> Try to create table in Spark
>>>
>>>
>>> sqltext: String =
>>>
>>>   CREATE TABLE if not exists test.etcs(
>>>
>>>  ID INT
>>>
>>>, CLUSTERED INT
>>>
>>>, SCATTERED INT
>>>
>>>, RANDOMISED INT
>>>
>>>, RANDOM_STRING VARCHAR(50)
>>>
>>>, SMALL_VC VARCHAR(10)
>>>
>>>, PADDING  VARCHAR(4000)
>>>
>>>, PADDING2 STRING
>>>
>>>   )
>>>
>>>   CLUSTERED BY (ID) INTO 256 BUCKETS
>>>
>>>   STORED AS PARQUET
>>>
>>>   TBLPROPERTIES (
>>>
>>>   "parquet.compress"="SNAPPY"
>>>
>>>  )
>>>
>>>
>>> scala> spark.sql (sqltext)
>>>
>>> scala> spark.sql("show create table test.etcs").show(false)
>>>
>>>
>>> ++
>>>
>>> |createtab_stmt
>>>
>>>
>>>
>>> |
>>>
>>>
>>> ++
>>>
>>> |CREATE TABLE `test`.`etcs` (
>>>
>>>   `ID` INT,
>>>
>>>   `CLUSTERED` INT,
>>>
>>>   `SCATTERED` INT,
>>>
>>>   `RANDOMISED` INT,
>>>
>>>   `RANDOM_STRING` VARCHAR(50),
>>>
>>>   `SMALL_VC` VARCHAR(10),
>>>
>>>   `PADDING` VARCHAR(4000),
>>>
>>>   `PADDING2` STRING)
>>>
>>> USING parquet
>>>
>>> CLUSTERED BY (ID)
>>>
>>> INTO 256 BUCKETS
>>>
>>> TBLPROPERTIES (
>>>
>>>   'transient_lastDdlTime' = '1647033659',
>>>
>>>   'parquet.compress' = 'SNAPPY')
>>>
>>> |
>>>
>>>
>>> +--
>>>
>>>
>>> Note that columns are OK.
>>>
>>>
>>> Also check this link for the differences between CHAR, VARCHAR and
>>> STRING types in Hive
>>>
>>>
>>> https://cwiki.apache.org/confluence/display/hive/languagemanual+types
>>>
>>>
>>> HTH
>>>
>>>
>>>
>>>   view my Linkedin profile
>>> 
>>>
>>>
>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>>
>>> On Fri, 11 Mar 2022 at 20:55, Venkatesan Muniappan <
>>> m.venkatbe...@gmail.com> wrote:
>>>
 Thank you Mich Talebzadeh for your answer. It's good to know that
 VARCHAR and CHAR are properly showing in Spark 3. Do you know what changed
 in Spark 3 that made this possible?. Or how can I achieve the same output
 in Spark 2.4.1? If there are some conf options, that would be helpful.

 Thanks,
 Venkat
 2016173438


 On Fri, Mar 11, 2022 at 2:06 PM Mich Talebzadeh <
 mich.talebza...@gmail.com> wrote:

> Hive 3.1.1
> Spark 3.1.1
>
> Your stack overflow issue raised and I quote:
>
> "I have a need to generate DDL statements for Hive tables & views
> programmatically. I tried using Spark and Beeline for this task. Beeline
> takes around 5-10 seconds for each of the statements whereas Spark
> completes the same thing in a few milliseconds. I am planning to use Spark
> since it is 

Re: Show create table on a Hive Table in Spark SQL - Treats CHAR, VARCHAR as STRING

2022-03-11 Thread Venkatesan Muniappan
Thank you. I am trying to get the table definition for the existing tables.
BTW, the create and show commands that you executed - were they on Spark 3.x?

Thanks,
Venkat
2016173438


On Fri, Mar 11, 2022 at 4:28 PM Mich Talebzadeh 
wrote:

> Well I do not know what has changed. However, this should not affect your
> work.
>
>
> Try to create table in Spark
>
>
> sqltext: String =
>
>   CREATE TABLE if not exists test.etcs(
>
>  ID INT
>
>, CLUSTERED INT
>
>, SCATTERED INT
>
>, RANDOMISED INT
>
>, RANDOM_STRING VARCHAR(50)
>
>, SMALL_VC VARCHAR(10)
>
>, PADDING  VARCHAR(4000)
>
>, PADDING2 STRING
>
>   )
>
>   CLUSTERED BY (ID) INTO 256 BUCKETS
>
>   STORED AS PARQUET
>
>   TBLPROPERTIES (
>
>   "parquet.compress"="SNAPPY"
>
>  )
>
>
> scala> spark.sql (sqltext)
>
> scala> spark.sql("show create table test.etcs").show(false)
>
>
> ++
>
> |createtab_stmt
>
>
>
>   |
>
>
> ++
>
> |CREATE TABLE `test`.`etcs` (
>
>   `ID` INT,
>
>   `CLUSTERED` INT,
>
>   `SCATTERED` INT,
>
>   `RANDOMISED` INT,
>
>   `RANDOM_STRING` VARCHAR(50),
>
>   `SMALL_VC` VARCHAR(10),
>
>   `PADDING` VARCHAR(4000),
>
>   `PADDING2` STRING)
>
> USING parquet
>
> CLUSTERED BY (ID)
>
> INTO 256 BUCKETS
>
> TBLPROPERTIES (
>
>   'transient_lastDdlTime' = '1647033659',
>
>   'parquet.compress' = 'SNAPPY')
>
> |
>
>
> +--
>
>
> Note that columns are OK.
>
>
> Also check this link for the differences between CHAR, VARCHAR and STRING
> types in Hive
>
>
> https://cwiki.apache.org/confluence/display/hive/languagemanual+types
>
>
> HTH
>
>
>
>   view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Fri, 11 Mar 2022 at 20:55, Venkatesan Muniappan <
> m.venkatbe...@gmail.com> wrote:
>
>> Thank you Mich Talebzadeh for your answer. It's good to know that VARCHAR
>> and CHAR are properly showing in Spark 3. Do you know what changed in Spark
>> 3 that made this possible?. Or how can I achieve the same output in Spark
>> 2.4.1? If there are some conf options, that would be helpful.
>>
>> Thanks,
>> Venkat
>> 2016173438
>>
>>
>> On Fri, Mar 11, 2022 at 2:06 PM Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> Hive 3.1.1
>>> Spark 3.1.1
>>>
>>> Your stack overflow issue raised and I quote:
>>>
>>> "I have a need to generate DDL statements for Hive tables & views
>>> programmatically. I tried using Spark and Beeline for this task. Beeline
>>> takes around 5-10 seconds for each of the statements whereas Spark
>>> completes the same thing in a few milliseconds. I am planning to use Spark
>>> since it is faster compared to beeline. One downside of using spark for
>>> getting DDL statements from the hive is, it treats CHAR, VARCHAR characters
>>> as String and it doesn't preserve the length information that goes with
>>> CHAR,VARCHAR data types. At the same time beeline preserves the data type
>>> and the length information for CHAR,VARCHAR data types. *I am using
>>> Spark 2.4.1 and Beeline 2.1.1.*
>>>
>>> Given below the sample create table command and its show create table
>>> output."
>>>
>>> Create a simple table in *Hive* in test database
>>>
>>> hive> *use test;*
>>> OK
>>> hive> *create table etc(ID BIGINT, col1 VARCHAR(30), col2 STRING);*
>>> OK
>>> hive> *desc formatted etc;*
>>> # col_name  data_type   comment
>>> *id  bigint*
>>> *col1varchar(30)*
>>> *col2string*
>>>
>>> # Detailed Table Information
>>> Database:   test
>>> OwnerType:  USER
>>> Owner:  hduser
>>> CreateTime: Fri Mar 11 18:29:34 GMT 2022
>>> LastAccessTime: UNKNOWN
>>> Retention:  0
>>> Location:
>>>  

Re: Show create table on a Hive Table in Spark SQL - Treats CHAR, VARCHAR as STRING

2022-03-11 Thread Mich Talebzadeh
Yes, in Spark 3.1.1. Best to upgrade to Spark 3+.



   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Fri, 11 Mar 2022 at 21:35, Venkatesan Muniappan 
wrote:

> Thank you. I am trying to get the table definition for the existing
> tables. BTW, the create and show command that you executed, was it on Spark
> 3.x ? .
>
> Thanks,
> Venkat
> 2016173438
>
>
> On Fri, Mar 11, 2022 at 4:28 PM Mich Talebzadeh 
> wrote:
>
>> Well I do not know what has changed. However, this should not affect your
>> work.
>>
>>
>> Try to create table in Spark
>>
>>
>> sqltext: String =
>>
>>   CREATE TABLE if not exists test.etcs(
>>
>>  ID INT
>>
>>, CLUSTERED INT
>>
>>, SCATTERED INT
>>
>>, RANDOMISED INT
>>
>>, RANDOM_STRING VARCHAR(50)
>>
>>, SMALL_VC VARCHAR(10)
>>
>>, PADDING  VARCHAR(4000)
>>
>>, PADDING2 STRING
>>
>>   )
>>
>>   CLUSTERED BY (ID) INTO 256 BUCKETS
>>
>>   STORED AS PARQUET
>>
>>   TBLPROPERTIES (
>>
>>   "parquet.compress"="SNAPPY"
>>
>>  )
>>
>>
>> scala> spark.sql (sqltext)
>>
>> scala> spark.sql("show create table test.etcs").show(false)
>>
>>
>> ++
>>
>> |createtab_stmt
>>
>>
>>
>> |
>>
>>
>> ++
>>
>> |CREATE TABLE `test`.`etcs` (
>>
>>   `ID` INT,
>>
>>   `CLUSTERED` INT,
>>
>>   `SCATTERED` INT,
>>
>>   `RANDOMISED` INT,
>>
>>   `RANDOM_STRING` VARCHAR(50),
>>
>>   `SMALL_VC` VARCHAR(10),
>>
>>   `PADDING` VARCHAR(4000),
>>
>>   `PADDING2` STRING)
>>
>> USING parquet
>>
>> CLUSTERED BY (ID)
>>
>> INTO 256 BUCKETS
>>
>> TBLPROPERTIES (
>>
>>   'transient_lastDdlTime' = '1647033659',
>>
>>   'parquet.compress' = 'SNAPPY')
>>
>> |
>>
>>
>> +--
>>
>>
>> Note that columns are OK.
>>
>>
>> Also check this link for the differences between CHAR, VARCHAR and STRING
>> types in Hive
>>
>>
>> https://cwiki.apache.org/confluence/display/hive/languagemanual+types
>>
>>
>> HTH
>>
>>
>>
>>   view my Linkedin profile
>> 
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Fri, 11 Mar 2022 at 20:55, Venkatesan Muniappan <
>> m.venkatbe...@gmail.com> wrote:
>>
>>> Thank you Mich Talebzadeh for your answer. It's good to know that
>>> VARCHAR and CHAR are properly showing in Spark 3. Do you know what changed
>>> in Spark 3 that made this possible?. Or how can I achieve the same output
>>> in Spark 2.4.1? If there are some conf options, that would be helpful.
>>>
>>> Thanks,
>>> Venkat
>>> 2016173438
>>>
>>>
>>> On Fri, Mar 11, 2022 at 2:06 PM Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
 Hive 3.1.1
 Spark 3.1.1

 Your stack overflow issue raised and I quote:

 "I have a need to generate DDL statements for Hive tables & views
 programmatically. I tried using Spark and Beeline for this task. Beeline
 takes around 5-10 seconds for each of the statements whereas Spark
 completes the same thing in a few milliseconds. I am planning to use Spark
 since it is faster compared to beeline. One downside of using spark for
 getting DDL statements from the hive is, it treats CHAR, VARCHAR characters
 as String and it doesn't preserve the length information that goes with
 CHAR,VARCHAR data types. At the same time beeline preserves the data type
 and the length information for CHAR,VARCHAR data types. *I am using
 Spark 2.4.1 and Beeline 2.1.1.*

 

Re: Show create table on a Hive Table in Spark SQL - Treats CHAR, VARCHAR as STRING

2022-03-11 Thread Mich Talebzadeh
Well I do not know what has changed. However, this should not affect your
work.


Try to create table in Spark


sqltext: String =

  CREATE TABLE if not exists test.etcs(

 ID INT

   , CLUSTERED INT

   , SCATTERED INT

   , RANDOMISED INT

   , RANDOM_STRING VARCHAR(50)

   , SMALL_VC VARCHAR(10)

   , PADDING  VARCHAR(4000)

   , PADDING2 STRING

  )

  CLUSTERED BY (ID) INTO 256 BUCKETS

  STORED AS PARQUET

  TBLPROPERTIES (

  "parquet.compress"="SNAPPY"

 )


scala> spark.sql (sqltext)

scala> spark.sql("show create table test.etcs").show(false)

++
|createtab_stmt|
++
|CREATE TABLE `test`.`etcs` (
  `ID` INT,
  `CLUSTERED` INT,
  `SCATTERED` INT,
  `RANDOMISED` INT,
  `RANDOM_STRING` VARCHAR(50),
  `SMALL_VC` VARCHAR(10),
  `PADDING` VARCHAR(4000),
  `PADDING2` STRING)
USING parquet
CLUSTERED BY (ID)
INTO 256 BUCKETS
TBLPROPERTIES (
  'transient_lastDdlTime' = '1647033659',
  'parquet.compress' = 'SNAPPY')
|
++


Note that columns are OK.


Also check this link for the differences between CHAR, VARCHAR and STRING
types in Hive


https://cwiki.apache.org/confluence/display/hive/languagemanual+types


HTH



  view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Fri, 11 Mar 2022 at 20:55, Venkatesan Muniappan 
wrote:

> Thank you Mich Talebzadeh for your answer. It's good to know that VARCHAR
> and CHAR are properly showing in Spark 3. Do you know what changed in Spark
> 3 that made this possible?. Or how can I achieve the same output in Spark
> 2.4.1? If there are some conf options, that would be helpful.
>
> Thanks,
> Venkat
> 2016173438
>
>
> On Fri, Mar 11, 2022 at 2:06 PM Mich Talebzadeh 
> wrote:
>
>> Hive 3.1.1
>> Spark 3.1.1
>>
>> Your stack overflow issue raised and I quote:
>>
>> "I have a need to generate DDL statements for Hive tables & views
>> programmatically. I tried using Spark and Beeline for this task. Beeline
>> takes around 5-10 seconds for each of the statements whereas Spark
>> completes the same thing in a few milliseconds. I am planning to use Spark
>> since it is faster compared to beeline. One downside of using spark for
>> getting DDL statements from the hive is, it treats CHAR, VARCHAR characters
>> as String and it doesn't preserve the length information that goes with
>> CHAR,VARCHAR data types. At the same time beeline preserves the data type
>> and the length information for CHAR,VARCHAR data types. *I am using
>> Spark 2.4.1 and Beeline 2.1.1.*
>>
>> Given below the sample create table command and its show create table
>> output."
>>
>> Create a simple table in *Hive* in test database
>>
>> hive> *use test;*
>> OK
>> hive> *create table etc(ID BIGINT, col1 VARCHAR(30), col2 STRING);*
>> OK
>> hive> *desc formatted etc;*
>> # col_name  data_type   comment
>> *id  bigint*
>> *col1varchar(30)*
>> *col2string*
>>
>> # Detailed Table Information
>> Database:   test
>> OwnerType:  USER
>> Owner:  hduser
>> CreateTime: Fri Mar 11 18:29:34 GMT 2022
>> LastAccessTime: UNKNOWN
>> Retention:  0
>> Location:   hdfs://rhes75:9000/user/hive/warehouse/test.db/etc
>> Table Type: MANAGED_TABLE
>> Table Parameters:
>> COLUMN_STATS_ACCURATE
>>  
>> {\"BASIC_STATS\":\"true\",\"COLUMN_STATS\":{\"col1\":\"true\",\"col2\":\"true\",\"id\":\"true\"}}
>> bucketing_version   2
>> numFiles0
>> numRows 0
>> rawDataSize 0
>> totalSize   0
>> transient_lastDdlTime   1647023374
>>
>> # Storage Information
>> SerDe Library:  

Re: [SPARK-38438] pyspark - how to update spark.jars.packages on existing default context?

2022-03-11 Thread Artemis User
OK, I see the confusion in terminology.  However, what was suggested 
should still work.  A Luigi worker in this case would function like a 
Spark client, responsible for submitting a Spark application (or "job" in 
Luigi's terms).  In other words, you just define all the necessary jars for 
all your jobs in your SparkContext (or, to make things easier, define them in 
the spark-defaults.conf file or simply place them in Spark's jars 
directory).  This should work, especially when you don't know in advance 
which "job" (more precisely, application or task) needs which jars.


For other questions unrelated to this discussion, I'd suggest starting a 
new thread to make things clear. Thanks!
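
To illustrate the suggestion above, here is a minimal sketch (not from the thread; the
package coordinates below are placeholders) of declaring the union of packages once, when
the session is created, rather than trying to change them per job. The same list can
equally live in spark-defaults.conf as spark.jars.packages.

```
from pyspark.sql import SparkSession

# Declare every package any of the orchestrated jobs may need, up front.
# Placeholder coordinates -- replace with whatever the jobs actually require.
spark = SparkSession.builder \
    .appName("orchestrated-jobs") \
    .config("spark.jars.packages",
            "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.1,"
            "com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.23.2") \
    .getOrCreate()
```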


On 3/11/22 1:09 PM, Rafał Wojdyła wrote:
I don't know why I don't see my last message in the thread here: 
https://lists.apache.org/thread/5wgdqp746nj4f6ovdl42rt82wc8ltkcn
I also don't get messages from Artemis in my mail; I can only see them 
in the thread web UI, which is very confusing.
On top of that, when I click on "reply via your own email client" in 
the web UI, I get: Bad Request Error 400


Anyway, to answer your last comment, Artemis:

> I guess there are several misconceptions here:

There's no confusion on my side; all of that makes sense. When I said 
"worker" in that comment I meant the scheduler worker, not the Spark 
worker, which in the Spark realm would be the client.
Everything else you said is undoubtedly correct, but unrelated to the 
issue/problem at hand.


Sean, Artemis - I appreciate your feedback about the infra setup, but 
it's beside the problem behind this issue.


Let me describe a simpler setup/example with the same problem, say:
 1. I have a jupyter notebook
 2. use local/driver spark mode only
 3. I start the driver, process some data, store it in pandas dataframe
 4. now say I want to add a package to spark driver (or increase the 
JVM memory etc)


There's currently no way to do step 4 without restarting the 
notebook process, which holds the "reference" to the Spark driver/JVM. 
If I restart the Jupyter notebook I lose all the data in memory 
(e.g. the pandas data); of course I can save that data to disk, but that's 
beside the point.


I understand you don't want to provide this functionality in Spark, 
nor warn users about changes to the Spark configuration that won't actually 
take effect - as a user I wish I could get at least a warning in that case, 
but I respect your decision. It seems like the workaround of shutting down 
the JVM works in this case; I would much appreciate your feedback 
about **that specific workaround**. Any reason not to use it?

Cheers - Rafal

On Thu, 10 Mar 2022 at 18:50, Rafał Wojdyła  wrote:

If you have a long running python orchestrator worker (e.g. Luigi
worker), and say it's gets a DAG of A -> B ->C, and say the worker
first creates a spark driver for A (which doesn't need extra
jars/packages), then it gets B which is also a spark job but it
needs an extra package, it won't be able to create a new spark
driver with extra packages since it's "not possible" to create a
new driver JVM. I would argue it's the same scenario if you have
multiple spark jobs that need different amounts of memory or
anything that requires JVM restart. Of course I can use the
workaround to shut down the driver/JVM, do you have any feedback
about that workaround (see my previous comment or the issue).

On Thu, 10 Mar 2022 at 18:12, Sean Owen  wrote:

Wouldn't these be separately submitted jobs for separate
workloads? You can of course dynamically change each job
submitted to have whatever packages you like, from whatever is
orchestrating. A single job doing everything sound right.

On Thu, Mar 10, 2022, 12:05 PM Rafał Wojdyła
 wrote:

Because I can't (and should not) know ahead of time which
jobs will be executed, that's the job of the orchestration
layer (and can be dynamic). I know I can specify multiple
packages. Also not worried about memory.

On Thu, 10 Mar 2022 at 13:54, Artemis User
 wrote:

If changing packages or jars isn't your concern, why
not just specify ALL packages that you would need for
the Spark environment?  You know you can define
multiple packages under the packages option.  This
shouldn't cause memory issues since JVM uses dynamic
class loading...

On 3/9/22 10:03 PM, Rafał Wojdyła wrote:

Hi Artemis,
Thanks for your input, to answer your questions:

> You may want to ask yourself why it is necessary to
change the jar packages during runtime.

I have a long running orchestrator process, which
executes multiple spark jobs, currently on a single
VM/driver, some of those jobs might require 

Re: Show create table on a Hive Table in Spark SQL - Treats CHAR, VARCHAR as STRING

2022-03-11 Thread Venkatesan Muniappan
Thank you Mich Talebzadeh for your answer. It's good to know that VARCHAR
and CHAR show up properly in Spark 3. Do you know what changed in Spark 3
that made this possible? Or how can I achieve the same output in Spark
2.4.1? If there are some conf options, that would be helpful.

Thanks,
Venkat
2016173438
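
One workaround sometimes suggested for Spark 2.x - offered here only as a sketch, with
the caveat that the metadata key is version-dependent (reportedly HIVE_TYPE_STRING in
Spark 2.x, while Spark 3.1 uses __CHAR_VARCHAR_TYPE_STRING) and should be verified on
your build - is to read the original Hive type from the column metadata instead of
relying on SHOW CREATE TABLE:

```
def hive_column_types(spark, table):
    # For Hive tables with CHAR/VARCHAR columns, the original Hive type string
    # may be preserved in the StructField metadata even though the Spark type
    # is reported as string. The metadata key is an assumption -- verify it.
    cols = []
    for f in spark.table(table).schema.fields:
        hive_type = f.metadata.get("HIVE_TYPE_STRING", f.dataType.simpleString())
        cols.append("`%s` %s" % (f.name, hive_type))
    return cols

# e.g. hive_column_types(spark, "test.etc")
# might yield ['`id` bigint', '`col1` varchar(30)', '`col2` string']
```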


On Fri, Mar 11, 2022 at 2:06 PM Mich Talebzadeh 
wrote:

> Hive 3.1.1
> Spark 3.1.1
>
> Your stack overflow issue raised and I quote:
>
> "I have a need to generate DDL statements for Hive tables & views
> programmatically. I tried using Spark and Beeline for this task. Beeline
> takes around 5-10 seconds for each of the statements whereas Spark
> completes the same thing in a few milliseconds. I am planning to use Spark
> since it is faster compared to beeline. One downside of using spark for
> getting DDL statements from the hive is, it treats CHAR, VARCHAR characters
> as String and it doesn't preserve the length information that goes with
> CHAR,VARCHAR data types. At the same time beeline preserves the data type
> and the length information for CHAR,VARCHAR data types. *I am using Spark
> 2.4.1 and Beeline 2.1.1.*
>
> Given below the sample create table command and its show create table
> output."
>
> Create a simple table in *Hive* in test database
>
> hive> *use test;*
> OK
> hive> *create table etc(ID BIGINT, col1 VARCHAR(30), col2 STRING);*
> OK
> hive> *desc formatted etc;*
> # col_name  data_type   comment
> *id  bigint*
> *col1varchar(30)*
> *col2string*
>
> # Detailed Table Information
> Database:   test
> OwnerType:  USER
> Owner:  hduser
> CreateTime: Fri Mar 11 18:29:34 GMT 2022
> LastAccessTime: UNKNOWN
> Retention:  0
> Location:   hdfs://rhes75:9000/user/hive/warehouse/test.db/etc
> Table Type: MANAGED_TABLE
> Table Parameters:
> COLUMN_STATS_ACCURATE
>  
> {\"BASIC_STATS\":\"true\",\"COLUMN_STATS\":{\"col1\":\"true\",\"col2\":\"true\",\"id\":\"true\"}}
> bucketing_version   2
> numFiles0
> numRows 0
> rawDataSize 0
> totalSize   0
> transient_lastDdlTime   1647023374
>
> # Storage Information
> SerDe Library:  org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
> InputFormat:org.apache.hadoop.mapred.TextInputFormat
> OutputFormat:
>  org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
> Compressed: No
> Num Buckets:-1
> Bucket Columns: []
> Sort Columns:   []
> Storage Desc Params:
> serialization.format1
>
> Now let's go to spark-shell
> scala> *spark.sql("show create table test.etc").show(false)*
>
> ++
> |createtab_stmt
>
>   |
>
> ++
> |CREATE TABLE `test`.`etc` (
> *  `id` BIGINT,*
> *  `col1` VARCHAR(30),*
> *  `col2` STRING)*
> USING text
> TBLPROPERTIES (
>   'bucketing_version' = '2',
>   'transient_lastDdlTime' = '1647023374')
> |
>
> ++
>
> You can see Spark shows columns correctly
>
> Now let us go and create the same table in hive through beeline
>
>
> 0: jdbc:hive2://rhes75:10099/default>* use test*
>
> No rows affected (0.019 seconds)
>
> 0: jdbc:hive2://rhes75:10099/default> *create table etc(ID BIGINT, col1
> VARCHAR(30), col2 STRING)*
>
> . . . . . . . . . . . . . . . . . . > No rows affected (0.304 seconds)
>
> 0: jdbc:hive2://rhes75:10099/default> *desc formatted etc*
>
> . . . . . . . . . . . . . . . . . . >
> +---+++
>
> |   col_name| data_type
>   |  comment   |
>
>
> +---+++
>
> | # col_name| data_type
>   | comment|
>
> *| id| bigint
>||*
>
> *| col1  | varchar(30)
> ||*
>
> *| col2  

Re: Show create table on a Hive Table in Spark SQL - Treats CHAR, VARCHAR as STRING

2022-03-11 Thread Mich Talebzadeh
Hive 3.1.1
Spark 3.1.1

Your stack overflow issue raised and I quote:

"I have a need to generate DDL statements for Hive tables & views
programmatically. I tried using Spark and Beeline for this task. Beeline
takes around 5-10 seconds for each of the statements whereas Spark
completes the same thing in a few milliseconds. I am planning to use Spark
since it is faster compared to beeline. One downside of using spark for
getting DDL statements from the hive is, it treats CHAR, VARCHAR characters
as String and it doesn't preserve the length information that goes with
CHAR,VARCHAR data types. At the same time beeline preserves the data type
and the length information for CHAR,VARCHAR data types. *I am using Spark
2.4.1 and Beeline 2.1.1.*

Given below the sample create table command and its show create table
output."

Create a simple table in *Hive* in test database

hive> *use test;*
OK
hive> *create table etc(ID BIGINT, col1 VARCHAR(30), col2 STRING);*
OK
hive> *desc formatted etc;*
# col_name  data_type   comment
*id  bigint*
*col1varchar(30)*
*col2string*

# Detailed Table Information
Database:   test
OwnerType:  USER
Owner:  hduser
CreateTime: Fri Mar 11 18:29:34 GMT 2022
LastAccessTime: UNKNOWN
Retention:  0
Location:   hdfs://rhes75:9000/user/hive/warehouse/test.db/etc
Table Type: MANAGED_TABLE
Table Parameters:
COLUMN_STATS_ACCURATE
 
{\"BASIC_STATS\":\"true\",\"COLUMN_STATS\":{\"col1\":\"true\",\"col2\":\"true\",\"id\":\"true\"}}
bucketing_version   2
numFiles0
numRows 0
rawDataSize 0
totalSize   0
transient_lastDdlTime   1647023374

# Storage Information
SerDe Library:  org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
InputFormat:org.apache.hadoop.mapred.TextInputFormat
OutputFormat:
 org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
Compressed: No
Num Buckets:-1
Bucket Columns: []
Sort Columns:   []
Storage Desc Params:
serialization.format1

Now let's go to spark-shell
scala> *spark.sql("show create table test.etc").show(false)*
++
|createtab_stmt|
++
|CREATE TABLE `test`.`etc` (
*  `id` BIGINT,*
*  `col1` VARCHAR(30),*
*  `col2` STRING)*
USING text
TBLPROPERTIES (
  'bucketing_version' = '2',
  'transient_lastDdlTime' = '1647023374')
|
++

You can see Spark shows columns correctly

Now let us go and create the same table in hive through beeline


0: jdbc:hive2://rhes75:10099/default>* use test*

No rows affected (0.019 seconds)

0: jdbc:hive2://rhes75:10099/default> *create table etc(ID BIGINT, col1
VARCHAR(30), col2 STRING)*

. . . . . . . . . . . . . . . . . . > No rows affected (0.304 seconds)

0: jdbc:hive2://rhes75:10099/default> *desc formatted etc*

. . . . . . . . . . . . . . . . . . >
+--------------------------------+----------------+------------+
|   col_name                     | data_type      |  comment   |
+--------------------------------+----------------+------------+
| # col_name                     | data_type      | comment    |
*| id                            | bigint         |            |*
*| col1                          | varchar(30)    |            |*
*| col2                          | string         |            |*
|                                | NULL           | NULL       |
| # Detailed Table Information   | NULL           | NULL       |
| Database:                      | test           | NULL       |
| OwnerType:                     | USER           | NULL       |
| Owner:                         | hduser         | NULL

Show create table on a Hive Table in Spark SQL - Treats CHAR, VARCHAR as STRING

2022-03-11 Thread Venkatesan Muniappan
hi Spark Team,

I have raised a question on Spark through Stackoverflow. When you get a
chance, can you please take a look and help me ?.

https://stackoverflow.com/q/71431757/5927843

Thanks,
Venkat
2016173438


Re: [SPARK-38438] pyspark - how to update spark.jars.packages on existing default context?

2022-03-11 Thread Rafał Wojdyła
I don't know why I don't see my last message in the thread here:
https://lists.apache.org/thread/5wgdqp746nj4f6ovdl42rt82wc8ltkcn
I also don't get messages from Artemis in my mail; I can only see them in the
thread web UI, which is very confusing.
On top of that when I click on "reply via your own email client" in the web
UI, I get: Bad Request Error 400

Anyway, to answer your last comment, Artemis:

> I guess there are several misconceptions here:

There's no confusion on my side; all of that makes sense. When I said "worker"
in that comment I meant the scheduler worker, not the Spark worker, which in the
Spark realm would be the client.
Everything else you said is undoubtedly correct, but unrelated to the
issue/problem at hand.

Sean, Artemis - I appreciate your feedback about the infra setup, but it's
beside the problem behind this issue.

Let me describe a simpler setup/example with the same problem, say:
 1. I have a jupyter notebook
 2. use local/driver spark mode only
 3. I start the driver, process some data, store it in pandas dataframe
 4. now say I want to add a package to spark driver (or increase the JVM
memory etc)

There's currently no way to do step 4 without restarting the notebook
process, which holds the "reference" to the Spark driver/JVM. If I restart
the Jupyter notebook I lose all the data in memory (e.g. the pandas data);
of course I can save that data to disk, but that's beside the point.

I understand you don't want to provide this functionality in Spark, nor
warn users about changes to the Spark configuration that won't actually take
effect - as a user I wish I could get at least a warning in that case, but I
respect your decision. It seems like the workaround of shutting down the JVM
works in this case; I would much appreciate your feedback about **that
specific workaround**. Any reason not to use it?
Cheers - Rafal
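
For reference, the kind of "hard reset" discussed above typically looks something like the
sketch below. It is not from the thread, and it leans on private PySpark internals
(SparkContext._gateway, SparkContext._jvm), which are not public API and may change between
versions, so treat it as a fragile workaround rather than a supported pattern.

```
from pyspark import SparkContext
from pyspark.sql import SparkSession

def hard_reset_spark(new_conf):
    # Stop the current driver JVM and start a fresh one with new settings.
    # Relies on private PySpark attributes -- not public API.
    active = SparkSession.getActiveSession()
    if active is not None:
        active.stop()                 # also stops the underlying SparkContext
    gateway = SparkContext._gateway
    if gateway is not None:
        gateway.shutdown()            # tear down the py4j gateway and its JVM
    # Clear the cached references so the next session launches a brand-new JVM.
    SparkContext._gateway = None
    SparkContext._jvm = None
    builder = SparkSession.builder
    for key, value in new_conf.items():
        builder = builder.config(key, value)
    return builder.getOrCreate()

# e.g. add a package the previous driver JVM did not have on its classpath:
# spark = hard_reset_spark({"spark.jars.packages": "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.1"})
```

As noted in the thread, anything cached in the old JVM (and any DataFrames tied to the old
session) is lost across such a reset; only plain Python objects such as pandas DataFrames
survive in the notebook or orchestrator process.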

On Thu, 10 Mar 2022 at 18:50, Rafał Wojdyła  wrote:

> If you have a long running python orchestrator worker (e.g. Luigi worker),
> and say it's gets a DAG of A -> B ->C, and say the worker first creates a
> spark driver for A (which doesn't need extra jars/packages), then it gets B
> which is also a spark job but it needs an extra package, it won't be able
> to create a new spark driver with extra packages since it's "not possible"
> to create a new driver JVM. I would argue it's the same scenario if you
> have multiple spark jobs that need different amounts of memory or anything
> that requires JVM restart. Of course I can use the workaround to shut down
> the driver/JVM, do you have any feedback about that workaround (see my
> previous comment or the issue).
>
> On Thu, 10 Mar 2022 at 18:12, Sean Owen  wrote:
>
>> Wouldn't these be separately submitted jobs for separate workloads? You
>> can of course dynamically change each job submitted to have whatever
>> packages you like, from whatever is orchestrating. A single job doing
>> everything sound right.
>>
>> On Thu, Mar 10, 2022, 12:05 PM Rafał Wojdyła 
>> wrote:
>>
>>> Because I can't (and should not) know ahead of time which jobs will be
>>> executed, that's the job of the orchestration layer (and can be dynamic). I
>>> know I can specify multiple packages. Also not worried about memory.
>>>
>>> On Thu, 10 Mar 2022 at 13:54, Artemis User 
>>> wrote:
>>>
 If changing packages or jars isn't your concern, why not just specify
 ALL packages that you would need for the Spark environment?  You know you
 can define multiple packages under the packages option.  This shouldn't
 cause memory issues since JVM uses dynamic class loading...

 On 3/9/22 10:03 PM, Rafał Wojdyła wrote:

 Hi Artemis,
 Thanks for your input, to answer your questions:

 > You may want to ask yourself why it is necessary to change the jar
 packages during runtime.

 I have a long running orchestrator process, which executes multiple
 spark jobs, currently on a single VM/driver, some of those jobs might
 require extra packages/jars (please see example in the issue).

 > Changing package doesn't mean to reload the classes.

 AFAIU this is unrelated

 > There is no way to reload the same class unless you customize the
 classloader of Spark.

 AFAIU this is an implementation detail.

 > I also don't think it is necessary to implement a warning or error
 message when changing the configuration since it doesn't do any harm

 To reiterate right now the API allows to change configuration of the
 context, without that configuration taking effect. See example of confused
 users here:
  *
 https://stackoverflow.com/questions/41886346/spark-2-1-0-session-config-settings-pyspark
  *
 https://stackoverflow.com/questions/53606756/how-to-set-spark-driver-memory-in-client-mode-pyspark-version-2-3-1

 I'm curious if you have any opinion about the "hard-reset" workaround,
 copy-pasting from the issue:

 ```
 s: