Spark 3.3.0 with Structured Streaming from Kafka: issue with commons-pool2

2022-08-26 Thread Raymond Tang
Hi all,
I encountered an issue when reading from Kafka as a stream and then sinking into 
HDFS (using the Delta Lake format).


java.lang.NoSuchMethodError: 
org.apache.spark.sql.kafka010.consumer.InternalKafkaConsumerPool$PoolConfig.setMinEvictableIdleTime(Ljava/time/Duration;)V
I looked into the details and found it occurs because the Spark built-in jars 
include version 1.5.4 (commons-pool-1.5.4.jar), while the package 
org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 relies on version 2.11.1, and 
these two versions are not compatible. As a workaround for now, I placed the 
latest version on my Spark classpath, as the following page documents:
java.lang.NoSuchMethodError: 
PoolConfig.setMinEvictableIdleTime
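For illustration, a minimal sketch of one way to apply such a workaround at submit 
time (an assumption for this sketch: the newer commons-pool2 artifact is fetched from 
Maven Central via spark.jars.packages; the actual workaround above simply placed the 
jar on the Spark classpath):

```
from pyspark.sql import SparkSession

# Sketch only: pull a newer commons-pool2 at startup instead of copying
# the jar into the Spark jars folder by hand.
spark = (
    SparkSession.builder
    .appName("kafka-to-delta")
    .config("spark.jars.packages",
            "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0,"
            "org.apache.commons:commons-pool2:2.11.1")
    .getOrCreate()
)
```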

Questions from me:

  *   Can the bundled version be bumped in future Spark releases to avoid issues 
like this?
  *   Based on your knowledge, will this workaround cause any side effects?

I’m a frequent user of Spark, but I don’t have much detailed knowledge of Spark’s 
underlying code (I only look into it when I need to debug a complex problem).

Thanks and Regards,
Raymond





Spark SQL Predict Pushdown for Hive Bucketed Table

2022-08-26 Thread Raymond Tang

Hi all,
Does anyone know why Spark SQL is not using Hive bucket pruning when reading from 
a bucketed Hive table?
[SPARK-40206] Spark SQL Predict Pushdown for Hive Bucketed Table - ASF JIRA 
(apache.org)

Details are also provided at the end of this mail.

Regards,
Raymond


Hi team,

I was testing out Hive bucketed table features. One of the benefits, as most 
documentation suggests, is that a bucketed Hive table can be used for query 
filter/predicate pushdown to improve query performance.

However, through my exploration, that doesn't seem to be true. Can you please 
help clarify whether Spark SQL supports query optimizations when using a Hive 
bucketed table?



How to produce the issue:

Create a Hive 3 table using the following DDL:

create table test_db.bucket_table(user_id int, key string)
comment 'A bucketed table'
partitioned by(country string)
clustered by(user_id) sorted by (key) into 10 buckets
stored as ORC;

And then insert into this table using the following PySpark script:

from pyspark.sql import SparkSession

appName = "PySpark Hive Bucketing Example"
master = "local"

# Create Spark session with Hive supported.
spark = SparkSession.builder \
    .appName(appName) \
    .master(master) \
    .enableHiveSupport() \
    .getOrCreate()

# prepare sample data for inserting into hive table
data = []
countries = ['CN', 'AU']
for i in range(0, 1000):
    data.append([int(i), 'U' + str(i), countries[i % 2]])

df = spark.createDataFrame(data, ['user_id', 'key', 'country'])
df.show()

# Save df to Hive table test_db.bucket_table
df.write.mode('append').insertInto('test_db.bucket_table')

Then query the table using the following script:

from pyspark.sql import SparkSession

appName = "PySpark Hive Bucketing Example"
master = "local"

# Create Spark session with Hive supported.
spark = SparkSession.builder \
    .appName(appName) \
    .master(master) \
    .enableHiveSupport() \
    .getOrCreate()

df = spark.sql("""select * from test_db.bucket_table
    where country='AU' and user_id=101
    """)
df.show()
df.explain(extended=True)

I expected Spark to read from only one bucket file in HDFS, but instead it 
scanned all bucket files in the partition folder country=AU.

== Parsed Logical Plan ==
'Project [*]
+- 'Filter (('country = AU) AND ('t1.user_id = 101))
   +- 'SubqueryAlias t1
      +- 'UnresolvedRelation [test_db, bucket_table], [], false

== Analyzed Logical Plan ==
user_id: int, key: string, country: string
Project [user_id#20, key#21, country#22]
+- Filter ((country#22 = AU) AND (user_id#20 = 101))
   +- SubqueryAlias t1
      +- SubqueryAlias spark_catalog.test_db.bucket_table
         +- Relation test_db.bucket_table[user_id#20,key#21,country#22] orc

== Optimized Logical Plan ==
Filter (((isnotnull(country#22) AND isnotnull(user_id#20)) AND (country#22 = AU)) AND (user_id#20 = 101))
+- Relation test_db.bucket_table[user_id#20,key#21,country#22] orc

== Physical Plan ==
*(1) Filter (isnotnull(user_id#20) AND (user_id#20 = 101))
+- *(1) ColumnarToRow
   +- FileScan orc test_db.bucket_table[user_id#20,key#21,country#22] Batched: true, 
      DataFilters: [isnotnull(user_id#20), (user_id#20 = 101)], Format: ORC, 
      Location: InMemoryFileIndex(1 paths)[hdfs://localhost:9000/user/hive/warehouse/test_db.db/bucket_table/coun..., 
      PartitionFilters: [isnotnull(country#22), (country#22 = AU)], 
      PushedFilters: [IsNotNull(user_id), EqualTo(user_id,101)], ReadSchema: struct

Am I doing something wrong, or is it because Spark doesn't support it? Your 
guidance and help will be appreciated.
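For comparison only (not part of the original report), here is a minimal sketch using 
Spark's own bucketing via DataFrameWriter.bucketBy rather than Hive bucketing; the table 
name is hypothetical. For Spark-written bucketed tables, the scan node of an equality 
filter on the bucket column is expected to report a SelectedBucketsCount, i.e. the bucket 
pruning that does not appear for the Hive-bucketed ORC table above:

```
from pyspark.sql import SparkSession

spark = SparkSession.builder.enableHiveSupport().getOrCreate()

# Spark-native bucketing metadata (not Hive bucketing); hypothetical table name.
df = spark.range(1000).selectExpr(
    "cast(id as int) as user_id",
    "concat('U', cast(id as string)) as key")
df.write.bucketBy(10, "user_id").sortBy("key").mode("overwrite") \
    .saveAsTable("test_db.spark_bucket_table")

# The scan is expected to show something like "SelectedBucketsCount: 1 out of 10".
spark.sql("select * from test_db.spark_bucket_table where user_id = 101").explain()
```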






Structured Streaming - data not being read (offsets not getting committed ?)

2022-08-26 Thread karan alang
Hello All,

I've a long-running Apache Spark Structured Streaming job running in GCP
Dataproc, which reads data from Kafka every 10 mins and does some
processing. The Kafka topic has 3 partitions and a retention period of 3 days.

The issue I'm facing is that after a few hours, the program stops reading
data from Kafka. If I delete the GCP bucket (which is the checkpoint
directory) and then restart the streaming job, it starts consuming the
data again.

Here is the code where I'm reading data from Kafka and using foreachBatch to
call a function where the processing happens:

```

df_stream = spark.readStream.format('kafka') \
    .option("kafka.security.protocol", "SSL") \
    .option("kafka.ssl.truststore.location", ssl_truststore_location) \
    .option("kafka.ssl.truststore.password", ssl_truststore_password) \
    .option("kafka.ssl.keystore.location", ssl_keystore_location) \
    .option("kafka.ssl.keystore.password", ssl_keystore_password) \
    .option("kafka.bootstrap.servers", kafkaBrokers) \
    .option("subscribe", topic) \
    .option("startingOffsets", "latest") \
    .option("failOnDataLoss", "false") \
    .option("kafka.metadata.max.age.ms", "1000") \
    .option("kafka.ssl.keystore.type", "PKCS12") \
    .option("kafka.ssl.truststore.type", "PKCS12") \
    .option("maxOffsetsPerTrigger", 10) \
    .option("max.poll.records", 500) \
    .option("max.poll.interval.ms", 100) \
    .load()

query = df_stream.selectExpr("CAST(value AS STRING)", "timestamp", "topic") \
    .writeStream \
    .outputMode("append") \
    .trigger(processingTime='3 minutes') \
    .option("truncate", "false") \
    .option("checkpointLocation", checkpoint) \
    .foreachBatch(convertToDictForEachBatch) \
    .start()

```
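For context, foreachBatch hands each micro-batch to the handler as a (DataFrame, batch id) 
pair; the body of convertToDictForEachBatch is not shown in this mail, so the following is 
only a hypothetical sketch of its shape:

```
def convertToDictForEachBatch(batch_df, batch_id):
    # Hypothetical placeholder for the real processing logic (not shown above).
    # foreachBatch passes the micro-batch DataFrame and a monotonically
    # increasing batch id.
    records = [row.asDict() for row in batch_df.collect()]
    print(f"batch {batch_id}: {len(records)} records")
```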

Log snippet showing the offsets being set to latest and data not being read:

```

total time take, convertToDict :  0:00:00.006695
22/08/26 17:45:03 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator:
[Consumer clientId=consumer-spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0-1,
groupId=spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0]
Member consumer-spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0-1-26755c4c-93d6-4ab6-8799-411439e310bc
sending LeaveGroup request to coordinator 35.185.24.226:9094 (id: 2147483646 rack: null)
due to consumer poll timeout has expired. This means the time between subsequent calls
to poll() was longer than the configured max.poll.interval.ms, which typically implies
that the poll loop is spending too much time processing messages. You can address this
either by increasing max.poll.interval.ms or by reducing the maximum size of batches
returned in poll() with max.poll.records.

22/08/26 17:50:00 INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:
[Consumer clientId=consumer-spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0-1,
groupId=spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0]
Giving away all assigned partitions as lost since generation has been reset,
indicating that consumer is no longer part of the group

22/08/26 17:50:00 INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:
[Consumer clientId=consumer-spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0-1,
groupId=spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0]
Lost previously assigned partitions syslog.ueba-us4.v1.versa.demo3-0,
syslog.ueba-us4.v1.versa.demo3-1, syslog.ueba-us4.v1.versa.demo3-2

22/08/26 17:50:00 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator:
[Consumer clientId=consumer-spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0-1,
groupId=spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0]
(Re-)joining group

22/08/26 17:50:00 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator:
[Consumer clientId=consumer-spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0-1,
groupId=spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0]
Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException:
The group member needs to have a valid member id before actually entering a
consumer group.

22/08/26 17:50:00 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator:
[Consumer clientId=consumer-spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0-1,
groupId=spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0]
(Re-)joining group

22/08/26 17:50:03 INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:
[Consumer clientId=consumer-spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0-1,
groupId=spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0]
Finished 

```

Re: Profiling PySpark Pandas UDF

2022-08-26 Thread Abdeali Kothari
Hi Luca, I see you pushed some code to the PR 3 hrs ago.
That's awesome. If I can help out in any way, do let me know.
I think that's an amazing feature, and it would be great if it can get into
Spark.

On Fri, 26 Aug 2022, 12:41 Luca Canali,  wrote:

> @Abdeali as for “lightweight profiling”, there is some work in progress on
> instrumenting Python UDFs with Spark metrics, see
> https://issues.apache.org/jira/browse/SPARK-34265
>
> However it is a bit stuck at the moment, and needs to be revived I
> believe.
>
>
>
> Best,
>
> Luca
>
>
>
> *From:* Abdeali Kothari 
> *Sent:* Friday, August 26, 2022 06:36
> *To:* Subash Prabanantham 
> *Cc:* Russell Jurney ; Gourav Sengupta <
> gourav.sengu...@gmail.com>; Sean Owen ; Takuya UESHIN <
> ues...@happy-camper.st>; user 
> *Subject:* Re: Profiling PySpark Pandas UDF
>
>
>
> The Python profiler is pretty cool!
>
> I'll try it out to see what could be taking time within the UDF with it.
>
>
>
> I'm wondering if there is also some lightweight profiling (which does not
> slow down my processing) for me to get:
>
>
>
>  - how much time the UDF took (like how much time was spent inside the UDF)
>
>  - how many times the UDF was called
>
>
>
> I can see the overall time a stage took in the Spark UI - would be cool if
> I could find the time a UDF takes too
>
>
>
> On Fri, 26 Aug 2022, 00:25 Subash Prabanantham, 
> wrote:
>
> Wow, lots of good suggestions. I didn’t know about the profiler either.
> Great suggestion @Takuya.
>
>
>
>
>
> Thanks,
>
> Subash
>
>
>
> On Thu, 25 Aug 2022 at 19:30, Russell Jurney 
> wrote:
>
> YOU know what you're talking about and aren't hacking a solution. You are
> my new friend :) Thank you, this is incredibly helpful!
>
>
>
>
> Thanks,
>
> Russell Jurney @rjurney 
> russell.jur...@gmail.com LI  FB
>  datasyndrome.com
>
>
>
>
>
> On Thu, Aug 25, 2022 at 10:52 AM Takuya UESHIN 
> wrote:
>
> Hi Subash,
>
> Have you tried the Python/Pandas UDF Profiler introduced in Spark 3.3?
> -
> https://spark.apache.org/docs/latest/api/python/development/debugging.html#python-pandas-udf
>
> Hope it can help you.
>
> Thanks.
>
>
>
> On Thu, Aug 25, 2022 at 10:18 AM Russell Jurney 
> wrote:
>
> Subash, I’m here to help :)
>
>
>
> I started a test script to demonstrate a solution last night but got a
> cold and haven’t finished it. Give me another day and I’ll get it to you.
> My suggestion is that you run PySpark locally in pytest with a fixture to
> generate and yield your SparkContext and SparkSession, and then write tests
> that load some test data, perform some count operation and checkpoint to
> ensure that data is loaded, start a timer, run your UDF on the DataFrame,
> checkpoint again or write some output to disk to make sure it finishes and
> then stop the timer and compute how long it takes. I’ll show you some code,
> I have to do this for Graphlet AI’s RTL utils and other tools to figure out
> how much overhead there is using Pandera and Spark together to validate
> data: https://github.com/Graphlet-AI/graphlet
>
>
>
> I’ll respond by tomorrow evening with code in a fist! We’ll see if it gets
> consistent, measurable and valid results! :)
>
>
>
> Russell Jurney
>
>
>
> On Thu, Aug 25, 2022 at 10:00 AM Sean Owen  wrote:
>
> It's important to realize that while pandas UDFs and pandas on Spark are
> both related to pandas, they are not themselves directly related. The first
> lets you use pandas within Spark, the second lets you use pandas on Spark.
>
>
>
> Hard to say with this info but you want to look at whether you are doing
> something expensive in each UDF call and consider amortizing it with the
> scalar iterator UDF pattern. Maybe.
>
>
>
> A pandas UDF is not spark code itself so no there is no tool in spark to
> profile it. Conversely any approach to profiling pandas or python would
> work here .
>
>
>
> On Thu, Aug 25, 2022, 11:22 AM Gourav Sengupta 
> wrote:
>
> Hi,
>
>
>
> May be I am jumping to conclusions and making stupid guesses, but have you
> tried koalas now that it is natively integrated with pyspark??
>
>
>
> Regards
>
> Gourav
>
>
>
> On Thu, 25 Aug 2022, 11:07 Subash Prabanantham, 
> wrote:
>
> Hi All,
>
>
>
> I was wondering if we have any best practices on using pandas UDF ?
> Profiling UDF is not an easy task and our case requires some drilling down
> on the logic of the function.
>
>
>
>
>
> Our use case:
>
> We are using func(Dataframe) => Dataframe as interface to use Pandas UDF,
> while running locally only the function, it runs faster but when executed
> in Spark environment - the processing time is more than expected. We have
> one column where the value is large (BinaryType -> 600KB), wondering
> whether this could make the Arrow computation slower ?
>
>
>
> Is there any profiling or best way to debug the cost incurred using pandas
> UDF ?
>
>
>
>
>
> Thanks,
>
> Subash
>
>
>
> --
>
>
>
> Thanks,
>
> Russell Jurney @rjurney 

Re: Spark got incorrect scala version while using spark 3.2.1 and spark 3.2.2

2022-08-26 Thread ckgppl_yan
Oh, I got it. I thought Spark could pick up the local Scala version.
----- Original Message -----
From: Sean Owen 
To: ckgppl_...@sina.cn
Cc: user 
Subject: Re: Spark got incorrect scala version while using spark 3.2.1 and spark 3.2.2
Date: 2022-08-26 21:08

Spark is built with and ships with a copy of Scala. It doesn't use your local 
version.
On Fri, Aug 26, 2022 at 2:55 AM  wrote:
Hi all,
I found a strange thing. I have run the Spark 3.2.1 prebuilt release in local mode. My 
OS Scala version is 2.13.7, but when I run spark-submit and then check the Spark UI, 
the web page shows that my Scala version is 2.13.5. I used spark-shell, and it also 
shows that my Scala version is 2.13.5. Then I tried Spark 3.2.2, and it also shows 
that my Scala version is 2.13.5. I checked the code, and it seems that SparkEnv gets 
the Scala version from "scala.util.Properties.versionString". Not sure why it shows a 
different Scala version. Is it a bug or not?
Thanks
Liang

Re: Spark got incorrect scala version while using spark 3.2.1 and spark 3.2.2

2022-08-26 Thread pengyh

good answer. nice to know too.

Sean Owen wrote:

Spark is built with and ships with a copy of Scala. It doesn't use your
local version.


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark got incorrect scala version while using spark 3.2.1 and spark 3.2.2

2022-08-26 Thread Sean Owen
Spark is built with and ships with a copy of Scala. It doesn't use your
local version.
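For illustration, a quick way to check which Scala the prebuilt distribution bundles 
(a minimal sketch, assuming SPARK_HOME points at the extracted Spark install):

```
import glob
import os

# The Scala that Spark actually runs with is the scala-library jar bundled under
# $SPARK_HOME/jars (e.g. scala-library-2.13.5.jar), not the OS-installed Scala.
print(glob.glob(os.path.join(os.environ["SPARK_HOME"], "jars", "scala-library-*.jar")))
```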

On Fri, Aug 26, 2022 at 2:55 AM  wrote:

> Hi all,
>
> I found a strange thing. I have run the Spark 3.2.1 prebuilt release in local mode. My
> OS Scala version is 2.13.7.
> But when I run spark-submit and then check the Spark UI, the web page shows
> that my Scala version is 2.13.5.
> I used spark-shell, and it also shows that my Scala version is 2.13.5.
> Then I tried Spark 3.2.2, and it also shows that my Scala version is 2.13.5.
> I checked the code, and it seems that SparkEnv gets the Scala version from
> "scala.util.Properties.versionString".
> Not sure why it shows a different Scala version. Is it a bug or not?
>
> Thanks
>
> Liang
>


Spark got incorrect scala version while using spark 3.2.1 and spark 3.2.2

2022-08-26 Thread ckgppl_yan
Hi all,
I found a strange thing. I have run the Spark 3.2.1 prebuilt release in local mode. My 
OS Scala version is 2.13.7, but when I run spark-submit and then check the Spark UI, 
the web page shows that my Scala version is 2.13.5. I used spark-shell, and it also 
shows that my Scala version is 2.13.5. Then I tried Spark 3.2.2, and it also shows 
that my Scala version is 2.13.5. I checked the code, and it seems that SparkEnv gets 
the Scala version from "scala.util.Properties.versionString". Not sure why it shows a 
different Scala version. Is it a bug or not?
Thanks
Liang

RE: Profiling PySpark Pandas UDF

2022-08-26 Thread Luca Canali
@Abdeali as for “lightweight profiling”, there is some work in progress on 
instrumenting Python UDFs with Spark metrics, see 
https://issues.apache.org/jira/browse/SPARK-34265  

However it is a bit stuck at the moment, and needs to be revived I believe.  

 

Best,

Luca

 

From: Abdeali Kothari  
Sent: Friday, August 26, 2022 06:36
To: Subash Prabanantham 
Cc: Russell Jurney ; Gourav Sengupta 
; Sean Owen ; Takuya UESHIN 
; user 
Subject: Re: Profiling PySpark Pandas UDF

 

The Python profiler is pretty cool!

I'll try it out to see what could be taking time within the UDF with it.

 

I'm wondering if there is also some lightweight profiling (which does not slow 
down my processing) for me to get:

 

 - how much time the UDF took (like how much time was spent inside the UDF)

 - how many times the UDF was called 

 

I can see the overall time a stage took in the Spark UI - would be cool if I 
could find the time a UDF takes too

 

On Fri, 26 Aug 2022, 00:25 Subash Prabanantham, <subashpraba...@gmail.com> wrote:

Wow, lots of good suggestions. I didn’t know about the profiler either. Great 
suggestion @Takuya. 

 

 

Thanks,

Subash

 

On Thu, 25 Aug 2022 at 19:30, Russell Jurney <russell.jur...@gmail.com> wrote:

YOU know what you're talking about and aren't hacking a solution. You are my 
new friend :) Thank you, this is incredibly helpful!




 

Thanks,

Russell Jurney   @rjurney  
 russell.jur...@gmail.com  
 LI   FB  
 datasyndrome.com

 

 

On Thu, Aug 25, 2022 at 10:52 AM Takuya UESHIN <ues...@happy-camper.st> wrote:

Hi Subash,

Have you tried the Python/Pandas UDF Profiler introduced in Spark 3.3?
- 
https://spark.apache.org/docs/latest/api/python/development/debugging.html#python-pandas-udf

Hope it can help you.

Thanks.

 

On Thu, Aug 25, 2022 at 10:18 AM Russell Jurney <russell.jur...@gmail.com> wrote:

Subash, I’m here to help :)

 

I started a test script to demonstrate a solution last night but got a cold and 
haven’t finished it. Give me another day and I’ll get it to you. My suggestion 
is that you run PySpark locally in pytest with a fixture to generate and yield 
your SparkContext and SparkSession, and then write tests that load some test 
data, perform some count operation and checkpoint to ensure that data is 
loaded, start a timer, run your UDF on the DataFrame, checkpoint again or write 
some output to disk to make sure it finishes and then stop the timer and 
compute how long it takes. I’ll show you some code, I have to do this for 
Graphlet AI’s RTL utils and other tools to figure out how much overhead there 
is using Pandera and Spark together to validate data: 
https://github.com/Graphlet-AI/graphlet

 

I’ll respond by tomorrow evening with code in a fist! We’ll see if it gets 
consistent, measurable and valid results! :)

 

Russell Jurney

 

On Thu, Aug 25, 2022 at 10:00 AM Sean Owen <sro...@gmail.com> wrote:

It's important to realize that while pandas UDFs and pandas on Spark are both 
related to pandas, they are not themselves directly related. The first lets you 
use pandas within Spark, the second lets you use pandas on Spark. 

 

Hard to say with this info but you want to look at whether you are doing 
something expensive in each UDF call and consider amortizing it with the scalar 
iterator UDF pattern. Maybe. 

 

A pandas UDF is not spark code itself so no there is no tool in spark to 
profile it. Conversely any approach to profiling pandas or python would work 
here .

 

On Thu, Aug 25, 2022, 11:22 AM Gourav Sengupta <gourav.sengu...@gmail.com> wrote:

Hi,

 

May be I am jumping to conclusions and making stupid guesses, but have you 
tried koalas now that it is natively integrated with pyspark??

 

Regards 

Gourav

 

On Thu, 25 Aug 2022, 11:07 Subash Prabanantham, <subashpraba...@gmail.com> wrote:

Hi All,

 

I was wondering if we have any best practices on using pandas UDF ? Profiling 
UDF is not an easy task and our case requires some drilling down on the logic 
of the function. 

 

 

Our use case:

We are using func(Dataframe) => Dataframe as interface to use Pandas UDF, while 
running locally only the function, it runs faster but when executed in Spark 
environment - the processing time is more than expected. We have one column 
where the value is large (BinaryType -> 600KB), wondering whether this could 
make the Arrow computation slower ? 

 

Is there any profiling or best way to debug the cost incurred using pandas UDF ?

 

 

Thanks,

Subash

 

-- 

 

Thanks,

Russell Jurney   @rjurney  
 russell.jur...@gmail.com  
 LI   FB  
 datasyndrome.com




 

-- 

Takuya UESHIN
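
For reference, a minimal sketch of the scalar iterator pandas UDF pattern Sean mentions, 
which amortizes expensive per-call setup across batches; the setup value and column name 
here are hypothetical stand-ins, not from the thread:

```
from typing import Iterator
import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf("double")
def score(batches: Iterator[pd.Series]) -> Iterator[pd.Series]:
    # Hypothetical expensive setup (e.g. loading a model) done once per task
    # instead of once per batch.
    model_offset = 0.5  # stand-in for the result of some expensive initialization
    for batch in batches:
        yield batch.astype("float64") + model_offset

# Usage sketch: df.select(score("some_numeric_col"))
```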