Re: Connection pool shut down in Spark Iceberg Streaming Connector

2023-10-05 Thread Igor Calabria
You might be affected by this issue:
https://github.com/apache/iceberg/issues/8601

It has already been patched, but the fix has not been released yet.

On Thu, Oct 5, 2023 at 7:47 PM Prashant Sharma  wrote:

> Hi Sanket, more details might help here.
>
> What does your Spark configuration look like?
>
> What exactly was done when this happened?
>
> On Thu, 5 Oct, 2023, 2:29 pm Agrawal, Sanket,
>  wrote:
>
>> Hello Everyone,
>>
>>
>>
>> We are trying to stream the changes in our Iceberg tables stored in AWS
>> S3. We are achieving this through the Spark-Iceberg Connector and using JAR
>> files for Spark-AWS. Suddenly, we have started receiving the error “Connection
>> pool shut down”.
>>
>>
>>
>> Spark Version: 3.4.1
>>
>> Iceberg: 1.3.1
>>
>>
>>
>> Any help or guidance would be of great help.
>>
>>
>>
>> Thank You,
>>
>> Sanket A.
>>
>>
>>
>
>


Re: Spark Compatibility with Spring Boot 3.x

2023-10-05 Thread Angshuman Bhattacharya
Thanks, Ahmed. I am trying to bring this up with the Spark DE community.

On Thu, Oct 5, 2023 at 12:32 PM Ahmed Albalawi <
ahmed.albal...@capitalone.com> wrote:

> Hello team,
>
> We are in the process of upgrading one of our apps to Spring Boot 3.x
> while using Spark, and we have encountered an issue with Spark
> compatibility, specifically with Jakarta Servlet. Spring Boot 3.x uses
> Jakarta Servlet while Spark uses Javax Servlet. Can we get some guidance on
> how to upgrade to Spring Boot 3.x while continuing to use Spark?
>
> The specific error is listed below:
>
> java.lang.NoClassDefFoundError: javax/servlet/Servlet
> at org.apache.spark.ui.SparkUI$.create(SparkUI.scala:239)
> at org.apache.spark.SparkContext.<init>(SparkContext.scala:503)
> at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2888)
> at org.apache.spark.SparkContext.getOrCreate(SparkContext.scala)
>
> The error comes up when we try to run mvn clean install, and the issue is
> in our test cases. This issue happens specifically when we build our Spark
> session. The line of code it traces down to is as follows:
>
> session = SparkSession.builder().sparkContext(SparkContext.getOrCreate(sparkConf)).getOrCreate();
>
> What we have tried:
>
> - We noticed, according to this post, that there are no compatible versions
> of Spark using version 5 of the Jakarta Servlet API.
>
> - We've tried using the maven shade plugin to use jakarta instead of javax,
> but are running into some other issues with this.
> - We've also looked at using jakarta 4.x with jersey 2.x and still have an
> issue with the servlet.
>
>
> Please let us know if there are any solutions to this issue. Thanks!
>
>
> --
> *Ahmed Albalawi*
>
> Senior Associate Software Engineer • EP2 Tech - CuRE
>
> 571-668-3911 •  1680 Capital One Dr.
>



Re: Spark Compatibility with Spring Boot 3.x

2023-10-05 Thread Sean Owen
I think we have already updated this in Spark 4. However, for now you would
also have to include a JAR with the jakarta.* classes instead.
You are welcome to try Spark 4 now by building from master, but it's far
from release.

On Thu, Oct 5, 2023 at 11:53 AM Ahmed Albalawi
 wrote:

> Hello team,
>
> We are in the process of upgrading one of our apps to Spring Boot 3.x
> while using Spark, and we have encountered an issue with Spark
> compatibility, specifically with Jakarta Servlet. Spring Boot 3.x uses
> Jakarta Servlet while Spark uses Javax Servlet. Can we get some guidance on
> how to upgrade to Spring Boot 3.x while continuing to use Spark?
>
> The specific error is listed below:
>
> java.lang.NoClassDefFoundError: javax/servlet/Servlet
> at org.apache.spark.ui.SparkUI$.create(SparkUI.scala:239)
> at org.apache.spark.SparkContext.<init>(SparkContext.scala:503)
> at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2888)
> at org.apache.spark.SparkContext.getOrCreate(SparkContext.scala)
>
> The error comes up when we try to run mvn clean install, and the issue is
> in our test cases. This issue happens specifically when we build our Spark
> session. The line of code it traces down to is as follows:
>
> session = SparkSession.builder().sparkContext(SparkContext.getOrCreate(sparkConf)).getOrCreate();
>
> What we have tried:
>
> - We noticed, according to this post, that there are no compatible versions
> of Spark using version 5 of the Jakarta Servlet API.
>
> - We've tried using the maven shade plugin to use jakarta instead of javax,
> but are running into some other issues with this.
> - We've also looked at using jakarta 4.x with jersey 2.x and still have an
> issue with the servlet.
>
>
> Please let us know if there are any solutions to this issue. Thanks!
>
>
> --
> *Ahmed Albalawi*
>
> Senior Associate Software Engineer • EP2 Tech - CuRE
>
> 571-668-3911 •  1680 Capital One Dr.


Spark Compatibility with Spring Boot 3.x

2023-10-05 Thread Ahmed Albalawi
Hello team,

We are in the process of upgrading one of our apps to Spring Boot 3.x while
using Spark, and we have encountered an issue with Spark compatibility,
specifically with Jakarta Servlet. Spring Boot 3.x uses Jakarta Servlet
while Spark uses Javax Servlet. Can we get some guidance on how to upgrade
to Spring Boot 3.x while continuing to use Spark?

The specific error is listed below:

java.lang.NoClassDefFoundError: javax/servlet/Servlet
at org.apache.spark.ui.SparkUI$.create(SparkUI.scala:239)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:503)
at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2888)
at org.apache.spark.SparkContext.getOrCreate(SparkContext.scala)

The error comes up when we try to run mvn clean install, and the
issue is in our test cases. This issue happens specifically when we
build our Spark session. The line of code it traces down to is as
follows:

session = SparkSession.builder().sparkContext(SparkContext.getOrCreate(sparkConf)).getOrCreate();

What we have tried:

- We noticed, according to this post, that there are no compatible
versions of Spark using version 5 of the Jakarta Servlet API.

- We've tried using the maven shade plugin to use jakarta instead of
javax, but are running into some other issues with this.
- We've also looked at using jakarta 4.x with jersey 2.x and still have
an issue with the servlet.


Please let us know if there are any solutions to this issue. Thanks!


-- 
*Ahmed Albalawi*

Senior Associate Software Engineer • EP2 Tech - CuRE

571-668-3911 •  1680 Capital One Dr.


Re: [PySpark Structured Streaming] How to tune .repartition(N) ?

2023-10-05 Thread Mich Talebzadeh
The fact that you have 60 partitions or brokers in Kafka is not directly
correlated to the number of Spark Structured Streaming (SSS) executors by
itself. See below.

Spark starts with 200 shuffle partitions by default. However, by default,
Spark/PySpark creates partitions equal to the number of CPU cores in the
node, the so-called vcores, so it depends on the number of nodes you are
using in your Spark cluster.

Until you have done a PoC, you do not need to worry about repartition(10)
in your writeStream. I suggest that for now you remove that parameter and
observe the Spark processing through the Spark GUI (default port 4040), in
particular the "Structured Streaming" page. Your sink is Delta Lake, which
is no different from any other data warehouse such as Google BigQuery.

My general advice: the usual thing to watch from the Spark GUI is

Processing Time (Process Rate) + Reserved Capacity < Batch Interval (Batch
Duration)

If your sink (Delta Lake) has an issue absorbing data in a timely manner
as per the above formula, you will see the effect on the Processing Rate.

The Batch Interval here relates to the rate at which the upstream source
sends messages through Kafka. We can start by assuming that an increase in
the number of messages processed (processing time) will require additional
reserved capacity. We can anticipate a heuristic 70% (~1 SD) increase in the
processing time, so in theory you should be able to handle all this work
below the batch interval. For example, if the current processing time is 10
seconds, plan for roughly 17 seconds and make sure the batch interval stays
above that.

The parameter which I think many deploy is
spark.streaming.backpressure.enabled
(spark.conf.set("spark.streaming.backpressure.enabled", "true")). The
central idea is that if a component is struggling to keep up, it should
communicate to upstream components and get them to reduce the load. In the
context of Spark Streaming, the receiver is the upstream component which
gets notified if the executors cannot keep up. There are a number of
occasions when this will happen (not necessarily just a spike in the
incoming messages). For example:

   - Streaming Source: Unexpected short burst of incoming messages in
   source system
   - YARN: Lost Spark executors due to node(s) failure
   - External Sink System: High load on external systems such as Delta
   Lake, BigQuery etc

Without backpressure, microbatches queue up over time and the scheduling
delay increases (check Operation Duration from GUI).

The next parameter I think of is spark.streaming.backpressure.pid.minRate.
It is the total records per second. It relies on
spark.streaming.kafka.maxRatePerPartition (not set by default), which is the
maximum rate (number of records per second) at which messages will be read
from each Kafka partition.

So spark.streaming.backpressure.pid.minRate starts with

n (total number of Kafka partitions)
* spark.streaming.kafka.maxRatePerPartition * Batch Interval

spark.streaming.kafka.maxRatePerPartition is used to control the maximum
rate of data ingestion from Kafka per partition. Kafka topics can have
multiple partitions, and Spark Streaming processes data in parallel by
reading from these partitions.
If you set spark.streaming.kafka.maxRatePerPartition to 1000, Spark
Streaming will consume data from each Kafka partition at a rate of up to
1000 messages per second.

So in your case, if you set it to 1000, it goes something like

60 * 1000 * Batch Interval (in seconds)
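
For illustration, a minimal PySpark sketch of setting these (note that the
spark.streaming.* settings above belong to the DStream API; the Structured
Streaming Kafka source is throttled with maxOffsetsPerTrigger instead. The
app name and the 30-second batch interval below are assumptions):

```python
from pyspark.sql import SparkSession

# A sketch only: the app name and the 30-second batch interval are assumptions.
spark = (
    SparkSession.builder
    .appName("kafka-rate-limit-sketch")
    .config("spark.streaming.backpressure.enabled", "true")
    .config("spark.streaming.backpressure.pid.minRate", "100")    # default value
    .config("spark.streaming.kafka.maxRatePerPartition", "1000")  # cap per partition
    .getOrCreate()
)

# Upper bound per batch with 60 Kafka partitions and a 30 s batch interval:
max_records_per_batch = 60 * 1000 * 30   # 1,800,000 records
```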

Of course, I stand to be corrected.

HTH

Mich Talebzadeh,
Distinguished Technologist, Solutions Architect & Engineer
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh







On Thu, 5 Oct 2023 at 05:54, Shao Yang Hong
 wrote:

> Hi all on user@spark:
>
> We are looking for advice and suggestions on how to tune the
> .repartition() parameter.
>
> We are using Spark Streaming on our data pipeline to consume messages
> and persist them to a Delta Lake
> (https://delta.io/learn/getting-started/).
>
> We read messages from a Kafka topic, then add a generated date column
> as a daily partitioning, and save these records to Delta Lake. We have
> 60 Kafka partitions on the Kafka topic, 15 Spark executor instances
> (so 4 Kafka partitions per executor).
>
> How then, should we use .repartition()? Should we omit this parameter?
> Or set it to 15? or 4?
>
> Our code looks roughly like the below:
>
> ```
> df = (
> spark.readStream.format("kafka")
> .option("kafka.bootstrap.servers", os.environ["KAFKA_BROKERS"])
> .option("subscribe", os.environ["KAFKA_TOPIC"])
> .load()
> )
>
> table = (
> df.select(
> from_protobuf(
> "value", "table", "/opt/protobuf-desc/table.desc"
>   

Re: [PySpark Structured Streaming] How to tune .repartition(N) ?

2023-10-05 Thread Perez
You can try the 'optimize' command of Delta Lake. That will help you for
sure, as it merges small files. Also, it depends on the file format. If you
are working with Parquet, then small files should still not cause any
issues.
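
For example, a minimal sketch of file compaction with the Python API
(assuming delta-spark 2.0 or later is available; the table path below is a
placeholder):

```python
from delta.tables import DeltaTable

# Compact small files in the Delta table (the path below is a placeholder).
delta_table = DeltaTable.forPath(spark, "s3://my-bucket/delta/table")
delta_table.optimize().executeCompaction()

# SQL equivalent: OPTIMIZE delta.`s3://my-bucket/delta/table`
```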

P.

On Thu, Oct 5, 2023 at 10:55 AM Shao Yang Hong
 wrote:

> Hi Raghavendra,
>
> Yes, we are trying to reduce the number of files in delta as well (the
> small file problem [0][1]).
>
> We already have a scheduled app to compact files, but the number of
> files is still large, at 14K files per day.
>
> [0]: https://docs.delta.io/latest/optimizations-oss.html#language-python
> [1]:
> https://delta.io/blog/2023-01-25-delta-lake-small-file-compaction-optimize/
>
> On Thu, Oct 5, 2023 at 12:53 PM Raghavendra Ganesh
>  wrote:
> >
> > Hi,
> > What is the purpose for which you want to use repartition()? Is it to
> > reduce the number of files in delta?
> > Also note that there is an alternative option of using coalesce()
> > instead of repartition().
> > --
> > Raghavendra
> >
> >
> > On Thu, Oct 5, 2023 at 10:15 AM Shao Yang Hong <
> shaoyang.h...@ninjavan.co.invalid> wrote:
> >>
> >> Hi all on user@spark:
> >>
> >> We are looking for advice and suggestions on how to tune the
> >> .repartition() parameter.
> >>
> >> We are using Spark Streaming on our data pipeline to consume messages
> >> and persist them to a Delta Lake
> >> (https://delta.io/learn/getting-started/).
> >>
> >> We read messages from a Kafka topic, then add a generated date column
> >> as a daily partitioning, and save these records to Delta Lake. We have
> >> 60 Kafka partitions on the Kafka topic, 15 Spark executor instances
> >> (so 4 Kafka partitions per executor).
> >>
> >> How then, should we use .repartition()? Should we omit this parameter?
> >> Or set it to 15? or 4?
> >>
> >> Our code looks roughly like the below:
> >>
> >> ```
> >> df = (
> >> spark.readStream.format("kafka")
> >> .option("kafka.bootstrap.servers", os.environ["KAFKA_BROKERS"])
> >> .option("subscribe", os.environ["KAFKA_TOPIC"])
> >> .load()
> >> )
> >>
> >> table = (
> >> df.select(
> >> from_protobuf(
> >> "value", "table", "/opt/protobuf-desc/table.desc"
> >> ).alias("msg")
> >> )
> >> .withColumn("uuid", col("msg.uuid"))
> >> # etc other columns...
> >>
> >> # generated column for daily partitioning in Delta Lake
> >> .withColumn(CREATED_DATE,
> >> date_format(from_unixtime("msg.logged_at"), "-MM-dd"))
> >> .drop("msg")
> >> )
> >>
> >> query = (
> >> table
> >> .repartition(10).writeStream
> >> .queryName(APP_NAME)
> >> .outputMode("append")
> >> .format("delta")
> >> .partitionBy(CREATED_DATE)
> >> .option("checkpointLocation", os.environ["CHECKPOINT"])
> >> .start(os.environ["DELTA_PATH"])
> >> )
> >>
> >> query.awaitTermination()
> >> spark.stop()
> >> ```
> >>
> >> Any advice would be appreciated.
> >>
> >> --
> >> Best Regards,
> >> Shao Yang HONG
> >>
> >>
>
>
> --
> Best Regards,
> Shao Yang HONG
> Software Engineer, Pricing, Tech
>
>
>


Re: Connection pool shut down in Spark Iceberg Streaming Connector

2023-10-05 Thread Prashant Sharma
Hi Sanket, more details might help here.

What does your Spark configuration look like?

What exactly was done when this happened?

On Thu, 5 Oct, 2023, 2:29 pm Agrawal, Sanket,
 wrote:

> Hello Everyone,
>
>
>
> We are trying to stream the changes in our Iceberg tables stored in AWS
> S3. We are achieving this through the Spark-Iceberg Connector and using JAR
> files for Spark-AWS. Suddenly, we have started receiving the error “Connection
> pool shut down”.
>
>
>
> Spark Version: 3.4.1
>
> Iceberg: 1.3.1
>
>
>
> Any help or guidance would be of great help.
>
>
>
> Thank You,
>
> Sanket A.
>
>
>


Connection pool shut down in Spark Iceberg Streaming Connector

2023-10-05 Thread Agrawal, Sanket
Hello Everyone,

We are trying to stream the changes in our Iceberg tables stored in AWS S3. We
are achieving this through the Spark-Iceberg Connector and using JAR files for
Spark-AWS. Suddenly, we have started receiving the error "Connection pool shut down".

Spark Version: 3.4.1
Iceberg: 1.3.1

Any help or guidance would be of great help.

Thank You,
Sanket A.




spark_iceberg_streaming.logs
Description: spark_iceberg_streaming.logs
