Re: [EXTERNAL] Re: Spark-submit without access to HDFS

2023-12-11 Thread Mich Talebzadeh
Hi Eugene,

With regard to your points

What are the PYTHONPATH and SPARK_HOME env variables in your script?

OK let us look at a typical of my Spark project structure

- project_root
  |-- README.md
  |-- __init__.py
  |-- conf
  |   |-- (configuration files for Spark)
  |-- deployment
  |   |-- deployment.yaml
  |-- design
  |   |-- (design-related files or documentation)
  |-- othermisc
  |   |-- (other miscellaneous files)
  |-- sparkutils
  |   |-- (utility modules or scripts specific to Spark)
  |-- src
  |-- (main source code for your Spark application)

If you want Spark to recognize modules from the sparkutils directory or any
other directories within your project, you can include those directories in
the PYTHONPATH.

For example, if you want to include the sparkutils directory:

export PYTHONPATH=/path/to/project_root/sparkutils:$PYTHONPATH
to recap, the ${PYTHONPATH} variable is primarily used to specify
additional directories where Python should look for modules and packages.
In the context of Spark, it is typically used to include directories
containing custom Python code or modules that your Spark application
depends on.

With regard to

The --conf "spark.yarn.appMasterEnv.SPARK_HOME=$SPARK_HOME" configuration
option in Spark is used when submitting a Spark application to run on YARN

   -

   --conf: This is used to specify Spark configuration properties when
   submitting a Spark application.
   -

   spark.yarn.appMasterEnv.SPARK_HOME: This is a Spark configuration
   property that defines the value of the SPARK_HOME environment variable
   for the Spark application's Application Master (the process responsible for
   managing the execution of tasks on a YARN cluster).
   -

   $SPARK_HOME: This holds the path to the Spark installation directory.

This configuration is setting the SPARK_HOME environment variable for the
Spark Application Master when the application is running on YARN. This is
important because the Spark Application Master needs to know the location
of the Spark installation directory (SPARK_HOME) to configure and manage
the Spark application's execution on the YARN cluster. HTH
Mich Talebzadeh,
Distinguished Technologist, Solutions Architect & Engineer
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Mon, 11 Dec 2023 at 01:43, Eugene Miretsky  wrote:

> Setting PYSPARK_ARCHIVES_PATH to hfds:// did the tricky. But don't
> understand a few things
>
> 1) The default behaviour is if PYSPARK_ARCHIVES_PATH is empty,
> pyspark.zip is uploaded from the local SPARK_HOME. If it is set to
> "local://" the upload is skipped. I would expect the latter to be the
> default. What's the use case for uploading the local pyspark.zip every
> time?
> 2) It seems like the localConfigs are meant to be copied every time (code)
> what's the use case for that? Why not just use the cluster config?
>
>
>
> On Sun, Dec 10, 2023 at 1:15 PM Eugene Miretsky  wrote:
>
>> Thanks Mich,
>>
>> Tried this and still getting
>> INF Client: "Uploading resource
>> file:/opt/spark/spark-3.5.0-bin-hadoop3/python/lib/pyspark.zip ->
>> hdfs:/". It is also doing it for (py4j.-0.10.9.7-src.zip and
>> __spark_conf__.zip). It is working now because I enabled direct
>> access to HDFS to allow copying the files. But ideally I would like to not
>> have to copy any files directly to HDFS.
>>
>> 1) We would expect pyspark as well as the relevant configs to already be
>> available on the cluster - why are they being copied over? (we can always
>> provide the extra libraries needed using py-files the way you did)
>> 2) If we wanted users to be able to use custom pyspark, we would rather
>> just copy the file HDFS/GCS in other ways, and let users reference it in
>> their job
>> 3) What are the PYTHONPATH and SPARK_HOME env variables in your script?
>> Are they local paths, or paths on the spark cluster?
>>
>> On Fri, Nov 17, 2023 at 8:57 AM Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> How are you submitting your spark job from your client?
>>>
>>> Your files can either be on HDFS or HCFS such as gs, s3 etc.
>>>
>>> With reference to --py-files hdfs://yarn-master-url hdfs://foo.py', I
>>> assume you want your
>>>
>>> spark-submit --verbose \
>>>--deploy-mode cluster \
>>>--conf "spark.yarn.appMasterEnv.SPARK_HOME=$SPARK_HOME" \
>>>--conf "spark.yarn.appMasterEnv.PYTHONPATH=${PYTHONPATH}" \
>>>--conf "spark.executorEnv.PYTHONPATH=${PYTHONPATH}" \
>>>--py-files 

Re: [EXTERNAL] Re: Spark-submit without access to HDFS

2023-12-10 Thread Eugene Miretsky
Setting PYSPARK_ARCHIVES_PATH to hfds:// did the tricky. But don't
understand a few things

1) The default behaviour is if PYSPARK_ARCHIVES_PATH is empty, pyspark.zip
is uploaded from the local SPARK_HOME. If it is set to "local://" the
upload is skipped. I would expect the latter to be the default. What's the
use case for uploading the local pyspark.zip every time?
2) It seems like the localConfigs are meant to be copied every time (code
) what's the use case for that? Why not just
use the cluster config?



On Sun, Dec 10, 2023 at 1:15 PM Eugene Miretsky  wrote:

> Thanks Mich,
>
> Tried this and still getting
> INF Client: "Uploading resource
> file:/opt/spark/spark-3.5.0-bin-hadoop3/python/lib/pyspark.zip ->
> hdfs:/". It is also doing it for (py4j.-0.10.9.7-src.zip and
> __spark_conf__.zip). It is working now because I enabled direct
> access to HDFS to allow copying the files. But ideally I would like to not
> have to copy any files directly to HDFS.
>
> 1) We would expect pyspark as well as the relevant configs to already be
> available on the cluster - why are they being copied over? (we can always
> provide the extra libraries needed using py-files the way you did)
> 2) If we wanted users to be able to use custom pyspark, we would rather
> just copy the file HDFS/GCS in other ways, and let users reference it in
> their job
> 3) What are the PYTHONPATH and SPARK_HOME env variables in your script?
> Are they local paths, or paths on the spark cluster?
>
> On Fri, Nov 17, 2023 at 8:57 AM Mich Talebzadeh 
> wrote:
>
>> Hi,
>>
>> How are you submitting your spark job from your client?
>>
>> Your files can either be on HDFS or HCFS such as gs, s3 etc.
>>
>> With reference to --py-files hdfs://yarn-master-url hdfs://foo.py', I
>> assume you want your
>>
>> spark-submit --verbose \
>>--deploy-mode cluster \
>>--conf "spark.yarn.appMasterEnv.SPARK_HOME=$SPARK_HOME" \
>>--conf "spark.yarn.appMasterEnv.PYTHONPATH=${PYTHONPATH}" \
>>--conf "spark.executorEnv.PYTHONPATH=${PYTHONPATH}" \
>>--py-files $CODE_DIRECTORY_CLOUD/dataproc_on_gke.zip \
>>--conf "spark.driver.memory"=4G \
>>--conf "spark.executor.memory"=4G \
>>--conf "spark.num.executors"=4 \
>>--conf "spark.executor.cores"=2 \
>>$CODE_DIRECTORY_CLOUD/${APPLICATION}
>>
>> in my case I define $CODE_DIRECTORY_CLOUD as below on google cloud storage
>>
>> CODE_DIRECTORY="/home/hduser/dba/bin/python/"
>> CODE_DIRECTORY_CLOUD="gs://,${PROJECT}-spark-on-k8s/codes"
>> cd $CODE_DIRECTORY
>> [ -f ${source_code}.zip ] && rm -r -f ${source_code}.zip
>> echo `date` ", ===> creating source zip directory from  ${source_code}"
>> # zip needs to be done at root directory of code
>> zip -rq ${source_code}.zip ${source_code}
>> gsutil cp ${source_code}.zip $CODE_DIRECTORY_CLOUD
>> gsutil cp /home/hduser/dba/bin/python/${source_code}/src/${APPLICATION}
>> $CODE_DIRECTORY_CLOUD
>>
>> So in summary I create a zip  file of my project and copy it across to
>> the cloud storage and then put the application (py file) there as well and
>> use them in spark-submit
>>
>> I trust this answers your question.
>>
>> HTH
>>
>>
>>
>> Mich Talebzadeh,
>> Technologist, Solutions Architect & Engineer
>> London
>> United Kingdom
>>
>>
>>view my Linkedin profile
>> 
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Wed, 15 Nov 2023 at 21:33, Eugene Miretsky 
>> wrote:
>>
>>> Hey All,
>>>
>>> We are running Pyspark spark-submit from a client outside the cluster.
>>> The client has network connectivity only to the Yarn Master, not the HDFS
>>> Datanodes. How can we submit the jobs? The idea would be to preload all the
>>> dependencies (job code, libraries, etc) to HDFS, and just submit the job
>>> from the client.
>>>
>>> We tried something like this
>>> 'PYSPARK_ARCHIVES_PATH=hdfs://some-path/pyspark.zip spark-submit
>>> --master yarn --deploy-mode cluster --py-files hdfs://yarn-master-url
>>> hdfs://foo.py'
>>>
>>> The error we are getting is
>>> "
>>>
>>> org.apache.hadoop.net.ConnectTimeoutException: 6 millis timeout
>>> while waiting for channel to be ready for connect. ch :
>>> java.nio.channels.SocketChannel[connection-pending remote=/
>>> 10.117.110.19:9866]
>>>
>>> org.apache.hadoop.ipc.RemoteException(java.io.IOException): File
>>> /user/users/.sparkStaging/application_1698216436656_0104/
>>> *spark_conf.zip* could only be written 

Re: [EXTERNAL] Re: Spark-submit without access to HDFS

2023-12-10 Thread Eugene Miretsky
Thanks Mich,

Tried this and still getting
INF Client: "Uploading resource
file:/opt/spark/spark-3.5.0-bin-hadoop3/python/lib/pyspark.zip ->
hdfs:/". It is also doing it for (py4j.-0.10.9.7-src.zip and
__spark_conf__.zip). It is working now because I enabled direct
access to HDFS to allow copying the files. But ideally I would like to not
have to copy any files directly to HDFS.

1) We would expect pyspark as well as the relevant configs to already be
available on the cluster - why are they being copied over? (we can always
provide the extra libraries needed using py-files the way you did)
2) If we wanted users to be able to use custom pyspark, we would rather
just copy the file HDFS/GCS in other ways, and let users reference it in
their job
3) What are the PYTHONPATH and SPARK_HOME env variables in your script? Are
they local paths, or paths on the spark cluster?

On Fri, Nov 17, 2023 at 8:57 AM Mich Talebzadeh 
wrote:

> Hi,
>
> How are you submitting your spark job from your client?
>
> Your files can either be on HDFS or HCFS such as gs, s3 etc.
>
> With reference to --py-files hdfs://yarn-master-url hdfs://foo.py', I
> assume you want your
>
> spark-submit --verbose \
>--deploy-mode cluster \
>--conf "spark.yarn.appMasterEnv.SPARK_HOME=$SPARK_HOME" \
>--conf "spark.yarn.appMasterEnv.PYTHONPATH=${PYTHONPATH}" \
>--conf "spark.executorEnv.PYTHONPATH=${PYTHONPATH}" \
>--py-files $CODE_DIRECTORY_CLOUD/dataproc_on_gke.zip \
>--conf "spark.driver.memory"=4G \
>--conf "spark.executor.memory"=4G \
>--conf "spark.num.executors"=4 \
>--conf "spark.executor.cores"=2 \
>$CODE_DIRECTORY_CLOUD/${APPLICATION}
>
> in my case I define $CODE_DIRECTORY_CLOUD as below on google cloud storage
>
> CODE_DIRECTORY="/home/hduser/dba/bin/python/"
> CODE_DIRECTORY_CLOUD="gs://,${PROJECT}-spark-on-k8s/codes"
> cd $CODE_DIRECTORY
> [ -f ${source_code}.zip ] && rm -r -f ${source_code}.zip
> echo `date` ", ===> creating source zip directory from  ${source_code}"
> # zip needs to be done at root directory of code
> zip -rq ${source_code}.zip ${source_code}
> gsutil cp ${source_code}.zip $CODE_DIRECTORY_CLOUD
> gsutil cp /home/hduser/dba/bin/python/${source_code}/src/${APPLICATION}
> $CODE_DIRECTORY_CLOUD
>
> So in summary I create a zip  file of my project and copy it across to the
> cloud storage and then put the application (py file) there as well and use
> them in spark-submit
>
> I trust this answers your question.
>
> HTH
>
>
>
> Mich Talebzadeh,
> Technologist, Solutions Architect & Engineer
> London
> United Kingdom
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Wed, 15 Nov 2023 at 21:33, Eugene Miretsky 
> wrote:
>
>> Hey All,
>>
>> We are running Pyspark spark-submit from a client outside the cluster.
>> The client has network connectivity only to the Yarn Master, not the HDFS
>> Datanodes. How can we submit the jobs? The idea would be to preload all the
>> dependencies (job code, libraries, etc) to HDFS, and just submit the job
>> from the client.
>>
>> We tried something like this
>> 'PYSPARK_ARCHIVES_PATH=hdfs://some-path/pyspark.zip spark-submit --master
>> yarn --deploy-mode cluster --py-files hdfs://yarn-master-url hdfs://foo.py'
>>
>> The error we are getting is
>> "
>>
>> org.apache.hadoop.net.ConnectTimeoutException: 6 millis timeout while
>> waiting for channel to be ready for connect. ch :
>> java.nio.channels.SocketChannel[connection-pending remote=/
>> 10.117.110.19:9866]
>>
>> org.apache.hadoop.ipc.RemoteException(java.io.IOException): File
>> /user/users/.sparkStaging/application_1698216436656_0104/*spark_conf.zip*
>> could only be written to 0 of the 1 minReplication nodes. There are 2
>> datanode(s) running and 2 node(s) are excluded in this operation.
>> "
>>
>> A few question
>> 1) What are the spark_conf.zip files. Is it the hive-site/yarn-site conf
>> files? Why would the client send them to the cluster? (the cluster already
>> has all that info - this would make sense in client mode, but not cluster
>> mode )
>> 2) Is it possible to use spark-submit without HDFS access?
>> 3) How would we fix this?
>>
>> Cheers,
>> Eugene
>>
>> --
>>
>> *Eugene Miretsky*
>> Managing Partner |  Badal.io | Book a meeting /w me!
>> 
>> mobile:  416-568-9245
>> email: eug...@badal.io 
>>
>

-- 

*Eugene Miretsky*
Managing Partner |  Badal.io | Book a meeting /w me!


Re: Spark on Java 17

2023-12-09 Thread Jörn Franke
It is just a goal… however I would not tune the no of regions or region size yet.Simply specify gc algorithm and max heap size.Try to tune other options only if there is a need, only one at at time (otherwise it is difficult to determine cause/effects) and have a performance testing framework in place to be able to measure differences.Do you need those large heaps in Spark? Why not split the tasks further to have more tasks with less memory ?I understand that each job is different and there can be reasons for it, but I often try to just use the defaults and then tune individual options. I try to also avoid certain extreme values (of course there are cases when they are needed). Especially often when upgrading from one Spark version to another then I find out it is then often better to work with a Spark job with default settings, because Spark itself has improved/changed how it works.To reduce the needed heap you can try to increase the number of tasks ( see here https://spark.apache.org/docs/latest/configuration.html)spark.executor.cores (to a few) and spark.sql.shuffle.partitions (default is 200 - you can try how much it brings to change it to 400 etc).and reducespark.executor.memoryAm 10.12.2023 um 02:33 schrieb Faiz Halde :Thanks, IL check them outCurious though, the official G1GC page https://www.oracle.com/technical-resources/articles/java/g1gc.html says that there must be no more than 2048 regions and region size is limited upto 32mbThat's strange because our heaps go up to 100gb and that would require 64mb region size to be under 2048ThanksFaizOn Sat, Dec 9, 2023, 10:33 Luca Canali  wrote:







Hi Faiz,
 
We find G1GC works well for some of our workloads that are Parquet-read intensive and we have been using G1GC with Spark on Java 8 already (spark.driver.extraJavaOptions and spark.executor.extraJavaOptions= “-XX:+UseG1GC”),
 while currently we are mostly running Spark (3.3 and higher) on Java 11.  
However, the best is always to refer to measurements of your specific workloads, let me know if you find something different. 

BTW besides the WebUI, I typically measure GC time also with a couple of custom tools:
https://github.com/cerndb/spark-dashboard and  https://github.com/LucaCanali/sparkMeasure 

A few tests of microbenchmarking Spark reading Parquet with a few different JDKs at:
https://db-blog.web.cern.ch/node/192 

 
Best,
Luca
 
 

From: Faiz Halde  
Sent: Thursday, December 7, 2023 23:25
To: user@spark.apache.org
Subject: Spark on Java 17

 

Hello,

 


We are planning to switch to Java 17 for Spark and were wondering if there's any obvious learnings from anybody related to JVM tuning?


 


We've been running on Java 8 for a while now and used to use the parallel GC as that used to be a general recommendation for high throughout systems. How has the default G1GC worked out with Spark?


 


Thanks


Faiz








Re: Spark on Java 17

2023-12-09 Thread Faiz Halde
Thanks, IL check them out

Curious though, the official G1GC page
https://www.oracle.com/technical-resources/articles/java/g1gc.html says
that there must be no more than 2048 regions and region size is limited
upto 32mb

That's strange because our heaps go up to 100gb and that would require 64mb
region size to be under 2048

Thanks
Faiz

On Sat, Dec 9, 2023, 10:33 Luca Canali  wrote:

> Hi Faiz,
>
>
>
> We find G1GC works well for some of our workloads that are Parquet-read
> intensive and we have been using G1GC with Spark on Java 8 already
> (spark.driver.extraJavaOptions and spark.executor.extraJavaOptions=
> “-XX:+UseG1GC”), while currently we are mostly running Spark (3.3 and
> higher) on Java 11.
>
> However, the best is always to refer to measurements of your specific
> workloads, let me know if you find something different.
> BTW besides the WebUI, I typically measure GC time also with a couple of
> custom tools: https://github.com/cerndb/spark-dashboard and
> https://github.com/LucaCanali/sparkMeasure
>
> A few tests of microbenchmarking Spark reading Parquet with a few
> different JDKs at: https://db-blog.web.cern.ch/node/192
>
>
>
> Best,
>
> Luca
>
>
>
>
>
> *From:* Faiz Halde 
> *Sent:* Thursday, December 7, 2023 23:25
> *To:* user@spark.apache.org
> *Subject:* Spark on Java 17
>
>
>
> Hello,
>
>
>
> We are planning to switch to Java 17 for Spark and were wondering if
> there's any obvious learnings from anybody related to JVM tuning?
>
>
>
> We've been running on Java 8 for a while now and used to use the parallel
> GC as that used to be a general recommendation for high throughout systems.
> How has the default G1GC worked out with Spark?
>
>
>
> Thanks
>
> Faiz
>


Re: Spark on Java 17

2023-12-09 Thread Jörn Franke
If you do tests with newer Java versions you can also try:

- UseNUMA: -XX:+UseNUMA. See https://openjdk.org/jeps/345

You can also assess the new Java GC algorithms:
- -XX:+UseShenandoahGC - works with terabyte of heaps - more memory efficient 
than zgc with heaps <32 GB. See also: 
https://developers.redhat.com/articles/2021/09/16/shenandoah-openjdk-17-sub-millisecond-gc-pauses
-  -XX:+UseZGC - works also with terabytes of heaps - see also 
https://www.baeldung.com/jvm-zgc-garbage-collector

Note: in jdk 21 zgc has an additional option that could make sense to activate:

-XX:+ZGenerational

See also 
https://developers.redhat.com/articles/2021/11/02/how-choose-best-java-garbage-collector

Note: it might be worth to try also JDK 21 - it has for certain GCs 
optimizations (amongst other things - I wonder how much improvement virtual 
threads can bring to Spark)

> Am 08.12.2023 um 01:02 schrieb Faiz Halde :
> 
> 
> Hello,
> 
> We are planning to switch to Java 17 for Spark and were wondering if there's 
> any obvious learnings from anybody related to JVM tuning?
> 
> We've been running on Java 8 for a while now and used to use the parallel GC 
> as that used to be a general recommendation for high throughout systems. How 
> has the default G1GC worked out with Spark?
> 
> Thanks
> Faiz


RE: Spark on Java 17

2023-12-09 Thread Luca Canali
Hi Faiz,

We find G1GC works well for some of our workloads that are Parquet-read 
intensive and we have been using G1GC with Spark on Java 8 already 
(spark.driver.extraJavaOptions and spark.executor.extraJavaOptions= 
“-XX:+UseG1GC”), while currently we are mostly running Spark (3.3 and higher) 
on Java 11.
However, the best is always to refer to measurements of your specific 
workloads, let me know if you find something different.
BTW besides the WebUI, I typically measure GC time also with a couple of custom 
tools: https://github.com/cerndb/spark-dashboard and  
https://github.com/LucaCanali/sparkMeasure
A few tests of microbenchmarking Spark reading Parquet with a few different 
JDKs at: https://db-blog.web.cern.ch/node/192

Best,
Luca


From: Faiz Halde 
Sent: Thursday, December 7, 2023 23:25
To: user@spark.apache.org
Subject: Spark on Java 17

Hello,

We are planning to switch to Java 17 for Spark and were wondering if there's 
any obvious learnings from anybody related to JVM tuning?

We've been running on Java 8 for a while now and used to use the parallel GC as 
that used to be a general recommendation for high throughout systems. How has 
the default G1GC worked out with Spark?

Thanks
Faiz


Re: SSH Tunneling issue with Apache Spark

2023-12-06 Thread Venkatesan Muniappan
Thanks for the clarification. I will try to do plain jdbc connection on
Scala/Java and will update this thread on how it goes.

*Thanks,*
*Venkat*



On Thu, Dec 7, 2023 at 9:40 AM Nicholas Chammas 
wrote:

> PyMySQL has its own implementation
> 
>  of
> the MySQL client-server protocol. It does not use JDBC.
>
>
> On Dec 6, 2023, at 10:43 PM, Venkatesan Muniappan <
> venkatesa...@noonacademy.com> wrote:
>
> Thanks for the advice Nicholas.
>
> As mentioned in the original email, I have tried JDBC + SSH Tunnel using
> pymysql and sshtunnel and it worked fine. The problem happens only with
> Spark.
>
> *Thanks,*
> *Venkat*
>
>
>
> On Wed, Dec 6, 2023 at 10:21 PM Nicholas Chammas <
> nicholas.cham...@gmail.com> wrote:
>
>> This is not a question for the dev list. Moving dev to bcc.
>>
>> One thing I would try is to connect to this database using JDBC + SSH
>> tunnel, but without Spark. That way you can focus on getting the JDBC
>> connection to work without Spark complicating the picture for you.
>>
>>
>> On Dec 5, 2023, at 8:12 PM, Venkatesan Muniappan <
>> venkatesa...@noonacademy.com> wrote:
>>
>> Hi Team,
>>
>> I am facing an issue with SSH Tunneling in Apache Spark. The behavior is
>> same as the one in this Stackoverflow question
>> 
>> but there are no answers there.
>>
>> This is what I am trying:
>>
>>
>> with SSHTunnelForwarder(
>> (ssh_host, ssh_port),
>> ssh_username=ssh_user,
>> ssh_pkey=ssh_key_file,
>> remote_bind_address=(sql_hostname, sql_port),
>> local_bind_address=(local_host_ip_address, sql_port)) as tunnel:
>> tunnel.local_bind_port
>> b1_semester_df = spark.read \
>> .format("jdbc") \
>> .option("url", b2b_mysql_url.replace("<>", 
>> str(tunnel.local_bind_port)))
>> \
>> .option("query", b1_semester_sql) \
>> .option("database", 'b2b') \
>> .option("password", b2b_mysql_password) \
>> .option("driver", "com.mysql.cj.jdbc.Driver") \
>> .load()
>> b1_semester_df.count()
>>
>> Here, the b1_semester_df is loaded but when I try count on the same Df it
>> fails saying this
>>
>> 23/12/05 11:49:17 ERROR TaskSetManager: Task 0 in stage 2.0 failed 4
>> times; aborting job
>> Traceback (most recent call last):
>>   File "", line 1, in 
>>   File "/usr/lib/spark/python/pyspark/sql/dataframe.py", line 382, in
>> show
>> print(self._jdf.showString(n, 20, vertical))
>>   File
>> "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line
>> 1257, in __call__
>>   File "/usr/lib/spark/python/pyspark/sql/utils.py", line 63, in deco
>> return f(*a, **kw)
>>   File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py",
>> line 328, in get_return_value
>> py4j.protocol.Py4JJavaError: An error occurred while calling
>> o284.showString.
>> : org.apache.spark.SparkException: Job aborted due to stage failure: Task
>> 0 in stage 2.0 failed 4 times, most recent failure: Lost task 0.3 in stage
>> 2.0 (TID 11, ip-172-32-108-1.eu-central-1.compute.internal, executor 3):
>> com.mysql.cj.jdbc.exceptions.CommunicationsException: Communications link
>> failure
>>
>> However, the same is working fine with pandas df. I have tried this below
>> and it worked.
>>
>>
>> with SSHTunnelForwarder(
>> (ssh_host, ssh_port),
>> ssh_username=ssh_user,
>> ssh_pkey=ssh_key_file,
>> remote_bind_address=(sql_hostname, sql_port)) as tunnel:
>> conn = pymysql.connect(host=local_host_ip_address, user=sql_username,
>> passwd=sql_password, db=sql_main_database,
>> port=tunnel.local_bind_port)
>> df = pd.read_sql_query(b1_semester_sql, conn)
>> spark.createDataFrame(df).createOrReplaceTempView("b1_semester")
>>
>> So wanted to check what I am missing with my Spark usage. Please help.
>>
>> *Thanks,*
>> *Venkat*
>>
>>
>>
>


Re: SSH Tunneling issue with Apache Spark

2023-12-06 Thread Nicholas Chammas
PyMySQL has its own implementation 

 of the MySQL client-server protocol. It does not use JDBC.


> On Dec 6, 2023, at 10:43 PM, Venkatesan Muniappan 
>  wrote:
> 
> Thanks for the advice Nicholas. 
> 
> As mentioned in the original email, I have tried JDBC + SSH Tunnel using 
> pymysql and sshtunnel and it worked fine. The problem happens only with Spark.
> 
> Thanks,
> Venkat
> 
> 
> 
> On Wed, Dec 6, 2023 at 10:21 PM Nicholas Chammas  > wrote:
>> This is not a question for the dev list. Moving dev to bcc.
>> 
>> One thing I would try is to connect to this database using JDBC + SSH 
>> tunnel, but without Spark. That way you can focus on getting the JDBC 
>> connection to work without Spark complicating the picture for you.
>> 
>> 
>>> On Dec 5, 2023, at 8:12 PM, Venkatesan Muniappan 
>>> mailto:venkatesa...@noonacademy.com>> wrote:
>>> 
>>> Hi Team,
>>> 
>>> I am facing an issue with SSH Tunneling in Apache Spark. The behavior is 
>>> same as the one in this Stackoverflow question 
>>> 
>>>  but there are no answers there.
>>> 
>>> This is what I am trying:
>>> 
>>> 
>>> with SSHTunnelForwarder(
>>> (ssh_host, ssh_port),
>>> ssh_username=ssh_user,
>>> ssh_pkey=ssh_key_file,
>>> remote_bind_address=(sql_hostname, sql_port),
>>> local_bind_address=(local_host_ip_address, sql_port)) as tunnel:
>>> tunnel.local_bind_port
>>> b1_semester_df = spark.read \
>>> .format("jdbc") \
>>> .option("url", b2b_mysql_url.replace("<>", 
>>> str(tunnel.local_bind_port))) \
>>> .option("query", b1_semester_sql) \
>>> .option("database", 'b2b') \
>>> .option("password", b2b_mysql_password) \
>>> .option("driver", "com.mysql.cj.jdbc.Driver") \
>>> .load()
>>> b1_semester_df.count()
>>> 
>>> Here, the b1_semester_df is loaded but when I try count on the same Df it 
>>> fails saying this
>>> 
>>> 23/12/05 11:49:17 ERROR TaskSetManager: Task 0 in stage 2.0 failed 4 times; 
>>> aborting job
>>> Traceback (most recent call last):
>>>   File "", line 1, in 
>>>   File "/usr/lib/spark/python/pyspark/sql/dataframe.py", line 382, in show
>>> print(self._jdf.showString(n, 20, vertical))
>>>   File 
>>> "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 
>>> 1257, in __call__
>>>   File "/usr/lib/spark/python/pyspark/sql/utils.py", line 63, in deco
>>> return f(*a, **kw)
>>>   File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", 
>>> line 328, in get_return_value
>>> py4j.protocol.Py4JJavaError: An error occurred while calling 
>>> o284.showString.
>>> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 
>>> in stage 2.0 failed 4 times, most recent failure: Lost task 0.3 in stage 
>>> 2.0 (TID 11, ip-172-32-108-1.eu-central-1.compute.internal, executor 3): 
>>> com.mysql.cj.jdbc.exceptions.CommunicationsException: Communications link 
>>> failure
>>> 
>>> However, the same is working fine with pandas df. I have tried this below 
>>> and it worked.
>>> 
>>> 
>>> with SSHTunnelForwarder(
>>> (ssh_host, ssh_port),
>>> ssh_username=ssh_user,
>>> ssh_pkey=ssh_key_file,
>>> remote_bind_address=(sql_hostname, sql_port)) as tunnel:
>>> conn = pymysql.connect(host=local_host_ip_address, user=sql_username,
>>>passwd=sql_password, db=sql_main_database,
>>>port=tunnel.local_bind_port)
>>> df = pd.read_sql_query(b1_semester_sql, conn)
>>> spark.createDataFrame(df).createOrReplaceTempView("b1_semester")
>>> 
>>> So wanted to check what I am missing with my Spark usage. Please help.
>>> 
>>> Thanks,
>>> Venkat
>>> 
>> 



Re: SSH Tunneling issue with Apache Spark

2023-12-06 Thread Venkatesan Muniappan
Thanks for the advice Nicholas.

As mentioned in the original email, I have tried JDBC + SSH Tunnel using
pymysql and sshtunnel and it worked fine. The problem happens only with
Spark.

*Thanks,*
*Venkat*



On Wed, Dec 6, 2023 at 10:21 PM Nicholas Chammas 
wrote:

> This is not a question for the dev list. Moving dev to bcc.
>
> One thing I would try is to connect to this database using JDBC + SSH
> tunnel, but without Spark. That way you can focus on getting the JDBC
> connection to work without Spark complicating the picture for you.
>
>
> On Dec 5, 2023, at 8:12 PM, Venkatesan Muniappan <
> venkatesa...@noonacademy.com> wrote:
>
> Hi Team,
>
> I am facing an issue with SSH Tunneling in Apache Spark. The behavior is
> same as the one in this Stackoverflow question
> 
> but there are no answers there.
>
> This is what I am trying:
>
>
> with SSHTunnelForwarder(
> (ssh_host, ssh_port),
> ssh_username=ssh_user,
> ssh_pkey=ssh_key_file,
> remote_bind_address=(sql_hostname, sql_port),
> local_bind_address=(local_host_ip_address, sql_port)) as tunnel:
> tunnel.local_bind_port
> b1_semester_df = spark.read \
> .format("jdbc") \
> .option("url", b2b_mysql_url.replace("<>", 
> str(tunnel.local_bind_port)))
> \
> .option("query", b1_semester_sql) \
> .option("database", 'b2b') \
> .option("password", b2b_mysql_password) \
> .option("driver", "com.mysql.cj.jdbc.Driver") \
> .load()
> b1_semester_df.count()
>
> Here, the b1_semester_df is loaded but when I try count on the same Df it
> fails saying this
>
> 23/12/05 11:49:17 ERROR TaskSetManager: Task 0 in stage 2.0 failed 4
> times; aborting job
> Traceback (most recent call last):
>   File "", line 1, in 
>   File "/usr/lib/spark/python/pyspark/sql/dataframe.py", line 382, in show
> print(self._jdf.showString(n, 20, vertical))
>   File
> "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line
> 1257, in __call__
>   File "/usr/lib/spark/python/pyspark/sql/utils.py", line 63, in deco
> return f(*a, **kw)
>   File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py",
> line 328, in get_return_value
> py4j.protocol.Py4JJavaError: An error occurred while calling
> o284.showString.
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task
> 0 in stage 2.0 failed 4 times, most recent failure: Lost task 0.3 in stage
> 2.0 (TID 11, ip-172-32-108-1.eu-central-1.compute.internal, executor 3):
> com.mysql.cj.jdbc.exceptions.CommunicationsException: Communications link
> failure
>
> However, the same is working fine with pandas df. I have tried this below
> and it worked.
>
>
> with SSHTunnelForwarder(
> (ssh_host, ssh_port),
> ssh_username=ssh_user,
> ssh_pkey=ssh_key_file,
> remote_bind_address=(sql_hostname, sql_port)) as tunnel:
> conn = pymysql.connect(host=local_host_ip_address, user=sql_username,
> passwd=sql_password, db=sql_main_database,
> port=tunnel.local_bind_port)
> df = pd.read_sql_query(b1_semester_sql, conn)
> spark.createDataFrame(df).createOrReplaceTempView("b1_semester")
>
> So wanted to check what I am missing with my Spark usage. Please help.
>
> *Thanks,*
> *Venkat*
>
>
>


Re: ordering of rows in dataframe

2023-12-05 Thread Enrico Minack

Looks like what you want is to add a column that, when ordered by that
column, the current order of the dateframe is preserved.

All you need is the monotonically_increasing_id() function:

spark.range(0, 10, 1, 5).withColumn("row",
monotonically_increasing_id()).show()
+---+---+
| id|    row|
+---+---+
|  0|  0|
|  1|  1|
|  2| 8589934592|
|  3| 8589934593|
|  4|17179869184|
|  5|17179869185|
|  6|25769803776|
|  7|25769803777|
|  8|34359738368|
|  9|34359738369|
+---+---+

Within a partition, all columns have consecutive row numbers, the start
of a new partition observes a jump in the row number. The example above
has 5 partitions with 2 rows each.

In case you need a global consecutive row number (not needed to preserve
current dataframe order as you want it), you can use the
Dataframe.with_row_numbers() method provided by the Spark-Extension
package:
https://github.com/G-Research/spark-extension/blob/master/ROW_NUMBER.md

import gresearch.spark

df.with_row_numbers().show()
+---+--+
| id|row_number|
+---+--+
|  1| 1|
|  2| 2|
|  2| 3|
|  3| 4|
+---+--+

Cheers,
Enrico



Am 05.12.23 um 18:25 schrieb Som Lima:


want to maintain the order of the rows in the data frame in Pyspark.
Is there any way to achieve this for this function here we have the
row ID which will give numbering to each row. Currently, the below
function results in the rearrangement of the row in the data frame.

|def createRowIdColumn(new_column, position, start_value): row_count =
df.count() row_ids = spark.range(int(start_value), int(start_value) +
row_count, 1).toDF(new_column) window = Window.orderBy(lit(1))
df_row_ids = row_ids.withColumn("row_num", row_number().over(window) -
1) df_with_row_num = df.withColumn("row_num",
row_number().over(window) - 1) if position == "Last Column": result =
df_with_row_num.join(df_row_ids, on="row_num").drop("row_num") else:
result = df_row_ids.join(df_with_row_num,
on="row_num").drop("row_num") return result.orderBy(new_column) |

Please let me know the solution if we can achieve this requirement.




Re: [PySpark][Spark Dataframe][Observation] Why empty dataframe join doesn't let you get metrics from observation?

2023-12-05 Thread Enrico Minack

Hi Michail,

with spark.conf.set("spark.sql.planChangeLog.level", "WARN") you can see
how Spark optimizes the query plan.

In PySpark, the plan is optimized into

Project ...
  +- CollectMetrics 2, [count(1) AS count(1)#200L]
  +- LocalTableScan , [col1#125, col2#126L, col3#127, col4#132L]

The entire join gets optimized away into an empty table. Looks like it
figures out that df has no rows with col1 = 'c'. So df is never consumed
/ iterated, so the observation does not retrieve any metrics.

In Scala, the optimization is different:

*(2) Project ...
  +- CollectMetrics 2, [count(1) AS count(1)#63L]
     +- *(1) Project [col1#37, col2#38, col3#39, cast(null as int) AS
col4#51]
        +- *(1) Filter (isnotnull(col1#37) AND (col1#37 = c))
       +- CollectMetrics 1, [count(1) AS count(1)#56L]
          +- LocalTableScan [col1#37, col2#38, col3#39]

where the join also gets optimized away, but table df is still filtered
for col1 = 'c', which iterates over the rows and collects the metrics
for observation 1.

Hope this helps to understand why there are no observed metrics for
Observation("1") in your case.

Enrico



Am 04.12.23 um 10:45 schrieb Enrico Minack:

Hi Michail,

observations as well as ordinary accumulators only observe / process
rows that are iterated / consumed by downstream stages. If the query
plan decides to skip one side of the join, that one will be removed from
the final plan completely. Then, the Observation will not retrieve any
metrics and .get waits forever. Definitively not helpful.

When creating the Observation class, we thought about a timeout for the
get method but could not find a use case where the user would call get
without first executing the query. Here is a scenario where though
executing the query there is no observation result. We will rethink this.

Interestingly, your example works in Scala:

import org.apache.spark.sql.Observation

val df = Seq(("a", 1, "1 2 3 4"), ("b", 2, "1 2 3 4")).toDF("col1",
"col2", "col3")
val df_join = Seq(("a", 6), ("b", 5)).toDF("col1", "col4")

val o1 = Observation()
val o2 = Observation()

val df1 = df.observe(o1, count("*")).filter("col1 = 'c'")
val df2 = df1.join(df_join, "col1", "left").observe(o2, count("*"))

df2.show()
+++++
|col1|col2|col3|col4|
+++++
+++++

o1.get
Map[String,Any] = Map(count(1) -> 2)

o2.get
Map[String,Any] = Map(count(1) -> 0)


Pyspark and Scala should behave identically here. I will investigate.

Cheers,
Enrico



Am 02.12.23 um 17:11 schrieb Михаил Кулаков:

Hey folks, I actively using observe method on my spark jobs and
noticed interesting behavior:
Here is an example of working and non working code:
https://gist.github.com/Coola4kov/8aeeb05abd39794f8362a3cf1c66519c


In a few words, if I'm joining dataframe after some filter rules and
it became empty, observations configured on the first dataframe never
return any results, unless some action called on the empty dataframe
specifically before join.

Looks like a bug to me, I will appreciate any advice on how to fix
this behavior.




-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark-Connect: Param `--packages` does not take effect for executors.

2023-12-04 Thread Holden Karau
So I think this sounds like a bug to me, in the help options for both
regular spark-submit and ./sbin/start-connect-server.sh we say:
"  --packages  Comma-separated list of maven coordinates of
jars to include
  on the driver and executor classpaths. Will
search the local
  maven repo, then maven central and any
additional remote
  repositories given by --repositories. The
format for the
  coordinates should be
groupId:artifactId:version."

If the behaviour is intentional for spark-connect it would be good to
understand why (and then also update the docs).

On Mon, Dec 4, 2023 at 3:33 PM Aironman DirtDiver 
wrote:

> The issue you're encountering with the iceberg-spark-runtime dependency
> not being properly passed to the executors in your Spark Connect server
> deployment could be due to a couple of factors:
>
>1.
>
>*Spark Submit Packaging:* When you use the --packages parameter in
>spark-submit, it only adds the JARs to the driver classpath. The
>executors still need to download and load the JARs separately. This
>can lead to issues if the JARs are not accessible from the executors, such
>as when running in a distributed environment like Kubernetes.
>2.
>
>*Kubernetes Container Image:* The Spark Connect server container image
>(xxx/spark-py:3.5-prd) might not have the iceberg-spark-runtime dependency
>pre-installed. This means that even if the JARs are available on the
>driver,the executors won't have access to them.
>
> To address this issue, consider the following solutions:
>
>1.
>
>*Package Dependencies into Image:* As you mentioned, packaging the
>required dependencies into your container image is a viable option. This
>ensures that the executors have direct access to the JARs, eliminating
>the need for downloading or copying during job execution.
>2.
>
>*Use Spark Submit with --jars Option:* Instead of relying on --packages
>, you can explicitly specify the JARs using the --jars option in
>spark-submit. This will package the JARs into the Spark application's
>submission directory, ensuring that they are available to both the
>driver and executors.
>3.
>
>*Mount JARs as Shared Volume:* If the iceberg-spark-runtime dependency
>is already installed on the cluster nodes,you can mount the JARs as a
>shared volume accessible to both the driver and executors. This avoids
>the need to package or download the JARs.
>Mounting JARs as a shared volume in your Spark Connect server
>deployment involves creating a shared volume that stores the JARs and then
>mounting that volume to both the driver and executor containers. Here's a
>step-by-step guide:
>
>Create a Shared Volume: Create a shared volume using a persistent
>storage solution like NFS, GlusterFS, or AWS EFS. Ensure that all cluster
>nodes have access to the shared volume.
>
>Copy JARs to Shared Volume: Copy the required JARs, including
>iceberg-spark-runtime, to the shared volume. This will make them accessible
>to both the driver and executor containers.
>
>Mount Shared Volume to Driver Container: In your Spark Connect server
>deployment configuration, specify the shared volume as a mount point for
>the driver container. This will make the JARs available to the driver.
>
>Mount Shared Volume to Executor Containers: In the Spark Connect
>server deployment configuration, specify the shared volume as a mount point
>for the executor containers. This will make the JARs available to the
>executors.
>
>Update Spark Connect Server Configuration: In your Spark Connect
>server configuration, ensure that the spark.sql.catalogImplementation
>property is set to iceberg. This will instruct Spark to use the Iceberg
>catalog implementation.
>
>By following these steps, you can successfully mount JARs as a shared
>volume in your Spark Connect server deployment, eliminating the need to
>package or download the JARs.
>4.
>
>*Use Spark Connect Server with Remote Resources:* Spark Connect Server
>supports accessing remote resources,such as JARs stored in a distributed
>file system or a cloud storage service. By configuring Spark Connect
>Server to use remote resources, you can avoid packaging the
>dependencies into the container image.
>
> By implementing one of these solutions, you should be able to resolve the
> issue of the iceberg-spark-runtime dependency not being properly passed to
> the executors in your Spark Connect server deployment.
>
> Let me know if any of the proposal works for you.
>
> Alonso
>
> El lun, 4 dic 2023 a las 11:44, Xiaolong Wang
> () escribió:
>
>> Hi, Spark community,
>>
>> I encountered a weird bug when using Spark Connect server to integrate
>> with Iceberg. I added the 

Re: Do we have any mechanism to control requests per second for a Kafka connect sink?

2023-12-04 Thread Yeikel Santana
Apologies to everyone. I sent this to the wrong email list. Please discard







 On Mon, 04 Dec 2023 10:48:11 -0500 Yeikel Santana  wrote 
---



Hello everyone,



Is there any mechanism to force Kafka Connect to ingest at a given rate per 
second as opposed to tasks?



I am operating in a shared environment where the ingestion rate needs to be as 
low as possible (for example, 5 requests/second as an upper limit), and as far 
as I can tell, `tasks` are the main unit of work we can use.



My current understanding is that a task will be blocked to process one batch, 
and it will continue to the next batch as soon as the previous request is 
completed. This should mean that if the target server can process the requests 
at a higher rate, then the sink will continue sending at that rate.



However, in my scenario, what I need is to send n requests per second and then 
sit idle until that time passes to avoid overloading the target server. 



In this specific example, my best attempt to control the throughput was to 
configure it something like:



```json

"connector.class": 
"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",

"tasks.max": "1",

"max.retries": "10",

"retry.backoff.ms": "1000",

"max.buffered.records": "100",

"batch.size": "100",

"max.in.flight.requests": "1",

"flush.synchronously": "true",

```



Unfortunately, while that helps, it does not solve the inherent problem. I also 
understand that this is very specific to the given Sink Connector, but my 
question is more about a global overwrite that could be applied if any.



I also suppose that I could add a `Thread.sleep` call as an SMT, but that does 
not sound like a good solution. 



Thank you!

Re: Spark-Connect: Param `--packages` does not take effect for executors.

2023-12-04 Thread Aironman DirtDiver
The issue you're encountering with the iceberg-spark-runtime dependency not
being properly passed to the executors in your Spark Connect server
deployment could be due to a couple of factors:

   1.

   *Spark Submit Packaging:* When you use the --packages parameter in
   spark-submit, it only adds the JARs to the driver classpath. The
   executors still need to download and load the JARs separately. This can
   lead to issues if the JARs are not accessible from the executors, such
   as when running in a distributed environment like Kubernetes.
   2.

   *Kubernetes Container Image:* The Spark Connect server container image
   (xxx/spark-py:3.5-prd) might not have the iceberg-spark-runtime dependency
   pre-installed. This means that even if the JARs are available on the
   driver,the executors won't have access to them.

To address this issue, consider the following solutions:

   1.

   *Package Dependencies into Image:* As you mentioned, packaging the
   required dependencies into your container image is a viable option. This
   ensures that the executors have direct access to the JARs, eliminating
   the need for downloading or copying during job execution.
   2.

   *Use Spark Submit with --jars Option:* Instead of relying on --packages, you
   can explicitly specify the JARs using the --jars option in
spark-submit. This
   will package the JARs into the Spark application's submission
directory, ensuring
   that they are available to both the driver and executors.
   3.

   *Mount JARs as Shared Volume:* If the iceberg-spark-runtime dependency
   is already installed on the cluster nodes,you can mount the JARs as a
   shared volume accessible to both the driver and executors. This avoids
   the need to package or download the JARs.
   Mounting JARs as a shared volume in your Spark Connect server deployment
   involves creating a shared volume that stores the JARs and then mounting
   that volume to both the driver and executor containers. Here's a
   step-by-step guide:

   Create a Shared Volume: Create a shared volume using a persistent
   storage solution like NFS, GlusterFS, or AWS EFS. Ensure that all cluster
   nodes have access to the shared volume.

   Copy JARs to Shared Volume: Copy the required JARs, including
   iceberg-spark-runtime, to the shared volume. This will make them accessible
   to both the driver and executor containers.

   Mount Shared Volume to Driver Container: In your Spark Connect server
   deployment configuration, specify the shared volume as a mount point for
   the driver container. This will make the JARs available to the driver.

   Mount Shared Volume to Executor Containers: In the Spark Connect server
   deployment configuration, specify the shared volume as a mount point for
   the executor containers. This will make the JARs available to the executors.

   Update Spark Connect Server Configuration: In your Spark Connect server
   configuration, ensure that the spark.sql.catalogImplementation property is
   set to iceberg. This will instruct Spark to use the Iceberg catalog
   implementation.

   By following these steps, you can successfully mount JARs as a shared
   volume in your Spark Connect server deployment, eliminating the need to
   package or download the JARs.
   4.

   *Use Spark Connect Server with Remote Resources:* Spark Connect Server
   supports accessing remote resources,such as JARs stored in a distributed
   file system or a cloud storage service. By configuring Spark Connect
   Server to use remote resources, you can avoid packaging the dependencies
   into the container image.

By implementing one of these solutions, you should be able to resolve the
issue of the iceberg-spark-runtime dependency not being properly passed to
the executors in your Spark Connect server deployment.

Let me know if any of the proposal works for you.

Alonso

El lun, 4 dic 2023 a las 11:44, Xiaolong Wang
() escribió:

> Hi, Spark community,
>
> I encountered a weird bug when using Spark Connect server to integrate
> with Iceberg. I added the iceberg-spark-runtime dependency with
> `--packages` param, the driver/connect-server pod did get the correct
> dependencies. But when looking at the executor's library, the jar was not
> properly passed.
>
> To work around this, I need to package the required dependencies into my
> image which is something not flexible and elegant.
>
> I'm wondering if anyone has seen this kind of error before.
>
> FYI, my Spark Connect server deployment looks something like the following:
>
> apiVersion: apps/v1
>> kind: Deployment
>> metadata:
>> labels:
>> app: spark-connect-ads
>> component: spark-connect
>> name: spark-connect-ads
>> namespace: realtime-streaming
>> spec:
>> selector:
>> matchLabels:
>> app: spark-connect-ads
>> component: spark-connect
>> template:
>> metadata:
>> labels:
>> app: spark-connect-ads
>> component: spark-connect
>> name: spark-connect-ads
>> namespace: realtime-streaming
>> spec:
>> containers:
>> - 

Re: [PySpark][Spark Dataframe][Observation] Why empty dataframe join doesn't let you get metrics from observation?

2023-12-04 Thread Enrico Minack

Hi Michail,

observations as well as ordinary accumulators only observe / process
rows that are iterated / consumed by downstream stages. If the query
plan decides to skip one side of the join, that one will be removed from
the final plan completely. Then, the Observation will not retrieve any
metrics and .get waits forever. Definitively not helpful.

When creating the Observation class, we thought about a timeout for the
get method but could not find a use case where the user would call get
without first executing the query. Here is a scenario where though
executing the query there is no observation result. We will rethink this.

Interestingly, your example works in Scala:

import org.apache.spark.sql.Observation

val df = Seq(("a", 1, "1 2 3 4"), ("b", 2, "1 2 3 4")).toDF("col1",
"col2", "col3")
val df_join = Seq(("a", 6), ("b", 5)).toDF("col1", "col4")

val o1 = Observation()
val o2 = Observation()

val df1 = df.observe(o1, count("*")).filter("col1 = 'c'")
val df2 = df1.join(df_join, "col1", "left").observe(o2, count("*"))

df2.show()
+++++
|col1|col2|col3|col4|
+++++
+++++

o1.get
Map[String,Any] = Map(count(1) -> 2)

o2.get
Map[String,Any] = Map(count(1) -> 0)


Pyspark and Scala should behave identically here. I will investigate.

Cheers,
Enrico



Am 02.12.23 um 17:11 schrieb Михаил Кулаков:

Hey folks, I actively using observe method on my spark jobs and
noticed interesting behavior:
Here is an example of working and non working code:
https://gist.github.com/Coola4kov/8aeeb05abd39794f8362a3cf1c66519c


In a few words, if I'm joining dataframe after some filter rules and
it became empty, observations configured on the first dataframe never
return any results, unless some action called on the empty dataframe
specifically before join.

Looks like a bug to me, I will appreciate any advice on how to fix
this behavior.




-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: [FYI] SPARK-45981: Improve Python language test coverage

2023-12-02 Thread Hyukjin Kwon
Awesome!

On Sat, Dec 2, 2023 at 2:33 PM Dongjoon Hyun 
wrote:

> Hi, All.
>
> As a part of Apache Spark 4.0.0 (SPARK-44111), the Apache Spark community
> starts to have test coverage for all supported Python versions from Today.
>
> - https://github.com/apache/spark/actions/runs/7061665420
>
> Here is a summary.
>
> 1. Main CI: All PRs and commits on `master` branch are tested with Python
> 3.9.
> 2. Daily CI:
> https://github.com/apache/spark/actions/workflows/build_python.yml
> - PyPy 3.8
> - Python 3.10
> - Python 3.11
> - Python 3.12
>
> This is a great addition for PySpark 4.0+ users and an extensible
> framework for all future Python versions.
>
> Thank you all for making this together!
>
> Best,
> Dongjoon.
>


Re: [Streaming (DStream) ] : Does Spark Streaming supports pause/resume consumption of message from Kafka?

2023-12-01 Thread Mich Talebzadeh
Ok pause/continue to throw some challenges.

The implication is to pause gracefully and resume the same' First have a
look at this SPIP of mine

[SPARK-42485] SPIP: Shutting down spark structured streaming when the
streaming process completed current process - ASF JIRA (apache.org)
<https://issues.apache.org/jira/browse/SPARK-42485>

<https://issues.apache.org/jira/browse/SPARK-42485>Then we can assume a
graceful pause/restart

As a suggestion, to implement conditional pausing and resuming, you can
introduce a flag or control signal within your DStream processing logic.
When the condition for pausing is met, the stop() method is called to
temporarily halt message processing. Conversely, when the condition for
resuming is met, the start() method is invoked to re-enable message
consumption.

Let us have a go at it

is_paused = False def process_stream(message): global is_paused if not
is_paused: # Perform processing logic here print(message) # Check for
pausing condition if should_pause(message): is_paused = True stream.stop() #
Check for resuming condition if should_resume() and is_paused: is_paused =
False stream.start() stream = DStream(source)
stream.foreach(process_stream) stream.start()

HTH

Mich Talebzadeh,
Distinguished Technologist, Solutions Architect & Engineer
London
United Kingdom


   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Fri, 1 Dec 2023 at 12:56, Saurabh Agrawal (180813)
 wrote:

> Hi Spark Team,
>
> I am using Spark 3.4.0 version in my application which is use to consume
> messages from Kafka topics.
>
> I have below queries:
>
> 1. Does DStream support pause/resume streaming message consumption at
> runtime on particular condition? If yes, please provide details.
>
> 2. I tried to revoke partition from consumer at runtime which cause error.
>
>
>
> *throw new IllegalStateException(s"Previously tracked partitions " +*
>
> *s"${revokedPartitions.mkString("[", ",", "]")} been revoked by
> Kafka because of consumer " +*
>
> *s"rebalance. This is mostly due to another stream with same group
> id joined, " +*
>
> *s"please check if there're different streaming application
> misconfigure to use same " +*
>
> *s"group id. Fundamentally different stream should use different
> group id")*
>
>
>
>
>
> 3. Does Spark support Blue/Green Deployment. I need to implement
> Blue/Green Deployment scenario with Spark. Facing problem as need to deploy
> both Blue and Green deployment with same consumer-group-id. As I read,
> spark does not support 2 deployment with same consumer group-id, this
> implementation is failing. Please guide how this can be implemented with
> Spark.
>
> 4. Does Spark support Active-Active deployment.
>
>
>
> It will be great if you can reply on above queries please.
>
>
>
> --
>
>
> * Regards,*
>
> *Saurabh Agrawal*
>
> [image: Image]
>
> Software Development Specialist, IPaaS R
> [image: A picture containing logoDescription automatically generated]
>
>
>
>
>
> *This email and the information contained herein is proprietary and
> confidential and subject to the Amdocs Email Terms of Service, which you
> may review at* *https://www.amdocs.com/about/email-terms-of-service*
> <https://www.amdocs.com/about/email-terms-of-service>
>


Re:[ANNOUNCE] Apache Spark 3.4.2 released

2023-11-30 Thread beliefer
Congratulations!







At 2023-12-01 01:23:55, "Dongjoon Hyun"  wrote:

We are happy to announce the availability of Apache Spark 3.4.2!

Spark 3.4.2 is a maintenance release containing many fixes including
security and correctness domains. This release is based on the
branch-3.4 maintenance branch of Spark. We strongly
recommend all 3.4 users to upgrade to this stable release.

To download Spark 3.4.2, head over to the download page:
https://spark.apache.org/downloads.html

To view the release notes:
https://spark.apache.org/releases/spark-release-3-4-2.html

We would like to acknowledge all community members for contributing to this
release. This release would not have been possible without you.

Dongjoon Hyun


Re: Tuning Best Practices

2023-11-29 Thread Bryant Wright
Thanks, Jack!

Please let me know if you find any other guides specific to tuning shuffles
and joins.

Currently, the best way I know how to handle joins across large datasets
that can't be broadcast is by rewriting the source tables HIVE partitioned
by one or two join keys, and then breaking down the joins into stages with
intermediate write steps for a join across a handful of tables with a new
HIVE partition scheme that suits the next set of joins. I perform these
joins in a python loop over the HIVE partitions to minimize load.

I imagine there's more I could do to reduce the amount of manual coding and
intermediate write steps.

I'll start with these docs!

Thanks,

Bryant

On Tue, Nov 28, 2023, 5:23 PM Jack Goodson  wrote:

> Hi Bryant,
>
> the below docs are a good start on performance tuning
>
> https://spark.apache.org/docs/latest/sql-performance-tuning.html
>
> Hope it helps!
>
> On Wed, Nov 29, 2023 at 9:32 AM Bryant Wright 
> wrote:
>
>> Hi, I'm looking for a comprehensive list of Tuning Best Practices for
>> spark.
>>
>> I did a search on the archives for "tuning" and the search returned no
>> results.
>>
>> Thanks for your help.
>>
>


RE: Re: Spark Compatibility with Spring Boot 3.x

2023-11-29 Thread Guru Panda
Team,
Do we have any updates when spark 4.x version will release in order to
address below issues related to  > java.lang.NoClassDefFoundError:
javax/servlet/Servlet


Thanks and Regards,
Guru
On 2023/10/05 17:19:51 Angshuman Bhattacharya wrote:
> Thanks Ahmed. I am trying to bring this up with Spark DE community
>
> On Thu, Oct 5, 2023 at 12:32 PM Ahmed Albalawi <
> ahmed.albal...@capitalone.com> wrote:
>
> > Hello team,
> >
> > We are in the process of upgrading one of our apps to Spring Boot 3.x
> > while using Spark, and we have encountered an issue with Spark
> > compatibility, specifically with Jakarta Servlet. Spring Boot 3.x uses
> > Jakarta Servlet while Spark uses Javax Servlet. Can we get some
guidance on
> > how to upgrade to Spring Boot 3.x while continuing to use Spark.
> >
> > The specific error is listed below:
> >
> > java.lang.NoClassDefFoundError: javax/servlet/Servlet
> > at org.apache.spark.ui.SparkUI$.create(SparkUI.scala:239)
> > at org.apache.spark.SparkContext.(SparkContext.scala:503)
> > at
org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2888)
> > at org.apache.spark.SparkContext.getOrCreate(SparkContext.scala)
> >
> > The error comes up when we try to run a mvn clean install, and the
issue is in our test cases. This issue happens specifically when we build
our spark session. The line of code it traces down to is as follows:
> >
> > *session =
SparkSession.builder().sparkContext(SparkContext.getOrCreate(sparkConf)).getOrCreate();*
> >
> > What we have tried:
> >
> > - We noticed according to this post <
https://urldefense.com/v3/__https://stackoverflow.com/questions/75690210/apache-spark-with-spring-boot-failed-to-start-exception-factory-method-javasp__;!!EFVe01R3CjU!fdH40mQbStFzaUfVJ7JR-ZDD8DFiYsPseqp-yRu1BAFs8Zc3PZGVzaqF39BbL5vGOwm3mXNcTF1_O_mk_uj9FaBlEplW-4Oh7jceadvuqg$>,
there are no compatible versions of spark using version 5 of the Jakarta
Servlet API
> >
> > - We've tried <
https://urldefense.com/v3/__https://stackoverflow.com/questions/76618374/spark-3-4-1-jakarta-servlet-6-0-0-compatibility-issue__;!!EFVe01R3CjU!fdH40mQbStFzaUfVJ7JR-ZDD8DFiYsPseqp-yRu1BAFs8Zc3PZGVzaqF39BbL5vGOwm3mXNcTF1_O_mk_uj9FaBlEplW-4Oh7jeV6Df7lQ$>
using the maven shade plugin to use jakarta instead of javax, but are
running into some other issues with this.
> > - We've also looked at the following <
https://urldefense.com/v3/__https://stackoverflow.com/questions/75350944/dependecy-conflict-apache-spark-and-spring-boot__;!!EFVe01R3CjU!fdH40mQbStFzaUfVJ7JR-ZDD8DFiYsPseqp-yRu1BAFs8Zc3PZGVzaqF39BbL5vGOwm3mXNcTF1_O_mk_uj9FaBlEplW-4Oh7jeiYCkybw$>
to use jakarta 4.x with jersey 2.x and still have an issue with the servlet
> >
> >
> > Please let us know if there are any solutions to this issue. Thanks!
> >
> >
> > --
> > *Ahmed Albalawi*
> >
> > Senior Associate Software Engineer • EP2 Tech - CuRE
> >
> > 571-668-3911 •  1680 Capital One Dr.
> >
>
> __
>
>
>
> The information contained in this e-mail may be confidential and/or
proprietary to Capital One and/or its affiliates and may only be used
solely in performance of work or services for Capital One. The information
transmitted herewith is intended only for use by the individual or entity
to which it is addressed. If the reader of this message is not the intended
recipient, you are hereby notified that any review, retransmission,
dissemination, distribution, copying or other use of, or taking of any
action in reliance upon this information is strictly prohibited. If you
have received this communication in error, please contact the sender and
delete the material from your computer.
>
>
>
>


Re: Tuning Best Practices

2023-11-28 Thread Jack Goodson
Hi Bryant,

the below docs are a good start on performance tuning

https://spark.apache.org/docs/latest/sql-performance-tuning.html

Hope it helps!

On Wed, Nov 29, 2023 at 9:32 AM Bryant Wright 
wrote:

> Hi, I'm looking for a comprehensive list of Tuning Best Practices for
> spark.
>
> I did a search on the archives for "tuning" and the search returned no
> results.
>
> Thanks for your help.
>


Re: Classpath isolation per SparkSession without Spark Connect

2023-11-28 Thread Pasha Finkelshtein
I actually think it should be totally possible to use it on an executor
side. Maybe it will require a small extension/udf, but generally no issues
here. Pf4j is very lightweight, so you'll only have a small overhead for
classloaders.

There's still a small question of distribution of plugins/extensions, but
you probably already have a storage and can store them there.



[image: facebook] 
[image: twitter] 
[image: linkedin] 
[image: instagram] 

Pasha Finkelshteyn

Developer Advocate for Data Engineering

JetBrains



asm0...@jetbrains.com
https://linktr.ee/asm0dey

Find out more 



On Tue, 28 Nov 2023 at 17:04, Faiz Halde  wrote:

> Hey Pasha,
>
> Is your suggestion towards the spark team? I can make use of the plugin
> system on the driver side of spark but considering spark is distributed,
> the executor side of spark needs to adapt to the pf4j framework I believe
> too
>
> Thanks
> Faiz
>
> On Tue, Nov 28, 2023, 16:57 Pasha Finkelshtein <
> pavel.finkelsht...@gmail.com> wrote:
>
>> To me it seems like it's the best possible use case for PF4J.
>>
>>
>> [image: facebook] 
>> [image: twitter] 
>> [image: linkedin] 
>> [image: instagram] 
>>
>> Pasha Finkelshteyn
>>
>> Developer Advocate for Data Engineering
>>
>> JetBrains
>>
>>
>>
>> asm0...@jetbrains.com
>> https://linktr.ee/asm0dey
>>
>> Find out more 
>>
>>
>>
>> On Tue, 28 Nov 2023 at 12:47, Holden Karau 
>> wrote:
>>
>>> So I don’t think we make any particular guarantees around class path
>>> isolation there, so even if it does work it’s something you’d need to pay
>>> attention to on upgrades. Class path isolation is tricky to get right.
>>>
>>> On Mon, Nov 27, 2023 at 2:58 PM Faiz Halde  wrote:
>>>
 Hello,

 We are using spark 3.5.0 and were wondering if the following is
 achievable using spark-core

 Our use case involves spinning up a spark cluster where the driver
 application loads user jars containing spark transformations at runtime. A
 single spark application can load multiple user jars ( same cluster ) that
 can have class path conflicts if care is not taken

 AFAIK, to get this right requires the Executor to be designed in a way
 that allows for class path isolation ( UDF, lambda expressions ). Ideally
 per Spark Session is what we want

 I know Spark connect has been designed this way but Spark connect is
 not an option for us at the moment. I had some luck using a private method
 inside spark called JobArtifactSet.withActiveJobArtifactState

 Is it sufficient for me to run the user code enclosed
 within JobArtifactSet.withActiveJobArtifactState to achieve my requirement?

 Thank you


 Faiz

>>>


Re: Classpath isolation per SparkSession without Spark Connect

2023-11-28 Thread Faiz Halde
Hey Pasha,

Is your suggestion towards the spark team? I can make use of the plugin
system on the driver side of spark but considering spark is distributed,
the executor side of spark needs to adapt to the pf4j framework I believe
too

Thanks
Faiz

On Tue, Nov 28, 2023, 16:57 Pasha Finkelshtein 
wrote:

> To me it seems like it's the best possible use case for PF4J.
>
>
> [image: facebook] 
> [image: twitter] 
> [image: linkedin] 
> [image: instagram] 
>
> Pasha Finkelshteyn
>
> Developer Advocate for Data Engineering
>
> JetBrains
>
>
>
> asm0...@jetbrains.com
> https://linktr.ee/asm0dey
>
> Find out more 
>
>
>
> On Tue, 28 Nov 2023 at 12:47, Holden Karau  wrote:
>
>> So I don’t think we make any particular guarantees around class path
>> isolation there, so even if it does work it’s something you’d need to pay
>> attention to on upgrades. Class path isolation is tricky to get right.
>>
>> On Mon, Nov 27, 2023 at 2:58 PM Faiz Halde  wrote:
>>
>>> Hello,
>>>
>>> We are using spark 3.5.0 and were wondering if the following is
>>> achievable using spark-core
>>>
>>> Our use case involves spinning up a spark cluster where the driver
>>> application loads user jars containing spark transformations at runtime. A
>>> single spark application can load multiple user jars ( same cluster ) that
>>> can have class path conflicts if care is not taken
>>>
>>> AFAIK, to get this right requires the Executor to be designed in a way
>>> that allows for class path isolation ( UDF, lambda expressions ). Ideally
>>> per Spark Session is what we want
>>>
>>> I know Spark connect has been designed this way but Spark connect is not
>>> an option for us at the moment. I had some luck using a private method
>>> inside spark called JobArtifactSet.withActiveJobArtifactState
>>>
>>> Is it sufficient for me to run the user code enclosed
>>> within JobArtifactSet.withActiveJobArtifactState to achieve my requirement?
>>>
>>> Thank you
>>>
>>>
>>> Faiz
>>>
>>


Re: Classpath isolation per SparkSession without Spark Connect

2023-11-28 Thread Pasha Finkelshtein
To me it seems like it's the best possible use case for PF4J.


[image: facebook] 
[image: twitter] 
[image: linkedin] 
[image: instagram] 

Pasha Finkelshteyn

Developer Advocate for Data Engineering

JetBrains



asm0...@jetbrains.com
https://linktr.ee/asm0dey

Find out more 



On Tue, 28 Nov 2023 at 12:47, Holden Karau  wrote:

> So I don’t think we make any particular guarantees around class path
> isolation there, so even if it does work it’s something you’d need to pay
> attention to on upgrades. Class path isolation is tricky to get right.
>
> On Mon, Nov 27, 2023 at 2:58 PM Faiz Halde  wrote:
>
>> Hello,
>>
>> We are using spark 3.5.0 and were wondering if the following is
>> achievable using spark-core
>>
>> Our use case involves spinning up a spark cluster where the driver
>> application loads user jars containing spark transformations at runtime. A
>> single spark application can load multiple user jars ( same cluster ) that
>> can have class path conflicts if care is not taken
>>
>> AFAIK, to get this right requires the Executor to be designed in a way
>> that allows for class path isolation ( UDF, lambda expressions ). Ideally
>> per Spark Session is what we want
>>
>> I know Spark connect has been designed this way but Spark connect is not
>> an option for us at the moment. I had some luck using a private method
>> inside spark called JobArtifactSet.withActiveJobArtifactState
>>
>> Is it sufficient for me to run the user code enclosed
>> within JobArtifactSet.withActiveJobArtifactState to achieve my requirement?
>>
>> Thank you
>>
>>
>> Faiz
>>
>


Re: Classpath isolation per SparkSession without Spark Connect

2023-11-27 Thread Faiz Halde
Thanks Holden,

So you're saying even Spark connect is not going to provide that guarantee?
The code referred to above is taken up from Spark connect implementation

Could you explain which parts are tricky to get right? Just to be well
prepared of the consequences

On Tue, Nov 28, 2023, 01:30 Holden Karau  wrote:

> So I don’t think we make any particular guarantees around class path
> isolation there, so even if it does work it’s something you’d need to pay
> attention to on upgrades. Class path isolation is tricky to get right.
>
> On Mon, Nov 27, 2023 at 2:58 PM Faiz Halde  wrote:
>
>> Hello,
>>
>> We are using spark 3.5.0 and were wondering if the following is
>> achievable using spark-core
>>
>> Our use case involves spinning up a spark cluster where the driver
>> application loads user jars containing spark transformations at runtime. A
>> single spark application can load multiple user jars ( same cluster ) that
>> can have class path conflicts if care is not taken
>>
>> AFAIK, to get this right requires the Executor to be designed in a way
>> that allows for class path isolation ( UDF, lambda expressions ). Ideally
>> per Spark Session is what we want
>>
>> I know Spark connect has been designed this way but Spark connect is not
>> an option for us at the moment. I had some luck using a private method
>> inside spark called JobArtifactSet.withActiveJobArtifactState
>>
>> Is it sufficient for me to run the user code enclosed
>> within JobArtifactSet.withActiveJobArtifactState to achieve my requirement?
>>
>> Thank you
>>
>>
>> Faiz
>>
>


Re: Classpath isolation per SparkSession without Spark Connect

2023-11-27 Thread Holden Karau
So I don’t think we make any particular guarantees around class path
isolation there, so even if it does work it’s something you’d need to pay
attention to on upgrades. Class path isolation is tricky to get right.

On Mon, Nov 27, 2023 at 2:58 PM Faiz Halde  wrote:

> Hello,
>
> We are using spark 3.5.0 and were wondering if the following is achievable
> using spark-core
>
> Our use case involves spinning up a spark cluster where the driver
> application loads user jars containing spark transformations at runtime. A
> single spark application can load multiple user jars ( same cluster ) that
> can have class path conflicts if care is not taken
>
> AFAIK, to get this right requires the Executor to be designed in a way
> that allows for class path isolation ( UDF, lambda expressions ). Ideally
> per Spark Session is what we want
>
> I know Spark connect has been designed this way but Spark connect is not
> an option for us at the moment. I had some luck using a private method
> inside spark called JobArtifactSet.withActiveJobArtifactState
>
> Is it sufficient for me to run the user code enclosed
> within JobArtifactSet.withActiveJobArtifactState to achieve my requirement?
>
> Thank you
>
>
> Faiz
>


Re: Spark structured streaming tab is missing from spark web UI

2023-11-24 Thread Jungtaek Lim
The feature was added in Spark 3.0. Btw, you may want to check out the EOL
date for Apache Spark releases - https://endoflife.date/apache-spark 2.x is
already EOLed.


On Fri, Nov 24, 2023 at 11:13 PM mallesh j 
wrote:

> Hi Team,
>
> I am trying to test the performance of a spark streaming application that
> I wrote which reads/writes data to Kafka. Code is working fine but I cannot
> see the Streaming tab in the UI. I tried enabling it by adding below config
> to spark conf but still no luck. I have even checked in Google/Stack
> overflow on this but did not get it. So can you please check and let me
> know on the same ? If it is present or not , how can I enable it?
>
> Attached is the screenshot for the same
>
> Spark version 2.4
> Scala version 2.11
>
>
> Thanks & Regards
>  Mallesh Jogu,
> + 919493390341.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org


Re: How exactly does dropDuplicatesWithinWatermark work?

2023-11-21 Thread Jungtaek Lim
I'll probably reply the same to SO but posting here first.

This is mentioned in JIRA ticket, design doc, and also API doc, but to
reiterate, the contract/guarantee of the new API is that the API will
deduplicate events properly when the max distance of all your duplicate
events are less than watermark delay. The internal implementation is
slightly complicated and depends on the first arrived event per
duplication, and the API does not promise any behavior beyond
the contract/guarantee. You cannot expect any strict behavior beyond the
contract/guarantee.

The main use case of this new API is to cover with writers which guarantees
"at-least-once", which has a risk of duplication. E.g. Writing data to a
Kafka topic without a transaction could end up with duplication. In most
cases, duplicated writes for the same data would happen within a
predictable time frame, and this new API will ensure that these duplicated
writes are deduplicated once users provide the max distance of time (max -
min) among duplicated events as delay threshold of watermark.

Hope this helps.

Thanks,
Jungtaek Lim (HeartSaVioR)

On Mon, Nov 20, 2023 at 10:18 AM Perfect Stranger 
wrote:

> Hello, I have trouble understanding how dropDuplicatesWithinWatermark
> works. And I posted this stackoverflow question:
>
> https://stackoverflow.com/questions/77512507/how-exactly-does-dropduplicateswithinwatermark-work
>
> Could somebody answer it please?
>
> Best Regards,
> Pavel.
>


Re: Spark-submit without access to HDFS

2023-11-17 Thread Mich Talebzadeh
Hi,

How are you submitting your spark job from your client?

Your files can either be on HDFS or HCFS such as gs, s3 etc.

With reference to --py-files hdfs://yarn-master-url hdfs://foo.py', I
assume you want your

spark-submit --verbose \
   --deploy-mode cluster \
   --conf "spark.yarn.appMasterEnv.SPARK_HOME=$SPARK_HOME" \
   --conf "spark.yarn.appMasterEnv.PYTHONPATH=${PYTHONPATH}" \
   --conf "spark.executorEnv.PYTHONPATH=${PYTHONPATH}" \
   --py-files $CODE_DIRECTORY_CLOUD/dataproc_on_gke.zip \
   --conf "spark.driver.memory"=4G \
   --conf "spark.executor.memory"=4G \
   --conf "spark.num.executors"=4 \
   --conf "spark.executor.cores"=2 \
   $CODE_DIRECTORY_CLOUD/${APPLICATION}

in my case I define $CODE_DIRECTORY_CLOUD as below on google cloud storage

CODE_DIRECTORY="/home/hduser/dba/bin/python/"
CODE_DIRECTORY_CLOUD="gs://,${PROJECT}-spark-on-k8s/codes"
cd $CODE_DIRECTORY
[ -f ${source_code}.zip ] && rm -r -f ${source_code}.zip
echo `date` ", ===> creating source zip directory from  ${source_code}"
# zip needs to be done at root directory of code
zip -rq ${source_code}.zip ${source_code}
gsutil cp ${source_code}.zip $CODE_DIRECTORY_CLOUD
gsutil cp /home/hduser/dba/bin/python/${source_code}/src/${APPLICATION}
$CODE_DIRECTORY_CLOUD

So in summary I create a zip  file of my project and copy it across to the
cloud storage and then put the application (py file) there as well and use
them in spark-submit

I trust this answers your question.

HTH



Mich Talebzadeh,
Technologist, Solutions Architect & Engineer
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Wed, 15 Nov 2023 at 21:33, Eugene Miretsky 
wrote:

> Hey All,
>
> We are running Pyspark spark-submit from a client outside the cluster. The
> client has network connectivity only to the Yarn Master, not the HDFS
> Datanodes. How can we submit the jobs? The idea would be to preload all the
> dependencies (job code, libraries, etc) to HDFS, and just submit the job
> from the client.
>
> We tried something like this
> 'PYSPARK_ARCHIVES_PATH=hdfs://some-path/pyspark.zip spark-submit --master
> yarn --deploy-mode cluster --py-files hdfs://yarn-master-url hdfs://foo.py'
>
> The error we are getting is
> "
>
> org.apache.hadoop.net.ConnectTimeoutException: 6 millis timeout while
> waiting for channel to be ready for connect. ch :
> java.nio.channels.SocketChannel[connection-pending remote=/
> 10.117.110.19:9866]
>
> org.apache.hadoop.ipc.RemoteException(java.io.IOException): File
> /user/users/.sparkStaging/application_1698216436656_0104/*spark_conf.zip*
> could only be written to 0 of the 1 minReplication nodes. There are 2
> datanode(s) running and 2 node(s) are excluded in this operation.
> "
>
> A few question
> 1) What are the spark_conf.zip files. Is it the hive-site/yarn-site conf
> files? Why would the client send them to the cluster? (the cluster already
> has all that info - this would make sense in client mode, but not cluster
> mode )
> 2) Is it possible to use spark-submit without HDFS access?
> 3) How would we fix this?
>
> Cheers,
> Eugene
>
> --
>
> *Eugene Miretsky*
> Managing Partner |  Badal.io | Book a meeting /w me!
> 
> mobile:  416-568-9245
> email: eug...@badal.io 
>


RE: The job failed when we upgraded from spark 3.3.1 to spark3.4.1

2023-11-16 Thread Stevens, Clay
Perhaps you also need to upgrade Scala?

Clay Stevens

From: Hanyu Huang 
Sent: Wednesday, 15 November, 2023 1:15 AM
To: user@spark.apache.org
Subject: The job failed when we upgraded from spark 3.3.1 to spark3.4.1

Caution, this email may be from a sender outside Wolters Kluwer. Verify the 
sender and know the content is safe.
The version our job originally ran was spark 3.3.1 and Apache Iceberg to 1.2.0, 
But since we upgraded to  spark3.4.1 and Apache Iceberg to 1.3.1, jobs started 
to fail frequently, We tried to upgrade only iceberg without upgrading spark, 
and the job did not report an error.


Detailed description:

When we execute this function writing data to the iceberg table:

def appendToIcebergTable(targetTable: String, df: DataFrame): Unit = {
  _logger.warn(s"Append data to $targetTable")

  val (targetCols, sourceCols) = matchDFSchemaWithTargetTable(targetTable, df)
  df.createOrReplaceTempView("_temp")
  spark.sql(s"""
  INSERT INTO $targetTable ($targetCols) SELECT $sourceCols FROM _temp
  """)
  _logger.warn(s"Done append data to $targetTable")
  getIcebergLastAppendCountVerbose(targetTable)
}


The error is reported as follows:
Caused by: java.lang.AssertionError: assertion failed at 
scala.Predef$.assert(Predef.scala:208) at 
org.apache.spark.sql.execution.ColumnarToRowExec.(Columnar.scala:72) ... 
191 more


Read the source code and find that the error is reported here:


case class ColumnarToRowExec(child: SparkPlan) extends ColumnarToRowTransition 
with CodegenSupport {
  // supportsColumnar requires to be only called on driver side, see also 
SPARK-37779.
  assert(Utils.isInRunningSparkTask || child.supportsColumnar)

  override def output: Seq[Attribute] = child.output

  override def outputPartitioning: Partitioning = child.outputPartitioning

  override def outputOrdering: Seq[SortOrder] = child.outputOrdering

But we can't find the root cause,So seek help from the community ,If more log 
information is required, please let me know.

thanks


Re: Spark-submit without access to HDFS

2023-11-16 Thread Jörn Franke
I am not 100% sure but I do not think this works - the driver would need access to HDFS.What you could try (have not tested it though in your scenario):- use SparkConnect: https://spark.apache.org/docs/latest/spark-connect-overview.html- host the zip file on a https server and use that url (I would recommend against it though for various reasons, such as reliability)Am 15.11.2023 um 22:33 schrieb Eugene Miretsky :Hey All, We are running Pyspark spark-submit from a client outside the cluster. The client has network connectivity only to the Yarn Master, not the HDFS Datanodes. How can we submit the jobs? The idea would be to preload all the dependencies (job code, libraries, etc) to HDFS, and just submit the job from the client. We tried something like this'PYSPARK_ARCHIVES_PATH=hdfs://some-path/pyspark.zip spark-submit --master yarn --deploy-mode cluster --py-files hdfs://yarn-master-url hdfs://foo.py'The error we are getting is "org.apache.hadoop.net.ConnectTimeoutException: 6 millis timeout while waiting for channel to be ready for connect. ch : java.nio.channels.SocketChannel[connection-pending remote=/10.117.110.19:9866]org.apache.hadoop.ipc.RemoteException(java.io.IOException): File /user/users/.sparkStaging/application_1698216436656_0104/spark_conf.zip could only be written to 0 of the 1 minReplication nodes. There are 2 datanode(s) running and 2 node(s) are excluded in this operation." A few question 1) What are the spark_conf.zip files. Is it the hive-site/yarn-site conf files? Why would the client send them to the cluster? (the cluster already has all that info - this would make sense in client mode, but not cluster mode )2) Is it possible to use spark-submit without HDFS access? 3) How would we fix this?  Cheers,Eugene-- Eugene MiretskyManaging Partner |  Badal.io | Book a meeting /w me! mobile:  416-568-9245email:     eug...@badal.io


Re: Re: [EXTERNAL] Re: Spark-submit without access to HDFS

2023-11-15 Thread eab...@163.com
Hi Eugene,
  As the logs indicate, when executing spark-submit, Spark will package and 
upload spark/conf to HDFS, along with uploading spark/jars. These files are 
uploaded to HDFS unless you specify uploading them to another OSS. To do so, 
you'll need to modify the configuration in hdfs-site.xml, for instance, 
fs.oss.impl, etc.



eabour
 
From: Eugene Miretsky
Date: 2023-11-16 09:58
To: eab...@163.com
CC: Eugene Miretsky; user @spark
Subject: Re: [EXTERNAL] Re: Spark-submit without access to HDFS
Hey! 

Thanks for the response. 

We are getting the error because there is no network connectivity to the data 
nodes - that's expected. 

What I am trying to find out is WHY we need access to the data nodes, and if 
there is a way to submit a job without it. 

Cheers,
Eugene

On Wed, Nov 15, 2023 at 7:32 PM eab...@163.com  wrote:
Hi Eugene,
I think you should Check if the HDFS service is running properly.  From the 
logs, it appears that there are two datanodes in HDFS,  but none of them are 
healthy.  Please investigate the reasons why the datanodes are not functioning 
properly.  It seems that the issue might be due to insufficient disk space.



eabour
 
From: Eugene Miretsky
Date: 2023-11-16 05:31
To: user
Subject: Spark-submit without access to HDFS
Hey All, 

We are running Pyspark spark-submit from a client outside the cluster. The 
client has network connectivity only to the Yarn Master, not the HDFS 
Datanodes. How can we submit the jobs? The idea would be to preload all the 
dependencies (job code, libraries, etc) to HDFS, and just submit the job from 
the client. 

We tried something like this
'PYSPARK_ARCHIVES_PATH=hdfs://some-path/pyspark.zip spark-submit --master yarn 
--deploy-mode cluster --py-files hdfs://yarn-master-url hdfs://foo.py'

The error we are getting is 
"
org.apache.hadoop.net.ConnectTimeoutException: 6 millis timeout while 
waiting for channel to be ready for connect. ch : 
java.nio.channels.SocketChannel[connection-pending remote=/10.117.110.19:9866]
org.apache.hadoop.ipc.RemoteException(java.io.IOException): File 
/user/users/.sparkStaging/application_1698216436656_0104/spark_conf.zip could 
only be written to 0 of the 1 minReplication nodes. There are 2 datanode(s) 
running and 2 node(s) are excluded in this operation.
" 

A few question 
1) What are the spark_conf.zip files. Is it the hive-site/yarn-site conf files? 
Why would the client send them to the cluster? (the cluster already has all 
that info - this would make sense in client mode, but not cluster mode )
2) Is it possible to use spark-submit without HDFS access? 
3) How would we fix this?  

Cheers,
Eugene

-- 

Eugene Miretsky
Managing Partner |  Badal.io | Book a meeting /w me! 
mobile:  416-568-9245
email: eug...@badal.io


-- 

Eugene Miretsky
Managing Partner |  Badal.io | Book a meeting /w me! 
mobile:  416-568-9245
email: eug...@badal.io


Re: [EXTERNAL] Re: Spark-submit without access to HDFS

2023-11-15 Thread Eugene Miretsky
Hey!

Thanks for the response.

We are getting the error because there is no network connectivity to the
data nodes - that's expected.

What I am trying to find out is WHY we need access to the data nodes, and
if there is a way to submit a job without it.

Cheers,
Eugene

On Wed, Nov 15, 2023 at 7:32 PM eab...@163.com  wrote:

> Hi Eugene,
> I think you should Check if the HDFS service is running properly.  From 
> the logs, it appears that there are two datanodes in HDFS,
>  but none of them are healthy.
> Please investigate the reasons why the datanodes are not functioning properly.
> It seems that the issue might be due to insufficient disk space.
>
> --
> eabour
>
>
> *From:* Eugene Miretsky 
> *Date:* 2023-11-16 05:31
> *To:* user 
> *Subject:* Spark-submit without access to HDFS
> Hey All,
>
> We are running Pyspark spark-submit from a client outside the cluster. The
> client has network connectivity only to the Yarn Master, not the HDFS
> Datanodes. How can we submit the jobs? The idea would be to preload all the
> dependencies (job code, libraries, etc) to HDFS, and just submit the job
> from the client.
>
> We tried something like this
> 'PYSPARK_ARCHIVES_PATH=hdfs://some-path/pyspark.zip spark-submit --master
> yarn --deploy-mode cluster --py-files hdfs://yarn-master-url hdfs://foo.py'
>
> The error we are getting is
> "
>
> org.apache.hadoop.net.ConnectTimeoutException: 6 millis timeout while
> waiting for channel to be ready for connect. ch :
> java.nio.channels.SocketChannel[connection-pending remote=/
> 10.117.110.19:9866]
>
> org.apache.hadoop.ipc.RemoteException(java.io.IOException): File
> /user/users/.sparkStaging/application_1698216436656_0104/*spark_conf.zip*
> could only be written to 0 of the 1 minReplication nodes. There are 2
> datanode(s) running and 2 node(s) are excluded in this operation.
> "
>
> A few question
> 1) What are the spark_conf.zip files. Is it the hive-site/yarn-site conf
> files? Why would the client send them to the cluster? (the cluster already
> has all that info - this would make sense in client mode, but not cluster
> mode )
> 2) Is it possible to use spark-submit without HDFS access?
> 3) How would we fix this?
>
> Cheers,
> Eugene
>
> --
>
> *Eugene Miretsky*
> Managing Partner |  Badal.io | Book a meeting /w me!
> 
> mobile:  416-568-9245
> email: eug...@badal.io 
>
>

-- 

*Eugene Miretsky*
Managing Partner |  Badal.io | Book a meeting /w me!

mobile:  416-568-9245
email: eug...@badal.io 


Re: Spark-submit without access to HDFS

2023-11-15 Thread eab...@163.com
Hi Eugene,
I think you should Check if the HDFS service is running properly.  From the 
logs, it appears that there are two datanodes in HDFS,  but none of them are 
healthy.  Please investigate the reasons why the datanodes are not functioning 
properly.  It seems that the issue might be due to insufficient disk space.



eabour
 
From: Eugene Miretsky
Date: 2023-11-16 05:31
To: user
Subject: Spark-submit without access to HDFS
Hey All, 

We are running Pyspark spark-submit from a client outside the cluster. The 
client has network connectivity only to the Yarn Master, not the HDFS 
Datanodes. How can we submit the jobs? The idea would be to preload all the 
dependencies (job code, libraries, etc) to HDFS, and just submit the job from 
the client. 

We tried something like this
'PYSPARK_ARCHIVES_PATH=hdfs://some-path/pyspark.zip spark-submit --master yarn 
--deploy-mode cluster --py-files hdfs://yarn-master-url hdfs://foo.py'

The error we are getting is 
"
org.apache.hadoop.net.ConnectTimeoutException: 6 millis timeout while 
waiting for channel to be ready for connect. ch : 
java.nio.channels.SocketChannel[connection-pending remote=/10.117.110.19:9866]
org.apache.hadoop.ipc.RemoteException(java.io.IOException): File 
/user/users/.sparkStaging/application_1698216436656_0104/spark_conf.zip could 
only be written to 0 of the 1 minReplication nodes. There are 2 datanode(s) 
running and 2 node(s) are excluded in this operation.
" 

A few question 
1) What are the spark_conf.zip files. Is it the hive-site/yarn-site conf files? 
Why would the client send them to the cluster? (the cluster already has all 
that info - this would make sense in client mode, but not cluster mode )
2) Is it possible to use spark-submit without HDFS access? 
3) How would we fix this?  

Cheers,
Eugene

-- 

Eugene Miretsky
Managing Partner |  Badal.io | Book a meeting /w me! 
mobile:  416-568-9245
email: eug...@badal.io


Re: Okio Vulnerability in Spark 3.4.1

2023-11-14 Thread Bjørn Jørgensen
FYI
I have opened Update okio to version 1.17.6
<https://github.com/fabric8io/kubernetes-client/pull/5587> for this now.

tor. 31. aug. 2023 kl. 21:18 skrev Sean Owen :

> It's a dependency of some other HTTP library. Use mvn dependency:tree to
> see where it comes from. It may be more straightforward to upgrade the
> library that brings it in, assuming a later version brings in a later okio.
> You can also manage up the version directly with a new entry in
> 
>
> However, does this affect Spark? all else equal it doesn't hurt to
> upgrade, but wondering if there is even a theory that it needs to be
> updated.
>
>
> On Thu, Aug 31, 2023 at 7:42 AM Agrawal, Sanket 
> wrote:
>
>> I don’t see an entry in pom.xml while building spark. I think it is being
>> downloaded as part of some other dependency.
>>
>>
>>
>> *From:* Sean Owen 
>> *Sent:* Thursday, August 31, 2023 5:10 PM
>> *To:* Agrawal, Sanket 
>> *Cc:* user@spark.apache.org
>> *Subject:* [EXT] Re: Okio Vulnerability in Spark 3.4.1
>>
>>
>>
>> Does the vulnerability affect Spark?
>>
>> In any event, have you tried updating Okio in the Spark build? I don't
>> believe you could just replace the JAR, as other libraries probably rely on
>> it and compiled against the current version.
>>
>>
>>
>> On Thu, Aug 31, 2023 at 6:02 AM Agrawal, Sanket <
>> sankeagra...@deloitte.com.invalid> wrote:
>>
>> Hi All,
>>
>>
>>
>> Amazon inspector has detected a vulnerability in okio-1.15.0.jar JAR in
>> Spark 3.4.1. It suggests to upgrade the jar version to 3.4.0. But when we
>> try this version of jar then the spark application is failing with below
>> error:
>>
>>
>>
>> py4j.protocol.Py4JJavaError: An error occurred while calling
>> None.org.apache.spark.api.java.JavaSparkContext.
>>
>> : java.lang.NoClassDefFoundError: okio/BufferedSource
>>
>> at okhttp3.internal.Util.(Util.java:62)
>>
>> at okhttp3.OkHttpClient.(OkHttpClient.java:127)
>>
>> at okhttp3.OkHttpClient$Builder.(OkHttpClient.java:475)
>>
>> at
>> io.fabric8.kubernetes.client.okhttp.OkHttpClientFactory.newOkHttpClientBuilder(OkHttpClientFactory.java:41)
>>
>> at
>> io.fabric8.kubernetes.client.okhttp.OkHttpClientFactory.newBuilder(OkHttpClientFactory.java:56)
>>
>> at
>> io.fabric8.kubernetes.client.okhttp.OkHttpClientFactory.newBuilder(OkHttpClientFactory.java:68)
>>
>> at
>> io.fabric8.kubernetes.client.okhttp.OkHttpClientFactory.newBuilder(OkHttpClientFactory.java:30)
>>
>> at
>> io.fabric8.kubernetes.client.KubernetesClientBuilder.getHttpClient(KubernetesClientBuilder.java:88)
>>
>> at
>> io.fabric8.kubernetes.client.KubernetesClientBuilder.build(KubernetesClientBuilder.java:78)
>>
>> at
>> org.apache.spark.deploy.k8s.SparkKubernetesClientFactory$.createKubernetesClient(SparkKubernetesClientFactory.scala:120)
>>
>> at
>> org.apache.spark.scheduler.cluster.k8s.KubernetesClusterManager.createSchedulerBackend(KubernetesClusterManager.scala:111)
>>
>> at
>> org.apache.spark.SparkContext$.org$apache$spark$SparkContext$$createTaskScheduler(SparkContext.scala:3037)
>>
>> at org.apache.spark.SparkContext.(SparkContext.scala:568)
>>
>> at
>> org.apache.spark.api.java.JavaSparkContext.(JavaSparkContext.scala:58)
>>
>> at
>> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native
>> Method)
>>
>> at
>> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(Unknown
>> Source)
>>
>> at
>> java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown
>> Source)
>>
>> at java.base/java.lang.reflect.Constructor.newInstance(Unknown
>> Source)
>>
>> at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
>>
>> at
>> py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
>>
>> at py4j.Gateway.invoke(Gateway.java:238)
>>
>> at
>> py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
>>
>> at
>> py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
>>
>> at
>> py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
>>
>> at
>> py4j.ClientServerConnection.run(

Re: Unsubscribe

2023-11-08 Thread Xin Zhang
Unsubscribe


--
Email:josseph.zh...@gmail.com


Re: [ SPARK SQL ]: UPPER in WHERE condition is not working in Apache Spark 3.5.0 for Mysql ENUM Column

2023-11-07 Thread Suyash Ajmera
Any update on this?


On Fri, 13 Oct, 2023, 12:56 pm Suyash Ajmera, 
wrote:

> This issue is related to CharVarcharCodegenUtils readSidePadding method .
>
> Appending white spaces while reading ENUM data from mysql
>
> Causing issue in querying , writing the same data to Cassandra.
>
> On Thu, 12 Oct, 2023, 7:46 pm Suyash Ajmera, 
> wrote:
>
>> I have upgraded my spark job from spark 3.3.1 to spark 3.5.0, I am
>> querying to Mysql Database and applying
>>
>> `*UPPER(col) = UPPER(value)*` in the subsequent sql query. It is working
>> as expected in spark 3.3.1 , but not working with 3.5.0.
>>
>> Where Condition ::  `*UPPER(vn) = 'ERICSSON' AND (upper(st) = 'OPEN' OR
>> upper(st) = 'REOPEN' OR upper(st) = 'CLOSED')*`
>>
>> The *st *column is ENUM in the database and it is causing the issue.
>>
>> Below is the Physical Plan of *FILTER* phase :
>>
>> For 3.3.1 :
>>
>> +- Filter ((upper(vn#11) = ERICSSON) AND (((upper(st#42) = OPEN) OR
>> (upper(st#42) = REOPEN)) OR (upper(st#42) = CLOSED)))
>>
>> For 3.5.0 :
>>
>> +- Filter ((upper(vn#11) = ERICSSON) AND (((upper(staticinvoke(class
>> org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils, StringType,
>> readSidePadding, st#42, 13, true, false, true)) = OPEN) OR
>> (upper(staticinvoke(class
>> org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils, StringType,
>> readSidePadding, st#42, 13, true, false, true)) = REOPEN)) OR
>> (upper(staticinvoke(class
>> org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils, StringType,
>> readSidePadding, st#42, 13, true, false, true)) = CLOSED)))
>>
>> -
>>
>> I have debug it and found that Spark added a property in version 3.4.0 ,
>> i.e. **spark.sql.readSideCharPadding** which has default value **true**.
>>
>> Link to the JIRA : https://issues.apache.org/jira/browse/SPARK-40697
>>
>> Added a new method in Class **CharVarcharCodegenUtils**
>>
>> public static UTF8String readSidePadding(UTF8String inputStr, int limit) {
>> int numChars = inputStr.numChars();
>> if (numChars == limit) {
>>   return inputStr;
>> } else if (numChars < limit) {
>>   return inputStr.rpad(limit, SPACE);
>> } else {
>>   return inputStr;
>> }
>>   }
>>
>>
>> **This method is appending some whitespace padding to the ENUM values
>> while reading and causing the Issue.**
>>
>> ---
>>
>> When I am removing the UPPER function from the where condition the
>> **FILTER** Phase looks like this :
>>
>>  +- Filter (((staticinvoke(class
>> org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils,
>>  StringType, readSidePadding, st#42, 13, true, false, true) = OPEN
>> ) OR (staticinvoke(class
>> org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils, StringType,
>> readSidePadding, st#42, 13, true, false, true) = REOPEN   )) OR
>> (staticinvoke(class
>> org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils, StringType,
>> readSidePadding, st#42, 13, true, false, true) = CLOSED   ))
>>
>>
>> **You can see it has added some white space after the value and the query
>> runs fine giving the correct result.**
>>
>> But with the UPPER function I am not getting the data.
>>
>> --
>>
>> I have also tried to disable this Property *spark.sql.readSideCharPadding
>> = false* with following cases :
>>
>> 1. With Upper function in where clause :
>>It is not pushing the filters to Database and the *query works fine*.
>>
>>
>>   +- Filter (((upper(st#42) = OPEN) OR (upper(st#42) = REOPEN)) OR
>> (upper(st#42) = CLOSED))
>>
>> 2. But when I am removing the upper function
>>
>>  *It is pushing the filter to Mysql with the white spaces and I am not
>> getting the data. (THIS IS A CAUSING VERY BIG ISSUE)*
>>
>>   PushedFilters: [*IsNotNull(vn), *EqualTo(vn,ERICSSON),
>> *Or(Or(EqualTo(st,OPEN ),EqualTo(st,REOPEN
>> )),EqualTo(st,CLOSED   ))]
>>
>> I cannot move this filter to JDBC read query , also I can't remove this
>> UPPER function in the where clause.
>>
>>
>> 
>>
>> Also I found same data getting written to CASSANDRA with *PADDING .*
>>
>


Re: Spark master shuts down when one of zookeeper dies

2023-11-07 Thread Mich Talebzadeh
Hi,

Spark standalone mode does not use or rely on ZooKeeper by default. The
Spark master and workers communicate directly with each other without using
ZooKeeper. However, it appears that in your case you are relying on
ZooKeeper to provide high availability for your standalone cluster. By
configuring Spark to use ZooKeeper for leader election, you can ensure that
there is always a Spark master running, even if one of the ZooKeeper
servers goes down.

To use ZooKeeper for high availability in Spark standalone mode, you need
to configure the following properties:

spark.deploy.recoveryMode: Set to ZOOKEEPER to enable high availability
spark.deploy.zookeeper.url: The ZooKeeper cluster URL

Now the Spark master shuts down when a Zookeeper instance is down because
it loses its leadership. Zookeeper uses a leader election algorithm to
ensure that there is always a single leader in the cluster. When a
Zookeeper instance goes down, the remaining Zookeeper instances will elect
a new leader.

The original master that was down never comes up because it has lost its
state. The Spark master stores its state in Zookeeper. When the Zookeeper
instance that the master was connected to goes down, the master loses its
state. This means that the master cannot restart without losing data.

To avoid this problem, you can run multiple Spark masters in high
availability mode. This means that you will have at least two Spark masters
running at all times. When a Zookeeper instance goes down, the remaining
Spark masters will continue to run and serve applications. As stated, to
run Spark masters in high availability mode, you will need to configure the
spark.deploy.recoveryMode property to ZOOKEEPER. You will also need to
configure the spark.deploy.zookeeper.url property to point to your
Zookeeper cluster.

HTH,

Mich Talebzadeh,
Distinguished Technologist, Solutions Architect & Engineer
London
United Kingdom

Mich Talebzadeh (Ph.D.) | LinkedIn


https://en.everybodywiki.com/Mich_Talebzadeh



Disclaimer: Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




Mich Talebzadeh,
Distinguished Technologist, Solutions Architect & Engineer
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Mon, 6 Nov 2023 at 15:19, Kaustubh Ghode  wrote:

> I am using spark-3.4.1 I have a setup with three ZooKeeper servers, Spark
> master shuts down when a Zookeeper instance is down a new master is elected
> as leader and the cluster is up. But the original master that was down
> never comes up. can you please help me with this issue?
>
> Stackoverflow link:- https://stackoverflow.com/questions/77431515
>
> Thanks,
> Kaustubh
>


Re: Parser error when running PySpark on Windows connecting to GCS

2023-11-04 Thread Mich Talebzadeh
General

The reason why os.path.join is appending double backslash on Windows is
because that is how Windows paths are represented. However, GCS paths (a
Hadoop Compatible File System  (HCFS) use forward slashes like in Linux.
This can cause problems if you are trying to use a Windows path in a Spark
job, *because Spark assumes that all paths are Linux paths*.

A way to avoid this problem is to use the os.path.normpath function to
normalize the path before passing it to Spark. This will ensure that the
path is in a format that is compatible with Spark.

*In Python*

import os
# example
path = "gs://etcbucket/data-file"
normalized_path = os.path.normpath(path)
# Pass the normalized path to Spark

*In Scala*


import java.io.File val path = "gs://etcbucket/data-file" val
normalizedPath = new File(path).getCanonicalPath() // Pass the normalized
path to Spark

HTH

Mich Talebzadeh,
Distinguished Technologist, Solutions Architect & Engineer
London
United Kingdom



 view my Linkedin profile


 https://en.everybodywiki.com/Mich_Talebzadeh


Disclaimer: Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



Mich Talebzadeh,
Distinguished Technologist, Solutions Architect & Engineer
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Sat, 4 Nov 2023 at 12:28, Richard Smith
 wrote:

> Hi All,
>
> I've just encountered and worked around a problem that is pretty obscure
> and unlikely to affect many people, but I thought I'd better report it
> anyway
>
> All the data I'm using is inside Google Cloud Storage buckets (path starts
> with gs://) and I'm running Spark 3.5.0 locally (for testing, real thing is
> on serverless Dataproc) on a Windows 10 laptop. The job fails when reading
> metadata via the machine learning scripts.
>
> The error is *org.apache.hadoop.shaded.com.google.rej2.PatternSyntaxException:
> error parsing regexp: invalid escape sequence: '\m'*
>
> I tracked it down to *site-packages/pyspark/ml/util.py* line 578
>
> metadataPath = os.path.join(path,"metadata")
>
> which seems innocuous but what's happening is because I'm on Windows,
> os.path.join is appending double backslash, whilst the gcs path uses
> forward slashes like Linux.
>
> I hacked the code to explicitly use forward slash if path contains gs: and
> the job now runs successfully.
>
> Richard
>


Re: Data analysis issues

2023-11-02 Thread Mich Talebzadeh
Hi,

Your mileage varies so to speak.Whether or not the data you use to analyze
in Spark through RStudio will be seen by Spark's back-end depends on how
you deploy Spark and RStudio. If you are deploying Spark and RStudio on
your own premises or in a private cloud environment, then the data you use
will only be accessible to the roles that have access to your environment.
However, if you are using a managed Spark service such as Google Dataproc
or Amazon EMR etc, then the data you use may be accessible to Spark's
back-end. This is because managed Spark services typically store your data
on their own servers. Try using encryption combined with RBAC (who can
access what), to protect your data privacy. Also beware of security risks
associated with third-party libraries if you are deploying them.

HTH

Mich Talebzadeh,
Distinguished Technologist, Solutions Architect & Engineer
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Thu, 2 Nov 2023 at 22:46, Jauru Lin  wrote:

> Hello all,
>
> I have a question about Apache Spark,
> I would like to ask if I use Rstudio to connect to Spark to analyze data,
> will the data I use be seen by Spark's back-end personnel?
>
> Hope someone can solve my problem.
> Thanks!
>


Re: Spark / Scala conflict

2023-11-02 Thread Harry Jamison
Thanks Alonso,
I think this gives me some ideas.

My code is written in Python, and I use spark-submit to submit it.
I am not sure what code is written in scala.  Maybe the Phoenix driver based on 
the stack trace?
How do I tell which version of scala that was compiled against?

Is there a jar that I need to add to the spark or hbase classpath?




On Thursday, November 2, 2023 at 01:38:21 AM PDT, Aironman DirtDiver 
 wrote: 





The error message Caused by: java.lang.ClassNotFoundException: 
scala.Product$class indicates that the Spark job is trying to load a class that 
is not available in the classpath. This can happen if the Spark job is compiled 
with a different version of Scala than the version of Scala that is used to run 
the job.
You have mentioned that you are using Spark 3.5.0, which is compatible with 
Scala 2.12. However, you have also mentioned that you have tried Scala versions 
2.10, 2.11, 2.12, and 2.13. This suggests that you may have multiple versions 
of Scala installed on your system.
To resolve the issue, you need to make sure that the Spark job is compiled and 
run with the same version of Scala. You can do this by setting the 
SPARK_SCALA_VERSION environment variable to the desired Scala version before 
starting the Spark job.
For example, to compile the Spark job with Scala 2.12, you would run the 
following command:
SPARK_SCALA_VERSION=2.12 sbt compile

To run the Spark job with Scala 2.12, you would run the following command:
SPARK_SCALA_VERSION=2.12 spark-submit spark-job.jar

If you are using Databricks, you can set the Scala version for the Spark 
cluster in the cluster creation settings.
Once you have ensured that the Spark job is compiled and run with the same 
version of Scala, the error should be resolved.
Here are some additional tips for troubleshooting Scala version conflicts:
* Make sure that you are using the correct version of the Spark libraries. 
The Spark libraries must be compiled with the same version of Scala as the 
Spark job.
* If you are using a third-party library, make sure that it is compatible 
with the version of Scala that you are using.
* Check the Spark logs for any ClassNotFoundExceptions. The logs may 
indicate the specific class that is missing from the classpath.
* Use a tool like sbt dependency:tree to view the dependencies of your 
Spark job. This can help you to identify any conflicting dependencies.

El jue, 2 nov 2023 a las 5:39, Harry Jamison 
() escribió:
> I am getting the error below when I try to run a spark job connecting to 
> phoneix.  It seems like I have the incorrect scala version that some part of 
> the code is expecting.
> 
> I am using spark 3.5.0, and I have copied these phoenix jars into the spark 
> lib
> phoenix-server-hbase-2.5-5.1.3.jar  
> phoenix-spark-5.0.0-HBase-2.0.jar
> 
> I have tried scala 2.10, 2.11, 2.12, and 2.13
> I do not see the scala version used in the logs so I am not 100% sure that it 
> is using the version I expect that it should be.
> 
> 
> Here is the exception that I am getting
> 
> 2023-11-01T16:13:00,391 INFO  [Thread-4] handler.ContextHandler: Started 
> o.s.j.s.ServletContextHandler@15cd3b2a{/static/sql,null,AVAILABLE,@Spark}
> Traceback (most recent call last):
>   File "/hadoop/spark/spark-3.5.0-bin-hadoop3/copy_tables.py", line 10, in 
> 
> .option("zkUrl", "namenode:2181").load()
>   File 
> "/hadoop/spark/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 
> 314, in load
>   File 
> "/hadoop/spark/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", 
> line 1322, in __call__
>   File 
> "/hadoop/spark/spark/python/lib/pyspark.zip/pyspark/errors/exceptions/captured.py",
>  line 179, in deco
>   File 
> "/hadoop/spark/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/protocol.py", line 
> 326, in get_return_value
> py4j.protocol.Py4JJavaError: An error occurred while calling o28.load.
> : java.lang.NoClassDefFoundError: scala/Product$class
> at 
> org.apache.phoenix.spark.PhoenixRelation.(PhoenixRelation.scala:29)
> at 
> org.apache.phoenix.spark.DefaultSource.createRelation(DefaultSource.scala:29)
> at 
> org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:346)
> at 
> org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:229)
> at 
> org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:211)
> at scala.Option.getOrElse(Option.scala:189)
> at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
> at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:172)
> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
> at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at 

RE: jackson-databind version mismatch

2023-11-02 Thread moshik.vitas
Thanks for replying,

 

The issue was import of spring-boot-dependencies on my dependencyManagement pom 
that forced invalid jar version.

Removed this section and got valid spark dependencies.

 

Regards,
Moshik Vitas

 

From: Bjørn Jørgensen  
Sent: Thursday, 2 November 2023 10:40
To: eab...@163.com
Cc: user @spark ; Saar Barhoom ; 
moshik.vi...@veeva.com
Subject: Re: jackson-databind version mismatch

 

[SPARK-43225][BUILD][SQL] Remove jackson-core-asl and jackson-mapper-asl from 
pre-built distribution <https://github.com/apache/spark/pull/40893> 

 

tor. 2. nov. 2023 kl. 09:15 skrev Bjørn Jørgensen mailto:bjornjorgen...@gmail.com> >:

In spark 3.5.0 removed  jackson-core-asl and jackson-mapper-asl  those are with 
groupid org.codehaus.jackson. 

 

Those others jackson-* are with groupid com.fasterxml.jackson.core 

 

 

tor. 2. nov. 2023 kl. 01:43 skrev eab...@163.com <mailto:eab...@163.com>  
mailto:eab...@163.com> >:

Hi,

Please check the versions of jar files starting with "jackson-". Make sure 
all versions are consistent.  jackson jar list in spark-3.3.0:



2022/06/10  04:3775,714 jackson-annotations-2.13.3.jar

2022/06/10  04:37   374,895 jackson-core-2.13.3.jar

2022/06/10  04:37   232,248 jackson-core-asl-1.9.13.jar

2022/06/10  04:37 1,536,542 jackson-databind-2.13.3.jar

2022/06/10  04:3752,020 jackson-dataformat-yaml-2.13.3.jar

2022/06/10  04:37   121,201 jackson-datatype-jsr310-2.13.3.jar

2022/06/10  04:37   780,664 jackson-mapper-asl-1.9.13.jar

2022/06/10  04:37   458,981 jackson-module-scala_2.12-2.13.3.jar



Spark 3.3.0 uses Jackson version 2.13.3, while Spark 3.5.0 uses Jackson version 
2.15.2. I think you can remove the lower version of Jackson package to keep the 
versions consistent.

eabour

 

From:  <mailto:moshik.vi...@veeva.com.INVALID> moshik.vi...@veeva.com.INVALID

Date: 2023-11-01 15:03

To:  <mailto:user@spark.apache.org> user@spark.apache.org

CC:  <mailto:saar.barh...@veeva.com> 'Saar Barhoom'

Subject: jackson-databind version mismatch

Hi Spark team,

 

On upgrading spark version from 3.2.1 to 3.4.1 got the following issue:

java.lang.NoSuchMethodError: 'com.fasterxml.jackson.core.JsonGenerator 
com.fasterxml.jackson.databind.ObjectMapper.createGenerator(java.io.OutputStream,
 com.fasterxml.jackson.core.JsonEncoding)'

at 
org.apache.spark.util.JsonProtocol$.toJsonString(JsonProtocol.scala:75)

at 
org.apache.spark.SparkThrowableHelper$.getMessage(SparkThrowableHelper.scala:74)

at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$7(SQLExecution.scala:127)

at scala.Option.map(Option.scala:230)

at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)

at 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)

at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)

at 
org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)

at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)

at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4165)

at org.apache.spark.sql.Dataset.head(Dataset.scala:3161)

at org.apache.spark.sql.Dataset.take(Dataset.scala:3382)

at org.apache.spark.sql.Dataset.takeAsList(Dataset.scala:3405)

at 
com.crossix.safemine.cloud.utils.DebugRDDLogger.showDataset(DebugRDDLogger.java:84)

at 
com.crossix.safemine.cloud.components.statistics.spark.StatisticsTransformer.getFillRateCountsWithSparkQuery(StatisticsTransformer.java:122)

at 
com.crossix.safemine.cloud.components.statistics.spark.StatisticsTransformer.calculateStatistics(StatisticsTransformer.java:61)

at 
com.crossix.safemine.cloud.components.statistics.spark.SparkFileStatistics.execute(SparkFileStatistics.java:102)

at 
com.crossix.safemine.cloud.StatisticsFlow.calculateAllStatistics(StatisticsFlow.java:146)

at 
com.crossix.safemine.cloud.StatisticsFlow.runStatistics(StatisticsFlow.java:119)

at 
com.crossix.safemine.cloud.StatisticsFlow.initialFileStatistics(StatisticsFlow.java:77)

at com.crossix.safemine.cloud.SMCFlow.process(SMCFlow.java:221)

at com.crossix.safemine.cloud.SMCFlow.execute(SMCFlow.java:132)

at com.crossix.safemine.cloud.SMCFlow.run(SMCFlow.java:91)



I see that that spark package contains the dependency:

com.fasterxml.jackson.core:jackson-databind:jar:2.10.5:compile

 

But jackson-databind 2.10.5 does not contai

Re: Re: jackson-databind version mismatch

2023-11-02 Thread eab...@163.com
Hi,
But in fact, it does have those packages.

 D:\02_bigdata\spark-3.5.0-bin-hadoop3\jars 

2023/09/09  10:0875,567 jackson-annotations-2.15.2.jar
2023/09/09  10:08   549,207 jackson-core-2.15.2.jar
2023/09/09  10:08   232,248 jackson-core-asl-1.9.13.jar
2023/09/09  10:08 1,620,088 jackson-databind-2.15.2.jar
2023/09/09  10:0854,630 jackson-dataformat-yaml-2.15.2.jar
2023/09/09  10:08   122,937 jackson-datatype-jsr310-2.15.2.jar
2023/09/09  10:08   780,664 jackson-mapper-asl-1.9.13.jar
2023/09/09  10:08   513,968 jackson-module-scala_2.12-2.15.2.jar



eabour
 
From: Bjørn Jørgensen
Date: 2023-11-02 16:40
To: eab...@163.com
CC: user @spark; Saar Barhoom; moshik.vitas
Subject: Re: jackson-databind version mismatch
[SPARK-43225][BUILD][SQL] Remove jackson-core-asl and jackson-mapper-asl from 
pre-built distribution

tor. 2. nov. 2023 kl. 09:15 skrev Bjørn Jørgensen :
In spark 3.5.0 removed  jackson-core-asl and jackson-mapper-asl  those are with 
groupid org.codehaus.jackson. 

Those others jackson-* are with groupid com.fasterxml.jackson.core 


tor. 2. nov. 2023 kl. 01:43 skrev eab...@163.com :
Hi,
Please check the versions of jar files starting with "jackson-". Make sure 
all versions are consistent.  jackson jar list in spark-3.3.0:

2022/06/10  04:3775,714 jackson-annotations-2.13.3.jar
2022/06/10  04:37   374,895 jackson-core-2.13.3.jar
2022/06/10  04:37   232,248 jackson-core-asl-1.9.13.jar
2022/06/10  04:37 1,536,542 jackson-databind-2.13.3.jar
2022/06/10  04:3752,020 jackson-dataformat-yaml-2.13.3.jar
2022/06/10  04:37   121,201 jackson-datatype-jsr310-2.13.3.jar
2022/06/10  04:37   780,664 jackson-mapper-asl-1.9.13.jar
2022/06/10  04:37   458,981 jackson-module-scala_2.12-2.13.3.jar

Spark 3.3.0 uses Jackson version 2.13.3, while Spark 3.5.0 uses Jackson version 
2.15.2. I think you can remove the lower version of Jackson package to keep the 
versions consistent.
eabour
 
From: moshik.vi...@veeva.com.INVALID
Date: 2023-11-01 15:03
To: user@spark.apache.org
CC: 'Saar Barhoom'
Subject: jackson-databind version mismatch
Hi Spark team,
 
On upgrading spark version from 3.2.1 to 3.4.1 got the following issue:
java.lang.NoSuchMethodError: 'com.fasterxml.jackson.core.JsonGenerator 
com.fasterxml.jackson.databind.ObjectMapper.createGenerator(java.io.OutputStream,
 com.fasterxml.jackson.core.JsonEncoding)'
at 
org.apache.spark.util.JsonProtocol$.toJsonString(JsonProtocol.scala:75)
at 
org.apache.spark.SparkThrowableHelper$.getMessage(SparkThrowableHelper.scala:74)
at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$7(SQLExecution.scala:127)
at scala.Option.map(Option.scala:230)
at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
at 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
at 
org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4165)
at org.apache.spark.sql.Dataset.head(Dataset.scala:3161)
at org.apache.spark.sql.Dataset.take(Dataset.scala:3382)
at org.apache.spark.sql.Dataset.takeAsList(Dataset.scala:3405)
at 
com.crossix.safemine.cloud.utils.DebugRDDLogger.showDataset(DebugRDDLogger.java:84)
at 
com.crossix.safemine.cloud.components.statistics.spark.StatisticsTransformer.getFillRateCountsWithSparkQuery(StatisticsTransformer.java:122)
at 
com.crossix.safemine.cloud.components.statistics.spark.StatisticsTransformer.calculateStatistics(StatisticsTransformer.java:61)
at 
com.crossix.safemine.cloud.components.statistics.spark.SparkFileStatistics.execute(SparkFileStatistics.java:102)
at 
com.crossix.safemine.cloud.StatisticsFlow.calculateAllStatistics(StatisticsFlow.java:146)
at 
com.crossix.safemine.cloud.StatisticsFlow.runStatistics(StatisticsFlow.java:119)
at 
com.crossix.safemine.cloud.StatisticsFlow.initialFileStatistics(StatisticsFlow.java:77)
at com.crossix.safemine.cloud.SMCFlow.process(SMCFlow.java:221)
at com.crossix.safemine.cloud.SMCFlow.execute(SMCFlow.java:132)
at com.crossix.safemine.cloud.SMCFlow.run(SMCFlow.java:91)

I see that that spark package contains the dependency:
com.fasterxml.jackson.co

Re: jackson-databind version mismatch

2023-11-02 Thread Bjørn Jørgensen
[SPARK-43225][BUILD][SQL] Remove jackson-core-asl and jackson-mapper-asl
from pre-built distribution 

tor. 2. nov. 2023 kl. 09:15 skrev Bjørn Jørgensen :

> In spark 3.5.0 removed  jackson-core-asl and jackson-mapper-asl  those
> are with groupid org.codehaus.jackson.
>
> Those others jackson-* are with groupid com.fasterxml.jackson.core
>
>
> tor. 2. nov. 2023 kl. 01:43 skrev eab...@163.com :
>
>> Hi,
>> Please check the versions of jar files starting with "jackson-". Make 
>> sure all versions are consistent.
>>  jackson jar list in spark-3.3.0:
>> 
>> 2022/06/10  04:3775,714 jackson-annotations-*2.13.3*.jar
>> 2022/06/10  04:37   374,895 jackson-core-*2.13.3*.jar
>> 2022/06/10  04:37   232,248 jackson-core-asl-1.9.13.jar
>> 2022/06/10  04:37 1,536,542 jackson-databind-*2.13.3*.jar
>> 2022/06/10  04:3752,020 jackson-dataformat-yaml-*2.13.3*.jar
>> 2022/06/10  04:37   121,201 jackson-datatype-jsr310-*2.13.3*.jar
>> 2022/06/10  04:37   780,664 jackson-mapper-asl-1.9.13.jar
>> 2022/06/10  04:37   458,981 jackson-module-scala_2.12-*2.13.3*
>> .jar
>> 
>>
>> Spark 3.3.0 uses Jackson version 2.13.3, while Spark 3.5.0 uses Jackson 
>> version 2.15.2.
>> I think you can remove the lower version of Jackson package to keep the 
>> versions consistent.
>> eabour
>>
>>
>> *From:* moshik.vi...@veeva.com.INVALID
>> *Date:* 2023-11-01 15:03
>> *To:* user@spark.apache.org
>> *CC:* 'Saar Barhoom' 
>> *Subject:* jackson-databind version mismatch
>>
>> Hi Spark team,
>>
>>
>>
>> On upgrading spark version from 3.2.1 to 3.4.1 got the following issue:
>>
>> *java.lang.NoSuchMethodError: 'com.fasterxml.jackson.core.JsonGenerator
>> com.fasterxml.jackson.databind.ObjectMapper.createGenerator(java.io.OutputStream,
>> com.fasterxml.jackson.core.JsonEncoding)'*
>>
>> *at
>> org.apache.spark.util.JsonProtocol$.toJsonString(JsonProtocol.scala:75)*
>>
>> *at
>> org.apache.spark.SparkThrowableHelper$.getMessage(SparkThrowableHelper.scala:74)*
>>
>> *at
>> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$7(SQLExecution.scala:127)*
>>
>> *at scala.Option.map(Option.scala:230)*
>>
>> *at
>> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)*
>>
>> *at
>> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)*
>>
>> *at
>> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)*
>>
>> *at
>> org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)*
>>
>> *at
>> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)*
>>
>> *at
>> org.apache.spark.sql.Dataset.withAction(Dataset.scala:4165)*
>>
>> *at org.apache.spark.sql.Dataset.head(Dataset.scala:3161)*
>>
>> *at org.apache.spark.sql.Dataset.take(Dataset.scala:3382)*
>>
>> *at
>> org.apache.spark.sql.Dataset.takeAsList(Dataset.scala:3405)*
>>
>> *at
>> com.crossix.safemine.cloud.utils.DebugRDDLogger.showDataset(DebugRDDLogger.java:84)*
>>
>> *at
>> com.crossix.safemine.cloud.components.statistics.spark.StatisticsTransformer.getFillRateCountsWithSparkQuery(StatisticsTransformer.java:122)*
>>
>> *at
>> com.crossix.safemine.cloud.components.statistics.spark.StatisticsTransformer.calculateStatistics(StatisticsTransformer.java:61)*
>>
>> *at
>> com.crossix.safemine.cloud.components.statistics.spark.SparkFileStatistics.execute(SparkFileStatistics.java:102)*
>>
>> *at
>> com.crossix.safemine.cloud.StatisticsFlow.calculateAllStatistics(StatisticsFlow.java:146)*
>>
>> *at
>> com.crossix.safemine.cloud.StatisticsFlow.runStatistics(StatisticsFlow.java:119)*
>>
>> *at
>> com.crossix.safemine.cloud.StatisticsFlow.initialFileStatistics(StatisticsFlow.java:77)*
>>
>> *at
>> com.crossix.safemine.cloud.SMCFlow.process(SMCFlow.java:221)*
>>
>> *at
>> com.crossix.safemine.cloud.SMCFlow.execute(SMCFlow.java:132)*
>>
>> *at
>> com.crossix.safemine.cloud.SMCFlow.run(SMCFlow.java:91)*
>>
>>
>>
>> I see that that spark package contains the dependency:
>>
>> com.fasterxml.jackson.core:jackson-databind:jar:2.10.5:compile
>>
>>
>>
>> But jackson-databind 2.10.5 does not contain 
>> *ObjectMapper.createGenerator(java.io.OutputStream,
>> com.fasterxml.jackson.core.JsonEncoding)*
>>
>> It was added on 2.11.0
>>
>>
>>
>> Trying to upgrade jackson-databind fails with:
>>
>> *com.fasterxml.jackson.databind.JsonMappingException: Scala module 2.10.5
>> requires Jackson Databind version >= 2.10.0 and < 2.11.0*
>>
>>
>>

Re: Spark / Scala conflict

2023-11-02 Thread Aironman DirtDiver
The error message Caused by: java.lang.ClassNotFoundException:
scala.Product$class indicates that the Spark job is trying to load a class
that is not available in the classpath. This can happen if the Spark job is
compiled with a different version of Scala than the version of Scala that
is used to run the job.

You have mentioned that you are using Spark 3.5.0, which is compatible with
Scala 2.12. However, you have also mentioned that you have tried Scala
versions 2.10, 2.11, 2.12, and 2.13. This suggests that you may have
multiple versions of Scala installed on your system.

To resolve the issue, you need to make sure that the Spark job is compiled
and run with the same version of Scala. You can do this by setting the
SPARK_SCALA_VERSION environment variable to the desired Scala version
before starting the Spark job.

For example, to compile the Spark job with Scala 2.12, you would run the
following command:

SPARK_SCALA_VERSION=2.12 sbt compile

To run the Spark job with Scala 2.12, you would run the following command:

SPARK_SCALA_VERSION=2.12 spark-submit spark-job.jar

If you are using Databricks, you can set the Scala version for the Spark
cluster in the cluster creation settings.

Once you have ensured that the Spark job is compiled and run with the same
version of Scala, the error should be resolved.

Here are some additional tips for troubleshooting Scala version conflicts:

   - Make sure that you are using the correct version of the Spark
   libraries. The Spark libraries must be compiled with the same version of
   Scala as the Spark job.
   - If you are using a third-party library, make sure that it is
   compatible with the version of Scala that you are using.
   - Check the Spark logs for any ClassNotFoundExceptions. The logs may
   indicate the specific class that is missing from the classpath.
   - Use a tool like sbt dependency:tree to view the dependencies of your
   Spark job. This can help you to identify any conflicting dependencies.


El jue, 2 nov 2023 a las 5:39, Harry Jamison
() escribió:

> I am getting the error below when I try to run a spark job connecting to
> phoneix.  It seems like I have the incorrect scala version that some part
> of the code is expecting.
>
> I am using spark 3.5.0, and I have copied these phoenix jars into the
> spark lib
> phoenix-server-hbase-2.5-5.1.3.jar
> phoenix-spark-5.0.0-HBase-2.0.jar
>
> I have tried scala 2.10, 2.11, 2.12, and 2.13
> I do not see the scala version used in the logs so I am not 100% sure that
> it is using the version I expect that it should be.
>
>
> Here is the exception that I am getting
>
> 2023-11-01T16:13:00,391 INFO  [Thread-4] handler.ContextHandler: Started
> o.s.j.s.ServletContextHandler@15cd3b2a{/static/sql,null,AVAILABLE,@Spark}
> Traceback (most recent call last):
>   File "/hadoop/spark/spark-3.5.0-bin-hadoop3/copy_tables.py", line 10, in
> 
> .option("zkUrl", "namenode:2181").load()
>   File
> "/hadoop/spark/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py",
> line 314, in load
>   File
> "/hadoop/spark/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py",
> line 1322, in __call__
>   File
> "/hadoop/spark/spark/python/lib/pyspark.zip/pyspark/errors/exceptions/captured.py",
> line 179, in deco
>   File
> "/hadoop/spark/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/protocol.py",
> line 326, in get_return_value
> py4j.protocol.Py4JJavaError: An error occurred while calling o28.load.
> : java.lang.NoClassDefFoundError: scala/Product$class
> at
> org.apache.phoenix.spark.PhoenixRelation.(PhoenixRelation.scala:29)
> at
> org.apache.phoenix.spark.DefaultSource.createRelation(DefaultSource.scala:29)
> at
> org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:346)
> at
> org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:229)
> at
> org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:211)
> at scala.Option.getOrElse(Option.scala:189)
> at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
> at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:172)
> at
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
> Method)
> at
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.base/java.lang.reflect.Method.invoke(Method.java:566)
> at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
> at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
> at py4j.Gateway.invoke(Gateway.java:282)
> at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
> at py4j.commands.CallCommand.execute(CallCommand.java:79)
> at
> py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
> at 

Re: jackson-databind version mismatch

2023-11-02 Thread Bjørn Jørgensen
In spark 3.5.0 removed  jackson-core-asl and jackson-mapper-asl
 those are with
groupid org.codehaus.jackson.

Those others jackson-* are with groupid com.fasterxml.jackson.core


tor. 2. nov. 2023 kl. 01:43 skrev eab...@163.com :

> Hi,
> Please check the versions of jar files starting with "jackson-". Make 
> sure all versions are consistent.
>  jackson jar list in spark-3.3.0:
> 
> 2022/06/10  04:3775,714 jackson-annotations-*2.13.3*.jar
> 2022/06/10  04:37   374,895 jackson-core-*2.13.3*.jar
> 2022/06/10  04:37   232,248 jackson-core-asl-1.9.13.jar
> 2022/06/10  04:37 1,536,542 jackson-databind-*2.13.3*.jar
> 2022/06/10  04:3752,020 jackson-dataformat-yaml-*2.13.3*.jar
> 2022/06/10  04:37   121,201 jackson-datatype-jsr310-*2.13.3*.jar
> 2022/06/10  04:37   780,664 jackson-mapper-asl-1.9.13.jar
> 2022/06/10  04:37   458,981 jackson-module-scala_2.12-*2.13.3*.jar
> 
>
> Spark 3.3.0 uses Jackson version 2.13.3, while Spark 3.5.0 uses Jackson 
> version 2.15.2.
> I think you can remove the lower version of Jackson package to keep the 
> versions consistent.
> eabour
>
>
> *From:* moshik.vi...@veeva.com.INVALID
> *Date:* 2023-11-01 15:03
> *To:* user@spark.apache.org
> *CC:* 'Saar Barhoom' 
> *Subject:* jackson-databind version mismatch
>
> Hi Spark team,
>
>
>
> On upgrading spark version from 3.2.1 to 3.4.1 got the following issue:
>
> *java.lang.NoSuchMethodError: 'com.fasterxml.jackson.core.JsonGenerator
> com.fasterxml.jackson.databind.ObjectMapper.createGenerator(java.io.OutputStream,
> com.fasterxml.jackson.core.JsonEncoding)'*
>
> *at
> org.apache.spark.util.JsonProtocol$.toJsonString(JsonProtocol.scala:75)*
>
> *at
> org.apache.spark.SparkThrowableHelper$.getMessage(SparkThrowableHelper.scala:74)*
>
> *at
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$7(SQLExecution.scala:127)*
>
> *at scala.Option.map(Option.scala:230)*
>
> *at
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)*
>
> *at
> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)*
>
> *at
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)*
>
> *at
> org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)*
>
> *at
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)*
>
> *at
> org.apache.spark.sql.Dataset.withAction(Dataset.scala:4165)*
>
> *at org.apache.spark.sql.Dataset.head(Dataset.scala:3161)*
>
> *at org.apache.spark.sql.Dataset.take(Dataset.scala:3382)*
>
> *at
> org.apache.spark.sql.Dataset.takeAsList(Dataset.scala:3405)*
>
> *at
> com.crossix.safemine.cloud.utils.DebugRDDLogger.showDataset(DebugRDDLogger.java:84)*
>
> *at
> com.crossix.safemine.cloud.components.statistics.spark.StatisticsTransformer.getFillRateCountsWithSparkQuery(StatisticsTransformer.java:122)*
>
> *at
> com.crossix.safemine.cloud.components.statistics.spark.StatisticsTransformer.calculateStatistics(StatisticsTransformer.java:61)*
>
> *at
> com.crossix.safemine.cloud.components.statistics.spark.SparkFileStatistics.execute(SparkFileStatistics.java:102)*
>
> *at
> com.crossix.safemine.cloud.StatisticsFlow.calculateAllStatistics(StatisticsFlow.java:146)*
>
> *at
> com.crossix.safemine.cloud.StatisticsFlow.runStatistics(StatisticsFlow.java:119)*
>
> *at
> com.crossix.safemine.cloud.StatisticsFlow.initialFileStatistics(StatisticsFlow.java:77)*
>
> *at
> com.crossix.safemine.cloud.SMCFlow.process(SMCFlow.java:221)*
>
> *at
> com.crossix.safemine.cloud.SMCFlow.execute(SMCFlow.java:132)*
>
> *at
> com.crossix.safemine.cloud.SMCFlow.run(SMCFlow.java:91)*
>
>
>
> I see that that spark package contains the dependency:
>
> com.fasterxml.jackson.core:jackson-databind:jar:2.10.5:compile
>
>
>
> But jackson-databind 2.10.5 does not contain 
> *ObjectMapper.createGenerator(java.io.OutputStream,
> com.fasterxml.jackson.core.JsonEncoding)*
>
> It was added on 2.11.0
>
>
>
> Trying to upgrade jackson-databind fails with:
>
> *com.fasterxml.jackson.databind.JsonMappingException: Scala module 2.10.5
> requires Jackson Databind version >= 2.10.0 and < 2.11.0*
>
>
>
> According to spark 3.3.0 release notes: "Upgrade Jackson to 2.13.3" but
> spark package of 3.4.1 contains Jackson of 2.10.5
>
> (https://spark.apache.org/releases/spark-release-3-3-0.html)
>
> What am I missing?
>
>
>
> --
>
> 

Re: jackson-databind version mismatch

2023-11-01 Thread eab...@163.com
Hi,
Please check the versions of jar files starting with "jackson-". Make sure 
all versions are consistent.  jackson jar list in spark-3.3.0:

2022/06/10  04:3775,714 jackson-annotations-2.13.3.jar
2022/06/10  04:37   374,895 jackson-core-2.13.3.jar
2022/06/10  04:37   232,248 jackson-core-asl-1.9.13.jar
2022/06/10  04:37 1,536,542 jackson-databind-2.13.3.jar
2022/06/10  04:3752,020 jackson-dataformat-yaml-2.13.3.jar
2022/06/10  04:37   121,201 jackson-datatype-jsr310-2.13.3.jar
2022/06/10  04:37   780,664 jackson-mapper-asl-1.9.13.jar
2022/06/10  04:37   458,981 jackson-module-scala_2.12-2.13.3.jar

Spark 3.3.0 uses Jackson version 2.13.3, while Spark 3.5.0 uses Jackson version 
2.15.2. I think you can remove the lower version of Jackson package to keep the 
versions consistent.
eabour
 
From: moshik.vi...@veeva.com.INVALID
Date: 2023-11-01 15:03
To: user@spark.apache.org
CC: 'Saar Barhoom'
Subject: jackson-databind version mismatch
Hi Spark team,
 
On upgrading spark version from 3.2.1 to 3.4.1 got the following issue:
java.lang.NoSuchMethodError: 'com.fasterxml.jackson.core.JsonGenerator 
com.fasterxml.jackson.databind.ObjectMapper.createGenerator(java.io.OutputStream,
 com.fasterxml.jackson.core.JsonEncoding)'
at 
org.apache.spark.util.JsonProtocol$.toJsonString(JsonProtocol.scala:75)
at 
org.apache.spark.SparkThrowableHelper$.getMessage(SparkThrowableHelper.scala:74)
at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$7(SQLExecution.scala:127)
at scala.Option.map(Option.scala:230)
at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
at 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
at 
org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4165)
at org.apache.spark.sql.Dataset.head(Dataset.scala:3161)
at org.apache.spark.sql.Dataset.take(Dataset.scala:3382)
at org.apache.spark.sql.Dataset.takeAsList(Dataset.scala:3405)
at 
com.crossix.safemine.cloud.utils.DebugRDDLogger.showDataset(DebugRDDLogger.java:84)
at 
com.crossix.safemine.cloud.components.statistics.spark.StatisticsTransformer.getFillRateCountsWithSparkQuery(StatisticsTransformer.java:122)
at 
com.crossix.safemine.cloud.components.statistics.spark.StatisticsTransformer.calculateStatistics(StatisticsTransformer.java:61)
at 
com.crossix.safemine.cloud.components.statistics.spark.SparkFileStatistics.execute(SparkFileStatistics.java:102)
at 
com.crossix.safemine.cloud.StatisticsFlow.calculateAllStatistics(StatisticsFlow.java:146)
at 
com.crossix.safemine.cloud.StatisticsFlow.runStatistics(StatisticsFlow.java:119)
at 
com.crossix.safemine.cloud.StatisticsFlow.initialFileStatistics(StatisticsFlow.java:77)
at com.crossix.safemine.cloud.SMCFlow.process(SMCFlow.java:221)
at com.crossix.safemine.cloud.SMCFlow.execute(SMCFlow.java:132)
at com.crossix.safemine.cloud.SMCFlow.run(SMCFlow.java:91)

I see that that spark package contains the dependency:
com.fasterxml.jackson.core:jackson-databind:jar:2.10.5:compile
 
But jackson-databind 2.10.5 does not contain 
ObjectMapper.createGenerator(java.io.OutputStream, 
com.fasterxml.jackson.core.JsonEncoding)
It was added on 2.11.0
 
Trying to upgrade jackson-databind fails with:
com.fasterxml.jackson.databind.JsonMappingException: Scala module 2.10.5 
requires Jackson Databind version >= 2.10.0 and < 2.11.0
 
According to spark 3.3.0 release notes: "Upgrade Jackson to 2.13.3" but spark 
package of 3.4.1 contains Jackson of 2.10.5
(https://spark.apache.org/releases/spark-release-3-3-0.html)
What am I missing?
 
--
Moshik Vitas
Senior Software Developer, Crossix
Veeva Systems
m: +972-54-5326-400
moshik.vi...@veeva.com
 


Re: Re: Running Spark Connect Server in Cluster Mode on Kubernetes

2023-10-29 Thread Nagatomi Yasukazu
Hi, eabour

Thank you for the insights.

Based on the information you provided, along with the PR
[SPARK-42371][CONNECT] that add "./sbin/start-connect-server.sh" script,
I'll experiment with launching the Spark Connect Server in Cluster Mode on
Kubernetes.

[SPARK-42371][CONNECT] Add scripts to start and stop Spark Connect server
https://github.com/apache/spark/pull/39928

I'll keep you updated on the progress in this thread.

> ALL

If anyone has successfully launched the Spark Connect Server in Cluster
Mode on an on-premises Kubernetes, I'd greatly appreciate it if you could
share your experience or any relevant information.

Any related insights are also very welcome!

Best regards,
Yasukazu

2023年10月19日(木) 16:11 eab...@163.com :

> Hi,
> I have found three important classes:
>
>1. *org.apache.spark.sql.connect.service.SparkConnectServer* : the 
> ./sbin/start-connect-server.sh
>script use SparkConnectServer  class as main class. In main function,
>use SparkSession.builder.getOrCreate() create local sessin, and
>start SparkConnectService.
>2. *org.apache.spark.sql.connect.SparkConnectPlugin* : To enable Spark
>Connect, simply make sure that the appropriate JAR is available in the
>CLASSPATH and the driver plugin is configured to load this class.
>3. *org.apache.spark.sql.connect.SimpleSparkConnectService* : A simple
>main class method to start the spark connect server as a service for client
>tests.
>
>
>So, I believe that by configuring the spark.plugins and starting the
> Spark cluster on Kubernetes, clients can utilize sc://ip:port to connect
> to the remote server.
>Let me give it a try.
>
> --
> eabour
>
>
> *From:* eab...@163.com
> *Date:* 2023-10-19 14:28
> *To:* Nagatomi Yasukazu ; user @spark
> 
> *Subject:* Re: Re: Running Spark Connect Server in Cluster Mode on
> Kubernetes
> Hi all,
>
> Has the spark connect server running on k8s functionality been implemented?
>
> --
>
>
> *From:* Nagatomi Yasukazu 
> *Date:* 2023-09-05 17:51
> *To:* user 
> *Subject:* Re: Running Spark Connect Server in Cluster Mode on Kubernetes
> Dear Spark Community,
>
> I've been exploring the capabilities of the Spark Connect Server and
> encountered an issue when trying to launch it in a cluster deploy mode with
> Kubernetes as the master.
>
> While initiating the `start-connect-server.sh` script with the `--conf`
> parameter for `spark.master` and `spark.submit.deployMode`, I was met with
> an error message:
>
> ```
> Exception in thread "main" org.apache.spark.SparkException: Cluster deploy
> mode is not applicable to Spark Connect server.
> ```
>
> This error message can be traced back to Spark's source code here:
>
> https://github.com/apache/spark/blob/6c885a7cf57df328b03308cff2eed814bda156e4/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala#L307
>
> Given my observations, I'm curious about the Spark Connect Server roadmap:
>
> Is there a plan or current conversation to enable Kubernetes as a master
> in Spark Connect Server's cluster deploy mode?
>
> I have tried to gather information from existing JIRA tickets, but have
> not been able to get a definitive answer:
>
> https://issues.apache.org/jira/browse/SPARK-42730
> https://issues.apache.org/jira/browse/SPARK-39375
> https://issues.apache.org/jira/browse/SPARK-44117
>
> Any thoughts, updates, or references to similar conversations or
> initiatives would be greatly appreciated.
>
> Thank you for your time and expertise!
>
> Best regards,
> Yasukazu
>
> 2023年9月5日(火) 12:09 Nagatomi Yasukazu :
>
>> Hello Mich,
>> Thank you for your questions. Here are my responses:
>>
>> > 1. What investigation have you done to show that it is running in local
>> mode?
>>
>> I have verified through the History Server's Environment tab that:
>> - "spark.master" is set to local[*]
>> - "spark.app.id" begins with local-xxx
>> - "spark.submit.deployMode" is set to local
>>
>>
>> > 2. who has configured this kubernetes cluster? Is it supplied by a
>> cloud vendor?
>>
>> Our Kubernetes cluster was set up in an on-prem environment using RKE2(
>> https://docs.rke2.io/ ).
>>
>>
>> > 3. Confirm that you have configured Spark Connect Server correctly for
>> cluster mode. Make sure you specify the cluster manager (e.g., Kubernetes)
>> and other relevant Spark configurations in your Spark job submission.
>>
>> Based on the Spark Connect documentation I've read, there doesn't seem to
>> be any specific setti

Re: Spark join produce duplicate rows in resultset

2023-10-27 Thread Meena Rajani
Thanks all:

Patrick selected rev.* and I.* cleared the confusion. The Item actually
brought 4 rows hence the final result set had 4 rows.

Regards,
Meena

On Sun, Oct 22, 2023 at 10:13 AM Bjørn Jørgensen 
wrote:

> alos remove the space in rev. scode
>
> søn. 22. okt. 2023 kl. 19:08 skrev Sadha Chilukoori <
> sage.quoti...@gmail.com>:
>
>> Hi Meena,
>>
>> I'm asking to clarify, are the *on *& *and* keywords optional in the
>> join conditions?
>>
>> Please try this snippet, and see if it helps
>>
>> select rev.* from rev
>> inner join customer c
>> on rev.custumer_id =c.id
>> inner join product p
>> on rev.sys = p.sys
>> and rev.prin = p.prin
>> and rev.scode= p.bcode
>>
>> left join item I
>> on rev.sys = I.sys
>> and rev.custumer_id = I.custumer_id
>> and rev. scode = I.scode;
>>
>> Thanks,
>> Sadha
>>
>> On Sat, Oct 21, 2023 at 3:21 PM Meena Rajani 
>> wrote:
>>
>>> Hello all:
>>>
>>> I am using spark sql to join two tables. To my surprise I am
>>> getting redundant rows. What could be the cause.
>>>
>>>
>>> select rev.* from rev
>>> inner join customer c
>>> on rev.custumer_id =c.id
>>> inner join product p
>>> rev.sys = p.sys
>>> rev.prin = p.prin
>>> rev.scode= p.bcode
>>>
>>> left join item I
>>> on rev.sys = i.sys
>>> rev.custumer_id = I.custumer_id
>>> rev. scode = I.scode
>>>
>>> where rev.custumer_id = '123456789'
>>>
>>> The first part of the code brings one row
>>>
>>> select rev.* from rev
>>> inner join customer c
>>> on rev.custumer_id =c.id
>>> inner join product p
>>> rev.sys = p.sys
>>> rev.prin = p.prin
>>> rev.scode= p.bcode
>>>
>>>
>>> The  item has two rows which have common attributes  and the* final
>>> join should result in 2 rows. But I am seeing 4 rows instead.*
>>>
>>> left join item I
>>> on rev.sys = i.sys
>>> rev.custumer_id = I.custumer_id
>>> rev. scode = I.scode
>>>
>>>
>>>
>>> Regards,
>>> Meena
>>>
>>>
>>>
>
> --
> Bjørn Jørgensen
> Vestre Aspehaug 4, 6010 Ålesund
> Norge
>
> +47 480 94 297
>


Re: [Structured Streaming] Joins after aggregation don't work in streaming

2023-10-27 Thread Andrzej Zera
Hi, thank you very much for an update!

Thanks,
Andrzej

On 2023/10/27 01:50:35 Jungtaek Lim wrote:

> Hi, we are aware of your ticket and plan to look into it. We can't say
> about ETA but just wanted to let you know that we are going to look into
> it. Thanks for reporting!
>
> Thanks,
> Jungtaek Lim (HeartSaVioR)
>
> On Fri, Oct 27, 2023 at 5:22 AM Andrzej Zera 
> wrote:
>
>> Hey All,
>>
>> I'm trying to reproduce the following streaming operation: "Time window
>> aggregation in separate streams followed by stream-stream join". According
>> to documentation, this should be possible in Spark 3.5.0 but I had no
>> success despite different tries.
>>
>> Here is a documentation snippet I'm trying to reproduce:
>> https://github.com/apache/spark/blob/261b281e6e57be32eb28bf4e50bea24ed22a9f21/docs/structured-streaming-programming-guide.md?plain=1#L1939-L1995
>>
>> I created an issue with more details but no one responded yet:
>> https://issues.apache.org/jira/browse/SPARK-45637
>>
>> Thank you!
>> Andrzej
>>
>


Re: [Structured Streaming] Joins after aggregation don't work in streaming

2023-10-26 Thread Jungtaek Lim
Hi, we are aware of your ticket and plan to look into it. We can't say
about ETA but just wanted to let you know that we are going to look into
it. Thanks for reporting!

Thanks,
Jungtaek Lim (HeartSaVioR)

On Fri, Oct 27, 2023 at 5:22 AM Andrzej Zera  wrote:

> Hey All,
>
> I'm trying to reproduce the following streaming operation: "Time window
> aggregation in separate streams followed by stream-stream join". According
> to documentation, this should be possible in Spark 3.5.0 but I had no
> success despite different tries.
>
> Here is a documentation snippet I'm trying to reproduce:
> https://github.com/apache/spark/blob/261b281e6e57be32eb28bf4e50bea24ed22a9f21/docs/structured-streaming-programming-guide.md?plain=1#L1939-L1995
>
> I created an issue with more details but no one responded yet:
> https://issues.apache.org/jira/browse/SPARK-45637
>
> Thank you!
> Andrzej
>


[Resolved] Re: spark.stop() cannot stop spark connect session

2023-10-25 Thread eab...@163.com
Hi all.
I read source code at spark/python/pyspark/sql/connect/session.py at master 
· apache/spark (github.com) and the comment for the "stop" method is described 
as follows:
def stop(self) -> None:
# Stopping the session will only close the connection to the current 
session (and
# the life cycle of the session is maintained by the server),
# whereas the regular PySpark session immediately terminates the Spark 
Context
# itself, meaning that stopping all Spark sessions.
# It is controversial to follow the existing the regular Spark 
session's behavior
# specifically in Spark Connect the Spark Connect server is designed for
# multi-tenancy - the remote client side cannot just stop the server 
and stop
# other remote clients being used from other users.
 
So, that's how it was designed.


eabour
 
From: eab...@163.com
Date: 2023-10-20 15:56
To: user @spark
Subject: spark.stop() cannot stop spark connect session
Hi,
my code:
from pyspark.sql import SparkSession
spark = SparkSession.builder.remote("sc://172.29.190.147").getOrCreate()

import pandas as pd
# 创建pandas dataframe
pdf = pd.DataFrame({
"name": ["Alice", "Bob", "Charlie"],
"age": [25, 30, 35],
"gender": ["F", "M", "M"]
})

# 将pandas dataframe转换为spark dataframe
sdf = spark.createDataFrame(pdf)

# 显示spark dataframe
sdf.show()

spark.stop()

After stop, execute sdf.show() throw 
pyspark.errors.exceptions.connect.SparkConnectException: [NO_ACTIVE_SESSION] No 
active Spark session found. Please create a new Spark session before running 
the code. Visit the Spark web UI at http://172.29.190.147:4040/connect/ to 
check if the current session is still running and has not been stopped yet.
1 session(s) are online, running 0 Request(s)
 Session Statistics (1)
1 Pages. Jump to. Showitems in a page.Go
Page:
1
User
Session ID
Start Time ▾
Finish Time
Duration
Total Execute
29f05cde-8f8b-418d-95c0-8dbbbfb556d22023/10/20 15:30:0414 minutes 49 seconds2


eabour


Re: automatically/dinamically renew aws temporary token

2023-10-24 Thread Carlos Aguni
hi all,

thank you for your reply.

> Can’t you attach the cross account permission to the glue job role? Why
the detour via AssumeRole ?
yes Jorn, i also believe this is the best approach. but here we're dealing
with company policies and all the bureaucracy that comes along.
in parallel i'm trying to argue on that path. by now even requesting an
increase on the session duration is a struggle.
but at the moment, since I was only allowed the AssumeRole approach i'm
figuring out a way through this path.

> https://github.com/zillow/aws-custom-credential-provider
thank you Pol. I'll take a look into the project.

regards,c.

On Mon, Oct 23, 2023 at 7:03 AM Pol Santamaria  wrote:

> Hi Carlos!
>
> Take a look at this project, it's 6 years old but the approach is still
> valid:
>
> https://github.com/zillow/aws-custom-credential-provider
>
> The credential provider gets called each time an S3 or Glue Catalog is
> accessed, and then you can decide whether to use a cached token or renew.
>
> Best,
>
> *Pol Santamaria*
>
>
> On Mon, Oct 23, 2023 at 8:08 AM Jörn Franke  wrote:
>
>> Can’t you attach the cross account permission to the glue job role? Why
>> the detour via AssumeRole ?
>>
>> Assumerole can make sense if you use an AWS IAM user and STS
>> authentication, but this would make no sense within AWS for cross-account
>> access as attaching the permissions to the Glue job role is more secure (no
>> need for static credentials, automatically renew permissions in shorter
>> time without any specific configuration in Spark).
>>
>> Have you checked with AWS support?
>>
>> Am 22.10.2023 um 21:14 schrieb Carlos Aguni :
>>
>> 
>> hi all,
>>
>> i've a scenario where I need to assume a cross account role to have S3
>> bucket access.
>>
>> the problem is that this role only allows for 1h time span (no
>> negotiation).
>>
>> that said.
>> does anyone know a way to tell spark to automatically renew the token
>> or to dinamically renew the token on each node?
>> i'm currently using spark on AWS glue.
>>
>> wonder what options do I have.
>>
>> regards,c.
>>
>>


Re: Maximum executors in EC2 Machine

2023-10-24 Thread Riccardo Ferrari
Hi,

I would refer to their documentation to better understand the concepts
behind cluster overview and submitting applications:

   -
   
https://spark.apache.org/docs/latest/cluster-overview.html#cluster-manager-types
   - https://spark.apache.org/docs/latest/submitting-applications.html

When using local[*]  you can get as many worker threads as your cores  in
the same jvm running your driver and not executors. If you want to test
against a real cluster you can look into using stand-alone mode.

HTH,
Riccardo

On Mon, Oct 23, 2023 at 5:31 PM KhajaAsmath Mohammed <
mdkhajaasm...@gmail.com> wrote:

> Hi,
>
> I am running a spark job in spark EC2 machine whiich has 40 cores. Driver
> and executor memory is 16 GB. I am using local[*] but I still get only one
> executor(driver). Is there a way to get more executors with this config.
>
> I am not using yarn or mesos in this case. Only one machine which is
> enough for our work load but the data is increased.
>
> Thanks,
> Asmath
>


Re: automatically/dinamically renew aws temporary token

2023-10-23 Thread Pol Santamaria
Hi Carlos!

Take a look at this project, it's 6 years old but the approach is still
valid:

https://github.com/zillow/aws-custom-credential-provider

The credential provider gets called each time an S3 or Glue Catalog is
accessed, and then you can decide whether to use a cached token or renew.

Best,

*Pol Santamaria*


On Mon, Oct 23, 2023 at 8:08 AM Jörn Franke  wrote:

> Can’t you attach the cross account permission to the glue job role? Why
> the detour via AssumeRole ?
>
> Assumerole can make sense if you use an AWS IAM user and STS
> authentication, but this would make no sense within AWS for cross-account
> access as attaching the permissions to the Glue job role is more secure (no
> need for static credentials, automatically renew permissions in shorter
> time without any specific configuration in Spark).
>
> Have you checked with AWS support?
>
> Am 22.10.2023 um 21:14 schrieb Carlos Aguni :
>
> 
> hi all,
>
> i've a scenario where I need to assume a cross account role to have S3
> bucket access.
>
> the problem is that this role only allows for 1h time span (no
> negotiation).
>
> that said.
> does anyone know a way to tell spark to automatically renew the token
> or to dinamically renew the token on each node?
> i'm currently using spark on AWS glue.
>
> wonder what options do I have.
>
> regards,c.
>
>


Re: automatically/dinamically renew aws temporary token

2023-10-23 Thread Jörn Franke
Can’t you attach the cross account permission to the glue job role? Why the 
detour via AssumeRole ?

Assumerole can make sense if you use an AWS IAM user and STS authentication, 
but this would make no sense within AWS for cross-account access as attaching 
the permissions to the Glue job role is more secure (no need for static 
credentials, automatically renew permissions in shorter time without any 
specific configuration in Spark).

Have you checked with AWS support?

> Am 22.10.2023 um 21:14 schrieb Carlos Aguni :
> 
> 
> hi all,
> 
> i've a scenario where I need to assume a cross account role to have S3 bucket 
> access.
> 
> the problem is that this role only allows for 1h time span (no negotiation).
> 
> that said.
> does anyone know a way to tell spark to automatically renew the token
> or to dinamically renew the token on each node?
> i'm currently using spark on AWS glue.
> 
> wonder what options do I have.
> 
> regards,c.


Re: Spark join produce duplicate rows in resultset

2023-10-22 Thread Bjørn Jørgensen
alos remove the space in rev. scode

søn. 22. okt. 2023 kl. 19:08 skrev Sadha Chilukoori :

> Hi Meena,
>
> I'm asking to clarify, are the *on *& *and* keywords optional in the join
> conditions?
>
> Please try this snippet, and see if it helps
>
> select rev.* from rev
> inner join customer c
> on rev.custumer_id =c.id
> inner join product p
> on rev.sys = p.sys
> and rev.prin = p.prin
> and rev.scode= p.bcode
>
> left join item I
> on rev.sys = I.sys
> and rev.custumer_id = I.custumer_id
> and rev. scode = I.scode;
>
> Thanks,
> Sadha
>
> On Sat, Oct 21, 2023 at 3:21 PM Meena Rajani 
> wrote:
>
>> Hello all:
>>
>> I am using spark sql to join two tables. To my surprise I am
>> getting redundant rows. What could be the cause.
>>
>>
>> select rev.* from rev
>> inner join customer c
>> on rev.custumer_id =c.id
>> inner join product p
>> rev.sys = p.sys
>> rev.prin = p.prin
>> rev.scode= p.bcode
>>
>> left join item I
>> on rev.sys = i.sys
>> rev.custumer_id = I.custumer_id
>> rev. scode = I.scode
>>
>> where rev.custumer_id = '123456789'
>>
>> The first part of the code brings one row
>>
>> select rev.* from rev
>> inner join customer c
>> on rev.custumer_id =c.id
>> inner join product p
>> rev.sys = p.sys
>> rev.prin = p.prin
>> rev.scode= p.bcode
>>
>>
>> The  item has two rows which have common attributes  and the* final join
>> should result in 2 rows. But I am seeing 4 rows instead.*
>>
>> left join item I
>> on rev.sys = i.sys
>> rev.custumer_id = I.custumer_id
>> rev. scode = I.scode
>>
>>
>>
>> Regards,
>> Meena
>>
>>
>>

-- 
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297


Re: Spark join produce duplicate rows in resultset

2023-10-22 Thread Sadha Chilukoori
Hi Meena,

I'm asking to clarify, are the *on *& *and* keywords optional in the join
conditions?

Please try this snippet, and see if it helps

select rev.* from rev
inner join customer c
on rev.custumer_id =c.id
inner join product p
on rev.sys = p.sys
and rev.prin = p.prin
and rev.scode= p.bcode

left join item I
on rev.sys = I.sys
and rev.custumer_id = I.custumer_id
and rev. scode = I.scode;

Thanks,
Sadha

On Sat, Oct 21, 2023 at 3:21 PM Meena Rajani  wrote:

> Hello all:
>
> I am using spark sql to join two tables. To my surprise I am
> getting redundant rows. What could be the cause.
>
>
> select rev.* from rev
> inner join customer c
> on rev.custumer_id =c.id
> inner join product p
> rev.sys = p.sys
> rev.prin = p.prin
> rev.scode= p.bcode
>
> left join item I
> on rev.sys = i.sys
> rev.custumer_id = I.custumer_id
> rev. scode = I.scode
>
> where rev.custumer_id = '123456789'
>
> The first part of the code brings one row
>
> select rev.* from rev
> inner join customer c
> on rev.custumer_id =c.id
> inner join product p
> rev.sys = p.sys
> rev.prin = p.prin
> rev.scode= p.bcode
>
>
> The  item has two rows which have common attributes  and the* final join
> should result in 2 rows. But I am seeing 4 rows instead.*
>
> left join item I
> on rev.sys = i.sys
> rev.custumer_id = I.custumer_id
> rev. scode = I.scode
>
>
>
> Regards,
> Meena
>
>
>


Re: Spark join produce duplicate rows in resultset

2023-10-22 Thread Patrick Tucci
Hi Meena,

It's not impossible, but it's unlikely that there's a bug in Spark SQL
randomly duplicating rows. The most likely explanation is there are more
records in the item table that match your sys/custumer_id/scode criteria
than you expect.

In your original query, try changing select rev.* to select I.*. This will
show you the records from item that the join produces. If the first part of
the code only returns one record, I expect you will see 4 distinct records
returned here.

Thanks,

Patrick


On Sun, Oct 22, 2023 at 1:29 AM Meena Rajani  wrote:

> Hello all:
>
> I am using spark sql to join two tables. To my surprise I am
> getting redundant rows. What could be the cause.
>
>
> select rev.* from rev
> inner join customer c
> on rev.custumer_id =c.id
> inner join product p
> rev.sys = p.sys
> rev.prin = p.prin
> rev.scode= p.bcode
>
> left join item I
> on rev.sys = i.sys
> rev.custumer_id = I.custumer_id
> rev. scode = I.scode
>
> where rev.custumer_id = '123456789'
>
> The first part of the code brings one row
>
> select rev.* from rev
> inner join customer c
> on rev.custumer_id =c.id
> inner join product p
> rev.sys = p.sys
> rev.prin = p.prin
> rev.scode= p.bcode
>
>
> The  item has two rows which have common attributes  and the* final join
> should result in 2 rows. But I am seeing 4 rows instead.*
>
> left join item I
> on rev.sys = i.sys
> rev.custumer_id = I.custumer_id
> rev. scode = I.scode
>
>
>
> Regards,
> Meena
>
>
>


Re: Re: Running Spark Connect Server in Cluster Mode on Kubernetes

2023-10-19 Thread eab...@163.com
Hi,
I have found three important classes:
org.apache.spark.sql.connect.service.SparkConnectServer : the 
./sbin/start-connect-server.sh script use SparkConnectServer  class as main 
class. In main function, use SparkSession.builder.getOrCreate() create local 
sessin, and start SparkConnectService.
org.apache.spark.sql.connect.SparkConnectPlugin : To enable Spark Connect, 
simply make sure that the appropriate JAR is available in the CLASSPATH and the 
driver plugin is configured to load this class.
org.apache.spark.sql.connect.SimpleSparkConnectService : A simple main class 
method to start the spark connect server as a service for client tests. 

   So, I believe that by configuring the spark.plugins and starting the Spark 
cluster on Kubernetes, clients can utilize sc://ip:port to connect to the 
remote server. 
   Let me give it a try.



eabour
 
From: eab...@163.com
Date: 2023-10-19 14:28
To: Nagatomi Yasukazu; user @spark
Subject: Re: Re: Running Spark Connect Server in Cluster Mode on Kubernetes
Hi all, 
Has the spark connect server running on k8s functionality been implemented?



 
From: Nagatomi Yasukazu
Date: 2023-09-05 17:51
To: user
Subject: Re: Running Spark Connect Server in Cluster Mode on Kubernetes
Dear Spark Community,

I've been exploring the capabilities of the Spark Connect Server and 
encountered an issue when trying to launch it in a cluster deploy mode with 
Kubernetes as the master.

While initiating the `start-connect-server.sh` script with the `--conf` 
parameter for `spark.master` and `spark.submit.deployMode`, I was met with an 
error message:

```
Exception in thread "main" org.apache.spark.SparkException: Cluster deploy mode 
is not applicable to Spark Connect server.
```

This error message can be traced back to Spark's source code here:
https://github.com/apache/spark/blob/6c885a7cf57df328b03308cff2eed814bda156e4/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala#L307

Given my observations, I'm curious about the Spark Connect Server roadmap:

Is there a plan or current conversation to enable Kubernetes as a master in 
Spark Connect Server's cluster deploy mode?

I have tried to gather information from existing JIRA tickets, but have not 
been able to get a definitive answer:

https://issues.apache.org/jira/browse/SPARK-42730
https://issues.apache.org/jira/browse/SPARK-39375
https://issues.apache.org/jira/browse/SPARK-44117

Any thoughts, updates, or references to similar conversations or initiatives 
would be greatly appreciated.

Thank you for your time and expertise!

Best regards,
Yasukazu

2023年9月5日(火) 12:09 Nagatomi Yasukazu :
Hello Mich,
Thank you for your questions. Here are my responses:

> 1. What investigation have you done to show that it is running in local mode?

I have verified through the History Server's Environment tab that:
- "spark.master" is set to local[*]
- "spark.app.id" begins with local-xxx
- "spark.submit.deployMode" is set to local


> 2. who has configured this kubernetes cluster? Is it supplied by a cloud 
> vendor?

Our Kubernetes cluster was set up in an on-prem environment using RKE2( 
https://docs.rke2.io/ ).


> 3. Confirm that you have configured Spark Connect Server correctly for 
> cluster mode. Make sure you specify the cluster manager (e.g., Kubernetes) 
> and other relevant Spark configurations in your Spark job submission.

Based on the Spark Connect documentation I've read, there doesn't seem to be 
any specific settings for cluster mode related to the Spark Connect Server.

Configuration - Spark 3.4.1 Documentation
https://spark.apache.org/docs/3.4.1/configuration.html#spark-connect

Quickstart: Spark Connect — PySpark 3.4.1 documentation
https://spark.apache.org/docs/latest/api/python/getting_started/quickstart_connect.html

Spark Connect Overview - Spark 3.4.1 Documentation
https://spark.apache.org/docs/latest/spark-connect-overview.html

The documentation only suggests running ./sbin/start-connect-server.sh 
--packages org.apache.spark:spark-connect_2.12:3.4.0, leaving me at a loss.


> 4. Can you provide a full spark submit command

Given the nature of Spark Connect, I don't use the spark-submit command. 
Instead, as per the documentation, I can execute workloads using only a Python 
script. For the Spark Connect Server, I have a Kubernetes manifest executing 
"/opt.spark/sbin/start-connect-server.sh --packages 
org.apache.spark:spark-connect_2.12:3.4.0".


> 5. Make sure that the Python client script connecting to Spark Connect Server 
> specifies the cluster mode explicitly, like using --master or --deploy-mode 
> flags when creating a SparkSession.

The Spark Connect Server operates as a Driver, so it isn't possible to specify 
the --master or --deploy-mode flags in the Python client script. If I try, I 
encounter a RuntimeError.

like this:
RuntimeError: Spark master cannot be configured with Spark Connect server; 
howeve

Re: Re: Running Spark Connect Server in Cluster Mode on Kubernetes

2023-10-19 Thread eab...@163.com
Hi all, 
Has the spark connect server running on k8s functionality been implemented?



 
From: Nagatomi Yasukazu
Date: 2023-09-05 17:51
To: user
Subject: Re: Running Spark Connect Server in Cluster Mode on Kubernetes
Dear Spark Community,

I've been exploring the capabilities of the Spark Connect Server and 
encountered an issue when trying to launch it in a cluster deploy mode with 
Kubernetes as the master.

While initiating the `start-connect-server.sh` script with the `--conf` 
parameter for `spark.master` and `spark.submit.deployMode`, I was met with an 
error message:

```
Exception in thread "main" org.apache.spark.SparkException: Cluster deploy mode 
is not applicable to Spark Connect server.
```

This error message can be traced back to Spark's source code here:
https://github.com/apache/spark/blob/6c885a7cf57df328b03308cff2eed814bda156e4/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala#L307

Given my observations, I'm curious about the Spark Connect Server roadmap:

Is there a plan or current conversation to enable Kubernetes as a master in 
Spark Connect Server's cluster deploy mode?

I have tried to gather information from existing JIRA tickets, but have not 
been able to get a definitive answer:

https://issues.apache.org/jira/browse/SPARK-42730
https://issues.apache.org/jira/browse/SPARK-39375
https://issues.apache.org/jira/browse/SPARK-44117

Any thoughts, updates, or references to similar conversations or initiatives 
would be greatly appreciated.

Thank you for your time and expertise!

Best regards,
Yasukazu

2023年9月5日(火) 12:09 Nagatomi Yasukazu :
Hello Mich,
Thank you for your questions. Here are my responses:

> 1. What investigation have you done to show that it is running in local mode?

I have verified through the History Server's Environment tab that:
- "spark.master" is set to local[*]
- "spark.app.id" begins with local-xxx
- "spark.submit.deployMode" is set to local


> 2. who has configured this kubernetes cluster? Is it supplied by a cloud 
> vendor?

Our Kubernetes cluster was set up in an on-prem environment using RKE2( 
https://docs.rke2.io/ ).


> 3. Confirm that you have configured Spark Connect Server correctly for 
> cluster mode. Make sure you specify the cluster manager (e.g., Kubernetes) 
> and other relevant Spark configurations in your Spark job submission.

Based on the Spark Connect documentation I've read, there doesn't seem to be 
any specific settings for cluster mode related to the Spark Connect Server.

Configuration - Spark 3.4.1 Documentation
https://spark.apache.org/docs/3.4.1/configuration.html#spark-connect

Quickstart: Spark Connect — PySpark 3.4.1 documentation
https://spark.apache.org/docs/latest/api/python/getting_started/quickstart_connect.html

Spark Connect Overview - Spark 3.4.1 Documentation
https://spark.apache.org/docs/latest/spark-connect-overview.html

The documentation only suggests running ./sbin/start-connect-server.sh 
--packages org.apache.spark:spark-connect_2.12:3.4.0, leaving me at a loss.


> 4. Can you provide a full spark submit command

Given the nature of Spark Connect, I don't use the spark-submit command. 
Instead, as per the documentation, I can execute workloads using only a Python 
script. For the Spark Connect Server, I have a Kubernetes manifest executing 
"/opt.spark/sbin/start-connect-server.sh --packages 
org.apache.spark:spark-connect_2.12:3.4.0".


> 5. Make sure that the Python client script connecting to Spark Connect Server 
> specifies the cluster mode explicitly, like using --master or --deploy-mode 
> flags when creating a SparkSession.

The Spark Connect Server operates as a Driver, so it isn't possible to specify 
the --master or --deploy-mode flags in the Python client script. If I try, I 
encounter a RuntimeError.

like this:
RuntimeError: Spark master cannot be configured with Spark Connect server; 
however, found URL for Spark Connect [sc://.../]


> 6. Ensure that you have allocated the necessary resources (CPU, memory etc) 
> to Spark Connect Server when running it on Kubernetes.

Resources are ample, so that shouldn't be the problem.


> 7. Review the environment variables and configurations you have set, 
> including the SPARK_NO_DAEMONIZE=1 variable. Ensure that these variables are 
> not conflicting with 

I'm unsure if SPARK_NO_DAEMONIZE=1 conflicts with cluster mode settings. But 
without it, the process goes to the background when executing 
start-connect-server.sh, causing the Pod to terminate prematurely.


> 8. Are you using the correct spark client version that is fully compatible 
> with your spark on the server?

Yes, I have verified that without using Spark Connect (e.g., using Spark 
Operator), Spark applications run as expected.

> 9. check the kubernetes error logs

The Kubernetes logs don't show any errors, and jobs are running in local mode.


> 10. Insuffici

Re: hive: spark as execution engine. class not found problem

2023-10-17 Thread Vijay Shankar
UNSUBSCRIBE

On Tue, Oct 17, 2023 at 5:09 PM Amirhossein Kabiri <
amirhosseikab...@gmail.com> wrote:

> I used Ambari to config and install Hive and Spark. I want to insert into
> a hive table using Spark execution Engine but I face to this weird error.
> The error is:
>
> Job failed with java.lang.ClassNotFoundException:
> ive_20231017100559_301568f9-bdfa-4f7c-89a6-f69a65b30aaf:1
> 2023-10-17 10:07:42,972 ERROR [c4aeb932-743e-4736-b00f-6b905381fa03 main]
> status.SparkJobMonitor: Job failed with java.lang.ClassNotFoundException:
> ive_20231017100559_301568f9-bdfa-4f7c-89a6-f69a65b30aaf:1
> com.esotericsoftware.kryo.KryoException: Unable to find class:
> ive_20231017100559_301568f9-bdfa-4f7c-89a6-f69a65b30aaf:1
> Serialization trace:
> invertedWorkGraph (org.apache.hadoop.hive.ql.plan.SparkWork)
> at
> com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:160)
> at
> com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:133)
> at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:693)
> at
> org.apache.hadoop.hive.ql.exec.SerializationUtilities$KryoWithHooks.readClass(SerializationUtilities.java:181)
> at
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:118)
> at
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543)
> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:709)
> at
> org.apache.hadoop.hive.ql.exec.SerializationUtilities$KryoWithHooks.readObject(SerializationUtilities.java:206)
> at
> org.apache.hadoop.hive.ql.exec.spark.KryoSerializer.deserialize(KryoSerializer.java:60)
> at
> org.apache.hadoop.hive.ql.exec.spark.RemoteHiveSparkClient$JobStatusJob.call(RemoteHiveSparkClient.java:329)
> at
> org.apache.hive.spark.client.RemoteDriver$JobWrapper.call(RemoteDriver.java:378)
> at
> org.apache.hive.spark.client.RemoteDriver$JobWrapper.call(RemoteDriver.java:343)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.ClassNotFoundException:
> ive_20231017100559_301568f9-bdfa-4f7c-89a6-f69a65b30aaf:1
> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:348)
> at
> com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:154)
> ... 15 more
>
> 2023-10-17 10:07:43,067 INFO  [c4aeb932-743e-4736-b00f-6b905381fa03 main]
> reexec.ReOptimizePlugin: ReOptimization: retryPossible: false
> FAILED: Execution Error, return code 3 from
> org.apache.hadoop.hive.ql.exec.spark.SparkTask. Spark job failed during
> runtime. Please check stacktrace for the root cause.
>
> the weird part is Hive make this itself and asks me where to find it! I
> would appreciate any helps to solve and locate the problem.
>
> note: The Ambari, Hadoop, Hive, Zookeeper and Spark Works Well according
> to the Ambari service health check.
> note: Since I didnt find any spark specific hive-site.xml I added the
> following configs to the hive-site.xml file:
> 
>   hive.execution.engine
>   spark
> 
>
> 
>   hive.spark.warehouse.location
>   /tmp/spark/warehouse
> 
>
> 
>   hive.spark.sql.execution.mode
>   adaptive
> 
>
> 
>   hive.spark.sql.shuffle.partitions
>   200
> 
>
> 
>   hive.spark.sql.shuffle.partitions.pernode
>   2
> 
>
> 
>   hive.spark.sql.memory.fraction
>   0.6
> 
>
> 
>   hive.spark.sql.codegen.enabled
>   true
> 
>
> 
>   spark.sql.hive.hiveserver2.jdbc.url
>   jdbc:hive2://my.ambari.com:2181
> /;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2
> 
>
> 
>   spark.datasource.hive.warehouse.load.staging.dir
>   /tmp
> 
>
>
> 
>   spark.hadoop.hive.zookeeper.quorum
>   my.ambari.com:2181
> 
>
> 
>
> spark.datasource.hive.warehouse.write.path.strictColumnNamesMapping
>   true
> 
>
> 
>   spark.sql.hive.conf.list
>
> hive.vectorized.execution.filesink.arrow.native.enabled=true;hive.vectorized.execution.enabled=true
> 
>
> 
>   hive.spark.client.connect.timeout
>   3ms
> 
>
> 
>   hive.spark.client.server.connect.timeout
>   30ms
>
> 
> hive.hook.proto.base-directory
> /tmp/hive/hooks
>   
>   
> hive.spark.sql.shuffle.partitions
> 200
>   
>   
> hive.strict.managed.tables
> true
>   
>   
> hive.stats.fetch.partition.stats
> true
>   
>   
> hive.spark.sql.memory.fraction
> 0.6
>   
>   
> hive.spark.sql.execution.mode
> spark
>   
>   
> hive.spark.sql.codegen.enabled
> 

Re: Spark stand-alone mode

2023-10-17 Thread Ilango
Hi all,

Thanks a lot for your suggestions and knowledge sharing. I like to let you
know that, I completed setting up the stand alone cluster and couple of
data science users are able to use it already for last two weeks. And the
performance is really good. Almost 10X performance improvement compare to
HPC local mode. They tested with some complex data science scripts using
spark and other data science projects. The cluster is really stable and
very performant.

I enabled dynamic allocation and cap the memory and cpu accordingly at
spark-defaults. Conf and at our spark framework code. So its been pretty
impressive for the last few weeks.

Thanks you so much!

Thanks,
Elango


On Tue, 19 Sep 2023 at 6:40 PM, Patrick Tucci 
wrote:

> Multiple applications can run at once, but you need to either configure
> Spark or your applications to allow that. In stand-alone mode, each
> application attempts to take all resources available by default. This
> section of the documentation has more details:
>
>
> https://spark.apache.org/docs/latest/spark-standalone.html#resource-scheduling
>
> Explicitly setting the resources per application limits the resources to
> the configured values for the lifetime of the application. You can use
> dynamic allocation to allow Spark to scale the resources up and down per
> application based on load, but the configuration is relatively more complex:
>
>
> https://spark.apache.org/docs/latest/job-scheduling.html#dynamic-resource-allocation
>
> On Mon, Sep 18, 2023 at 3:53 PM Ilango  wrote:
>
>>
>> Thanks all for your suggestions. Noted with thanks.
>> Just wanted share few more details about the environment
>> 1. We use NFS for data storage and data is in parquet format
>> 2. All HPC nodes are connected and already work as a cluster for Studio
>> workbench. I can setup password less SSH if it not exist already.
>> 3. We will stick with NFS for now and stand alone then may be will
>> explore HDFS and YARN.
>>
>> Can you please confirm whether multiple users can run spark jobs at the
>> same time?
>> If so I will start working on it and let you know how it goes
>>
>> Mich, the link to Hadoop is not working. Can you please check and let me
>> know the correct link. Would like to explore Hadoop option as well.
>>
>>
>>
>> Thanks,
>> Elango
>>
>> On Sat, Sep 16, 2023, 4:20 AM Bjørn Jørgensen 
>> wrote:
>>
>>> you need to setup ssh without password, use key instead.  How to
>>> connect without password using SSH (passwordless)
>>> 
>>>
>>> fre. 15. sep. 2023 kl. 20:55 skrev Mich Talebzadeh <
>>> mich.talebza...@gmail.com>:
>>>
 Hi,

 Can these 4 nodes talk to each other through ssh as trusted hosts (on
 top of the network that Sean already mentioned)? Otherwise you need to set
 it up. You can install a LAN if you have another free port at the back of
 your HPC nodes. They should

 You ought to try to set up a Hadoop cluster pretty easily. Check this
 old article of mine for Hadoop set-up.


 https://www.linkedin.com/pulse/diy-festive-season-how-install-configure-big-data-so-mich/?trackingId=z7n5tx7tQOGK9tcG9VClkw%3D%3D

 Hadoop will provide you with a common storage layer (HDFS) that these
 nodes will be able to share and talk. Yarn is your best bet as the resource
 manager with reasonably powerful hosts you have. However, for now the Stand
 Alone mode will do. Make sure that the Metastore you choose, (by default it
 will use Hive Metastore called Derby :( ) is something respetable like
 Postgres DB that can handle multiple concurrent spark jobs

 HTH


 Mich Talebzadeh,
 Distinguished Technologist, Solutions Architect & Engineer
 London
 United Kingdom


view my Linkedin profile
 


  https://en.everybodywiki.com/Mich_Talebzadeh



 *Disclaimer:* Use it at your own risk. Any and all responsibility for
 any loss, damage or destruction of data or any other property which may
 arise from relying on this email's technical content is explicitly
 disclaimed. The author will in no case be liable for any monetary damages
 arising from such loss, damage or destruction.




 On Fri, 15 Sept 2023 at 07:04, Ilango  wrote:

>
> Hi all,
>
> We have 4 HPC nodes and installed spark individually in all nodes.
>
> Spark is used as local mode(each driver/executor will have 8 cores and
> 65 GB) in Sparklyr/pyspark using Rstudio/Posit workbench. Slurm is used as
> scheduler.
>
> As this is local mode, we are facing performance issue(as only one
> executor) when it comes dealing with large datasets.
>
> Can I convert this 4 nodes into spark standalone cluster. We dont have
> hadoop so yarn mode is out of 

Re: Can not complete the read csv task

2023-10-14 Thread Khalid Mammadov
This command only defines a new DataFrame, in order to see some results you
need to do something like merged_spark_data.show() on a new line.

Regarding the error I think it's typical error that you get when you run
Spark on Windows OS. You can suppress it using Winutils tool (Google it or
ChatGPT it to see how).

On Thu, 12 Oct 2023, 11:58 Kelum Perera,  wrote:

> Dear friends,
>
> I'm trying to get a fresh start with Spark. I tried to read few CSV files
> in a folder, but the task got stuck and not completed as shown in the
> copied content from the terminal.
>
> Can someone help to understand what is going wrong?
>
> Versions;
> java version "11.0.16" 2022-07-19 LTS
> Java(TM) SE Runtime Environment 18.9 (build 11.0.16+11-LTS-199)
> Java HotSpot(TM) 64-Bit Server VM 18.9 (build 11.0.16+11-LTS-199, mixed
> mode)
>
> Python 3.9.13
> Windows 10
>
> Copied from the terminal;
>     __
>  / __/__  ___ _/ /__
> _\ \/ _ \/ _ `/ __/  '_/
>/__ / .__/\_,_/_/ /_/\_\   version 3.5.0
>   /_/
>
> Using Python version 3.9.13 (main, Aug 25 2022 23:51:50)
> Spark context Web UI available at http://LK510FIDSLW4.ey.net:4041
> Spark context available as 'sc' (master = local[*], app id =
> local-1697089858181).
> SparkSession available as 'spark'.
> >>> merged_spark_data =
> spark.read.csv(r"C:\Users\Kelum.Perera\Downloads\data-master\nyse_all\nyse_data\*",
> header=False )
> Exception in thread "globPath-ForkJoinPool-1-worker-115"
> java.lang.UnsatisfiedLinkError:
> org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z
> at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native
> Method)
> at
> org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:793)
> at org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:1249)
> at org.apache.hadoop.fs.FileUtil.list(FileUtil.java:1454)
> at
> org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:601)
> at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
> at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
> at
> org.apache.hadoop.fs.ChecksumFileSystem.listStatus(ChecksumFileSystem.java:761)
> at org.apache.hadoop.fs.Globber.listStatus(Globber.java:128)
> at org.apache.hadoop.fs.Globber.doGlob(Globber.java:291)
> at org.apache.hadoop.fs.Globber.glob(Globber.java:202)
> at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:2124)
> at
> org.apache.spark.deploy.SparkHadoopUtil.globPath(SparkHadoopUtil.scala:238)
> at
> org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$3(DataSource.scala:737)
> at
> org.apache.spark.util.ThreadUtils$.$anonfun$parmap$2(ThreadUtils.scala:380)
> at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659)
> at scala.util.Success.$anonfun$map$1(Try.scala:255)
> at scala.util.Success.map(Try.scala:213)
> at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
> at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
> at
> scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
> at
> java.base/java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1426)
> at
> java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
> at
> java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
> at
> java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
> at
> java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
> at
> java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
>
>
>
> Noting happens afterwards. Appreciate your kind input to solve this.
>
> Best Regards,
> Kelum Perera
>
>
>
>


Re: [ SPARK SQL ]: UPPER in WHERE condition is not working in Apache Spark 3.5.0 for Mysql ENUM Column

2023-10-13 Thread Suyash Ajmera
This issue is related to CharVarcharCodegenUtils readSidePadding method .

Appending white spaces while reading ENUM data from mysql

Causing issue in querying , writing the same data to Cassandra.

On Thu, 12 Oct, 2023, 7:46 pm Suyash Ajmera, 
wrote:

> I have upgraded my spark job from spark 3.3.1 to spark 3.5.0, I am
> querying to Mysql Database and applying
>
> `*UPPER(col) = UPPER(value)*` in the subsequent sql query. It is working
> as expected in spark 3.3.1 , but not working with 3.5.0.
>
> Where Condition ::  `*UPPER(vn) = 'ERICSSON' AND (upper(st) = 'OPEN' OR
> upper(st) = 'REOPEN' OR upper(st) = 'CLOSED')*`
>
> The *st *column is ENUM in the database and it is causing the issue.
>
> Below is the Physical Plan of *FILTER* phase :
>
> For 3.3.1 :
>
> +- Filter ((upper(vn#11) = ERICSSON) AND (((upper(st#42) = OPEN) OR
> (upper(st#42) = REOPEN)) OR (upper(st#42) = CLOSED)))
>
> For 3.5.0 :
>
> +- Filter ((upper(vn#11) = ERICSSON) AND (((upper(staticinvoke(class
> org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils, StringType,
> readSidePadding, st#42, 13, true, false, true)) = OPEN) OR
> (upper(staticinvoke(class
> org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils, StringType,
> readSidePadding, st#42, 13, true, false, true)) = REOPEN)) OR
> (upper(staticinvoke(class
> org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils, StringType,
> readSidePadding, st#42, 13, true, false, true)) = CLOSED)))
>
> -
>
> I have debug it and found that Spark added a property in version 3.4.0 ,
> i.e. **spark.sql.readSideCharPadding** which has default value **true**.
>
> Link to the JIRA : https://issues.apache.org/jira/browse/SPARK-40697
>
> Added a new method in Class **CharVarcharCodegenUtils**
>
> public static UTF8String readSidePadding(UTF8String inputStr, int limit) {
> int numChars = inputStr.numChars();
> if (numChars == limit) {
>   return inputStr;
> } else if (numChars < limit) {
>   return inputStr.rpad(limit, SPACE);
> } else {
>   return inputStr;
> }
>   }
>
>
> **This method is appending some whitespace padding to the ENUM values
> while reading and causing the Issue.**
>
> ---
>
> When I am removing the UPPER function from the where condition the
> **FILTER** Phase looks like this :
>
>  +- Filter (((staticinvoke(class
> org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils,
>  StringType, readSidePadding, st#42, 13, true, false, true) = OPEN
> ) OR (staticinvoke(class
> org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils, StringType,
> readSidePadding, st#42, 13, true, false, true) = REOPEN   )) OR
> (staticinvoke(class
> org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils, StringType,
> readSidePadding, st#42, 13, true, false, true) = CLOSED   ))
>
>
> **You can see it has added some white space after the value and the query
> runs fine giving the correct result.**
>
> But with the UPPER function I am not getting the data.
>
> --
>
> I have also tried to disable this Property *spark.sql.readSideCharPadding
> = false* with following cases :
>
> 1. With Upper function in where clause :
>It is not pushing the filters to Database and the *query works fine*.
>
>   +- Filter (((upper(st#42) = OPEN) OR (upper(st#42) = REOPEN)) OR
> (upper(st#42) = CLOSED))
>
> 2. But when I am removing the upper function
>
>  *It is pushing the filter to Mysql with the white spaces and I am not
> getting the data. (THIS IS A CAUSING VERY BIG ISSUE)*
>
>   PushedFilters: [*IsNotNull(vn), *EqualTo(vn,ERICSSON),
> *Or(Or(EqualTo(st,OPEN ),EqualTo(st,REOPEN
> )),EqualTo(st,CLOSED   ))]
>
> I cannot move this filter to JDBC read query , also I can't remove this
> UPPER function in the where clause.
>
>
> 
>
> Also I found same data getting written to CASSANDRA with *PADDING .*
>


Re: Autoscaling in Spark

2023-10-10 Thread Mich Talebzadeh
This has been brought up a few times. I will focus on Spark Structured
Streaming

Autoscaling does not support Spark Structured Streaming (SSS). Why because
streaming jobs are typically long-running jobs that need to maintain state
across micro-batches. Autoscaling is designed to scale up and down Spark
clusters in response to workload changes However, this would cause problems
for Spark Structured Streaming jobs because it would cause the jobs to lose
their state. These jobs continuously process incoming data and update their
state incrementally (see checkpoint directory). Autoscaling, which can
dynamically add or remove worker nodes, would disrupt this stateful
processing. Although Spark itself supports dynamic allocation, (i.e. which
can add or remove executor nodes based on demand), it is not the same as
autoscaling in cloud  like GCP etc like Kubernetes or managed clusters. For
now you need to plan your workload in SSS accordingly.

My general advice, the usual thing to watch  from Spark GUI

Processing Time (Process Rate)  + Reserved Capacity < Batch Interval (Batch
Duration)

If your sink  has an issue absorbing data in a timely manner as per above
formulae, you will see the defect on the Processing Rate

Batch Interval, i.e. the rate at which the upstream source sends messages
through Kafka or other source. We can start by assuming that the rate of
increase in the number of messages processed (processing time) will require
an *additional reserved capacity*. We can anticipate a heuristic 70% (~1SD)
increase in the processing time so in theory you  should be able to handle
all this work below the batch interval.

HTH

Mich Talebzadeh,
Distinguished Technologist, Solutions Architect & Engineer
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Tue, 10 Oct 2023 at 16:11, Kiran Biswal  wrote:

> Hello Experts
>
> Is there any true auto scaling option for spark? The dynamic auto scaling
> works only for batch. Any guidelines on spark streaming  autoscaling and
> how that will be tied to any cluster level autoscaling solutions?
>
> Thanks
>


Re: Updating delta file column data

2023-10-10 Thread Mich Talebzadeh
Hi,

Since you mentioned that  there could be duplicate records with the same
unique key in the Delta table, you will need a way to handle these
duplicate records. One approach I can suggest is to use a timestamp to
determine the latest or most relevant record among duplicates, the
so-called op_time column df = df.withColumn("op_time", current_timestamp())
at ingestion time, so you can determine the most relevant record etc

This is the pseudo-code suggestion

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, struct
appName = "DeltaHexToIntConversion"
spark = SparkSession.builder.appName(appName).getOrCreate()
delta_table_path = "path_to_your_delta_table"
df = spark.read.format("delta").load(delta_table_path)
df = df.withColumn(
"exploded_data",
struct(col("data.field1").cast("int").alias("field1_int"),
col("data.field2"))
)
df = df.select("other_columns", "exploded_data.field1_int",
"exploded_data.field2")
# Handling Duplicates:
# Define your logic here to select the most relevant record among
duplicates, say timestamp as mentioned above
df = df.dropDuplicates(["unique_key"], keep="last")
# merge the DataFrame back to the Delta table
df.write.format("delta").mode("mergr").option("mergeSchema",
"true").save(delta_table_path)


HTH

Mich Talebzadeh,
Distinguished Technologist, Solutions Architect & Engineer
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Mon, 9 Oct 2023 at 17:12, Mich Talebzadeh 
wrote:

> In a nutshell, is this what you are trying to do?
>
>
>1. Read the Delta table into a Spark DataFrame.
>2. Explode the string column into a struct column.
>3. Convert the hexadecimal field to an integer.
>4. Write the DataFrame back to the Delta table in merge mode with a
>unique key.
>
> Is this a fair assessment
>
> HTH
>
> Mich Talebzadeh,
> Distinguished Technologist, Solutions Architect & Engineer
> London
> United Kingdom
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Mon, 9 Oct 2023 at 14:46, Karthick Nk  wrote:
>
>> Hi All,
>>
>> I have  mentioned the sample data below and the operation I need to
>> perform over there,
>>
>> I have delta tables with columns, in that columns I have the data in the
>> string data type(contains the struct data),
>>
>> So, I need to update one key value in the struct field data in the string
>> column of the delta table.
>>
>> Note: I can able to explode the string column into the struct field and
>> into the individual field by using the following operation in the spark,
>>
>> [image: image.png]
>>
>> df_new = spark.read.json(df.rdd.map(lambda x: '{"data": x.data }')
>>
>> Could you suggest a possible way to perform the required action in an
>> optimistic way?
>>
>> Note: Please feel free to ask, if you need further information.
>>
>> Thanks & regards,
>> Karthick
>>
>> On Mon, Oct 2, 2023 at 10:53 PM Karthick Nk 
>> wrote:
>>
>>> Hi community members,
>>>
>>> In databricks adls2 delta tables, I need to perform the below operation,
>>> could you help me with your thoughts
>>>
>>>  I have the delta tables with one colum with data type string , which
>>> contains the json data in string data type, I need to do the following
>>> 1. I have to update one particular field value in the json and update it
>>> back in the same column of the data.
>>>
>>> Example :
>>>
>>> In string column, inside json I have one field with value in hexadecimal.
>>> Like { version : ''0xabcd1234"}
>>>
>>> I have to convert this field into corresponding integer value and update
>>> back into same strong column json value.
>>> Note: I have to perform this operation within this column. This column
>>> is basically with data type string in delta table.
>>>
>>> Could you suggest some sample example.
>>>
>>> Thanks in advance.
>>>
>>


Re: Log file location in Spark on K8s

2023-10-09 Thread Prashant Sharma
Hi Sanket,

Driver and executor logs are written to stdout by default, it can be
configured using SPARK_HOME/conf/log4j.properties file. The file including
the entire SPARK_HOME/conf is auto propogateded to all driver and executor
container and mounted as volume.

Thanks

On Mon, 9 Oct, 2023, 5:37 pm Agrawal, Sanket,
 wrote:

> Hi All,
>
>
>
> We are trying to send the spark logs using fluent-bit. We validated that
> fluent-bit is able to move logs of all other pods except the
> driver/executor pods.
>
>
>
> It would be great if someone can guide us where should I look for spark
> logs in Spark on Kubernetes with client/cluster mode deployment.
>
>
>
> Thanks,
> Sanket A.
>
> This message (including any attachments) contains confidential information
> intended for a specific individual and purpose, and is protected by law. If
> you are not the intended recipient, you should delete this message and any
> disclosure, copying, or distribution of this message, or the taking of any
> action based on it, by you is strictly prohibited.
>
> Deloitte refers to a Deloitte member firm, one of its related entities, or
> Deloitte Touche Tohmatsu Limited ("DTTL"). Each Deloitte member firm is a
> separate legal entity and a member of DTTL. DTTL does not provide services
> to clients. Please see www.deloitte.com/about to learn more.
>
> v.E.1
>


Re: Clarification with Spark Structured Streaming

2023-10-09 Thread Danilo Sousa
Unsubscribe

> Em 9 de out. de 2023, à(s) 07:03, Mich Talebzadeh  
> escreveu:
> 
> Hi,
> 
> Please see my responses below:
> 
> 1) In Spark Structured Streaming does commit mean streaming data has been 
> delivered to the sink like Snowflake?
> 
> No. a commit does not refer to data being delivered to a sink like Snowflake 
> or bigQuery. The term commit refers to Spark Structured Streaming (SS) 
> internals. Specifically it means that a micro-batch of data has been 
> processed by SSS. In the checkpoint directory there is a subdirectory called 
> commits that marks the micro-batch process as completed.
> 
> 2) if sinks like Snowflake  cannot absorb or digest streaming data in a 
> timely manner, will there be an impact on spark streaming itself?
> 
> Yes, it can potentially impact SSS. If the sink cannot absorb data in a 
> timely manner, the batches will start to back up in SSS. This can cause Spark 
> to run out of memory and the streaming job to fail. As I understand, Spark 
> will use a combination of memory and disk storage (checkpointing). This can 
> also happen if the network interface between Spark and the sink is disrupted. 
> On the other hand Spark may slow down, as it tries to process the backed-up 
> batches of data. You want to avoid these scenarios.
> 
> HTH
> 
> Mich Talebzadeh,
> Distinguished Technologist, Solutions Architect & Engineer
> London
> United Kingdom
> 
>view my Linkedin profile 
> 
> 
>  https://en.everybodywiki.com/Mich_Talebzadeh
> 
>  
> Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
> damage or destruction of data or any other property which may arise from 
> relying on this email's technical content is explicitly disclaimed. The 
> author will in no case be liable for any monetary damages arising from such 
> loss, damage or destruction.
>  
> 
> 
> On Sun, 8 Oct 2023 at 19:50, ashok34...@yahoo.com.INVALID 
>  wrote:
>> Hello team
>> 
>> 1) In Spark Structured Streaming does commit mean streaming data has been 
>> delivered to the sink like Snowflake?
>> 
>> 2) if sinks like Snowflake  cannot absorb or digest streaming data in a 
>> timely manner, will there be an impact on spark streaming itself?
>> 
>> Thanks
>> 
>> AK



Re: Clarification with Spark Structured Streaming

2023-10-09 Thread Mich Talebzadeh
Your mileage varies. Often there is a flavour of Cloud Data warehouse
already there. CDWs like BigQuery, Redshift, Snowflake and so forth. They
can all do a good job for various degrees

   - Use efficient data types. Choose data types that are efficient for
   Spark to process. For example, use integer data types for columns that
   store integer values.
   - Avoid using complex data types. Complex data types, such as nested
   structs and arrays, can be less efficient for Spark to process.
   - Opt for columnar storage format like Parquet for your sink table.
   Columnar storage is highly efficient for analytical workloads as it allows
   for column-level compression and predicate pushdown.
   - These CDW come with partitioning options. Popular are date or time
   formats that can be used for partitioning. This will reduce the amount of
   data scanned during queries.
   - Some of these CDWs come with native streaming capabilities like
   BigQuery Streaming, I believe Snowflake has Snowpipe Streaming API as well
   (don't know much about it) . These options  enable real-time data ingestion
   and processing, No need for manual batch processing etc.
   - You can batch incoming data for efficiency processing, which can
   improve performance and simplify data handling. Start by configuring your
   Spark Streaming context with an appropriate batch interval. The batch
   interval defines how often Spark will process a batch of data. Choose a
   batch interval that balances latency and throughput based on the
   application's needs. Spark can process batches of data more efficiently
   than it can process individual records.
   - Snowflake says it is serverless and so is BigQuery. They are designed
   to provide a uniform performance regardless of workload. Serverless CDWs
   can efficiently handle both batch and streaming workloads without the need
   for manual resource provisioning.
   - Use materialized views to pre-compute query results, which can improve
   the performance of frequently executed queries. This has been around from
   classics RDBMs

HTH

Mich Talebzadeh,
Distinguished Technologist, Solutions Architect & Engineer
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Mon, 9 Oct 2023 at 17:50, ashok34...@yahoo.com 
wrote:

> Thank you for your feedback Mich.
>
> In general how can one optimise the cloud data warehouses (the sink part),
> to handle streaming Spark data efficiently, avoiding bottlenecks that
> discussed.
>
>
> AK
> On Monday, 9 October 2023 at 11:04:41 BST, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>
> Hi,
>
> Please see my responses below:
>
> 1) In Spark Structured Streaming does commit mean streaming data has been
> delivered to the sink like Snowflake?
>
> No. a commit does not refer to data being delivered to a sink like
> Snowflake or bigQuery. The term commit refers to Spark Structured Streaming
> (SS) internals. Specifically it means that a micro-batch of data has been
> processed by SSS. In the checkpoint directory there is a
> subdirectory called commits that marks the micro-batch process as completed.
>
> 2) if sinks like Snowflake  cannot absorb or digest streaming data in a
> timely manner, will there be an impact on spark streaming itself?
>
> Yes, it can potentially impact SSS. If the sink cannot absorb data in a
> timely manner, the batches will start to back up in SSS. This can cause
> Spark to run out of memory and the streaming job to fail. As I understand,
> Spark will use a combination of memory and disk storage (checkpointing).
> This can also happen if the network interface between Spark and the sink is
> disrupted. On the other hand Spark may slow down, as it tries to process
> the backed-up batches of data. You want to avoid these scenarios.
>
> HTH
>
> Mich Talebzadeh,
> Distinguished Technologist, Solutions Architect & Engineer
> London
> United Kingdom
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Sun, 8 Oct 2023 at 19:50, ashok34...@yahoo.com.INVALID
>  wrote:
>
> Hello team
>
> 1) In Spark Structured Streaming does commit mean streaming data 

Re: Clarification with Spark Structured Streaming

2023-10-09 Thread ashok34...@yahoo.com.INVALID
 Thank you for your feedback Mich.
In general how can one optimise the cloud data warehouses (the sink part), to 
handle streaming Spark data efficiently, avoiding bottlenecks that discussed.

AKOn Monday, 9 October 2023 at 11:04:41 BST, Mich Talebzadeh 
 wrote:  
 
 Hi,
Please see my responses below:
1) In Spark Structured Streaming does commit mean streaming data has been 
delivered to the sink like Snowflake?

No. a commit does not refer to data being delivered to a sink like Snowflake or 
bigQuery. The term commit refers to Spark Structured Streaming (SS) internals. 
Specifically it means that a micro-batch of data has been processed by SSS. In 
the checkpoint directory there is a subdirectory called commits that marks the 
micro-batch process as completed.
2) if sinks like Snowflake  cannot absorb or digest streaming data in a timely 
manner, will there be an impact on spark streaming itself?

Yes, it can potentially impact SSS. If the sink cannot absorb data in a timely 
manner, the batches will start to back up in SSS. This can cause Spark to run 
out of memory and the streaming job to fail. As I understand, Spark will use a 
combination of memory and disk storage (checkpointing). This can also happen if 
the network interface between Spark and the sink is disrupted. On the other 
hand Spark may slow down, as it tries to process the backed-up batches of data. 
You want to avoid these scenarios.
HTH
Mich Talebzadeh,Distinguished Technologist, Solutions Architect & Engineer
London
United Kingdom



   view my Linkedin profile




 https://en.everybodywiki.com/Mich_Talebzadeh

 

Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destructionof data or any other property which may arise from relying 
on this email's technical content is explicitly disclaimed.The author will in 
no case be liable for any monetary damages arising from suchloss, damage or 
destruction. 

 


On Sun, 8 Oct 2023 at 19:50, ashok34...@yahoo.com.INVALID 
 wrote:

Hello team
1) In Spark Structured Streaming does commit mean streaming data has been 
delivered to the sink like Snowflake?
2) if sinks like Snowflake  cannot absorb or digest streaming data in a timely 
manner, will there be an impact on spark streaming itself?
Thanks

AK
  

Re: Updating delta file column data

2023-10-09 Thread Mich Talebzadeh
In a nutshell, is this what you are trying to do?


   1. Read the Delta table into a Spark DataFrame.
   2. Explode the string column into a struct column.
   3. Convert the hexadecimal field to an integer.
   4. Write the DataFrame back to the Delta table in merge mode with a
   unique key.

Is this a fair assessment

HTH

Mich Talebzadeh,
Distinguished Technologist, Solutions Architect & Engineer
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Mon, 9 Oct 2023 at 14:46, Karthick Nk  wrote:

> Hi All,
>
> I have  mentioned the sample data below and the operation I need to
> perform over there,
>
> I have delta tables with columns, in that columns I have the data in the
> string data type(contains the struct data),
>
> So, I need to update one key value in the struct field data in the string
> column of the delta table.
>
> Note: I can able to explode the string column into the struct field and
> into the individual field by using the following operation in the spark,
>
> [image: image.png]
>
> df_new = spark.read.json(df.rdd.map(lambda x: '{"data": x.data }')
>
> Could you suggest a possible way to perform the required action in an
> optimistic way?
>
> Note: Please feel free to ask, if you need further information.
>
> Thanks & regards,
> Karthick
>
> On Mon, Oct 2, 2023 at 10:53 PM Karthick Nk  wrote:
>
>> Hi community members,
>>
>> In databricks adls2 delta tables, I need to perform the below operation,
>> could you help me with your thoughts
>>
>>  I have the delta tables with one colum with data type string , which
>> contains the json data in string data type, I need to do the following
>> 1. I have to update one particular field value in the json and update it
>> back in the same column of the data.
>>
>> Example :
>>
>> In string column, inside json I have one field with value in hexadecimal.
>> Like { version : ''0xabcd1234"}
>>
>> I have to convert this field into corresponding integer value and update
>> back into same strong column json value.
>> Note: I have to perform this operation within this column. This column is
>> basically with data type string in delta table.
>>
>> Could you suggest some sample example.
>>
>> Thanks in advance.
>>
>


Re: Clarification with Spark Structured Streaming

2023-10-09 Thread Mich Talebzadeh
Hi,

Please see my responses below:

1) In Spark Structured Streaming does commit mean streaming data has been
delivered to the sink like Snowflake?

No. a commit does not refer to data being delivered to a sink like
Snowflake or bigQuery. The term commit refers to Spark Structured Streaming
(SS) internals. Specifically it means that a micro-batch of data has been
processed by SSS. In the checkpoint directory there is a
subdirectory called commits that marks the micro-batch process as completed.

2) if sinks like Snowflake  cannot absorb or digest streaming data in a
timely manner, will there be an impact on spark streaming itself?

Yes, it can potentially impact SSS. If the sink cannot absorb data in a
timely manner, the batches will start to back up in SSS. This can cause
Spark to run out of memory and the streaming job to fail. As I understand,
Spark will use a combination of memory and disk storage (checkpointing).
This can also happen if the network interface between Spark and the sink is
disrupted. On the other hand Spark may slow down, as it tries to process
the backed-up batches of data. You want to avoid these scenarios.

HTH

Mich Talebzadeh,
Distinguished Technologist, Solutions Architect & Engineer
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Sun, 8 Oct 2023 at 19:50, ashok34...@yahoo.com.INVALID
 wrote:

> Hello team
>
> 1) In Spark Structured Streaming does commit mean streaming data has been
> delivered to the sink like Snowflake?
>
> 2) if sinks like Snowflake  cannot absorb or digest streaming data in a
> timely manner, will there be an impact on spark streaming itself?
>
> Thanks
>
> AK
>


Re: Updating delta file column data

2023-10-09 Thread Karthick Nk
Hi All,

I have  mentioned the sample data below and the operation I need to perform
over there,

I have delta tables with columns, in that columns I have the data in the
string data type(contains the struct data),

So, I need to update one key value in the struct field data in the string
column of the delta table.

Note: I can able to explode the string column into the struct field and
into the individual field by using the following operation in the spark,

[image: image.png]

df_new = spark.read.json(df.rdd.map(lambda x: '{"data": x.data }')

Could you suggest a possible way to perform the required action in an
optimistic way?

Note: Please feel free to ask, if you need further information.

Thanks & regards,
Karthick

On Mon, Oct 2, 2023 at 10:53 PM Karthick Nk  wrote:

> Hi community members,
>
> In databricks adls2 delta tables, I need to perform the below operation,
> could you help me with your thoughts
>
>  I have the delta tables with one colum with data type string , which
> contains the json data in string data type, I need to do the following
> 1. I have to update one particular field value in the json and update it
> back in the same column of the data.
>
> Example :
>
> In string column, inside json I have one field with value in hexadecimal.
> Like { version : ''0xabcd1234"}
>
> I have to convert this field into corresponding integer value and update
> back into same strong column json value.
> Note: I have to perform this operation within this column. This column is
> basically with data type string in delta table.
>
> Could you suggest some sample example.
>
> Thanks in advance.
>


Re: Connection pool shut down in Spark Iceberg Streaming Connector

2023-10-05 Thread Igor Calabria
You might be affected by this issue:
https://github.com/apache/iceberg/issues/8601

It was already patched but it isn't released yet.

On Thu, Oct 5, 2023 at 7:47 PM Prashant Sharma  wrote:

> Hi Sanket, more details might help here.
>
> How does your spark configuration look like?
>
> What exactly was done when this happened?
>
> On Thu, 5 Oct, 2023, 2:29 pm Agrawal, Sanket,
>  wrote:
>
>> Hello Everyone,
>>
>>
>>
>> We are trying to stream the changes in our Iceberg tables stored in AWS
>> S3. We are achieving this through Spark-Iceberg Connector and using JAR
>> files for Spark-AWS. Suddenly we have started receiving error “Connection
>> pool shut down”.
>>
>>
>>
>> Spark Version: 3.4.1
>>
>> Iceberg: 1.3.1
>>
>>
>>
>> Any help or guidance would of great help.
>>
>>
>>
>> Thank You,
>>
>> Sanket A.
>>
>>
>>
>> This message (including any attachments) contains confidential
>> information intended for a specific individual and purpose, and is
>> protected by law. If you are not the intended recipient, you should delete
>> this message and any disclosure, copying, or distribution of this message,
>> or the taking of any action based on it, by you is strictly prohibited.
>>
>> Deloitte refers to a Deloitte member firm, one of its related entities,
>> or Deloitte Touche Tohmatsu Limited ("DTTL"). Each Deloitte member firm is
>> a separate legal entity and a member of DTTL. DTTL does not provide
>> services to clients. Please see www.deloitte.com/about to learn more.
>>
>> v.E.1
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Spark Compatibility with Spring Boot 3.x

2023-10-05 Thread Angshuman Bhattacharya
Thanks Ahmed. I am trying to bring this up with Spark DE community

On Thu, Oct 5, 2023 at 12:32 PM Ahmed Albalawi <
ahmed.albal...@capitalone.com> wrote:

> Hello team,
>
> We are in the process of upgrading one of our apps to Spring Boot 3.x
> while using Spark, and we have encountered an issue with Spark
> compatibility, specifically with Jakarta Servlet. Spring Boot 3.x uses
> Jakarta Servlet while Spark uses Javax Servlet. Can we get some guidance on
> how to upgrade to Spring Boot 3.x while continuing to use Spark.
>
> The specific error is listed below:
>
> java.lang.NoClassDefFoundError: javax/servlet/Servlet
> at org.apache.spark.ui.SparkUI$.create(SparkUI.scala:239)
> at org.apache.spark.SparkContext.(SparkContext.scala:503)
> at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2888)
> at org.apache.spark.SparkContext.getOrCreate(SparkContext.scala)
>
> The error comes up when we try to run a mvn clean install, and the issue is 
> in our test cases. This issue happens specifically when we build our spark 
> session. The line of code it traces down to is as follows:
>
> *session = 
> SparkSession.builder().sparkContext(SparkContext.getOrCreate(sparkConf)).getOrCreate();*
>
> What we have tried:
>
> - We noticed according to this post 
> ,
>  there are no compatible versions of spark using version 5 of the Jakarta 
> Servlet API
>
> - We've tried 
> 
>  using the maven shade plugin to use jakarta instead of javax, but are 
> running into some other issues with this.
> - We've also looked at the following 
> 
>  to use jakarta 4.x with jersey 2.x and still have an issue with the servlet
>
>
> Please let us know if there are any solutions to this issue. Thanks!
>
>
> --
> *Ahmed Albalawi*
>
> Senior Associate Software Engineer • EP2 Tech - CuRE
>
> 571-668-3911 •  1680 Capital One Dr.
>

__



The information contained in this e-mail may be confidential and/or proprietary 
to Capital One and/or its affiliates and may only be used solely in performance 
of work or services for Capital One. The information transmitted herewith is 
intended only for use by the individual or entity to which it is addressed. If 
the reader of this message is not the intended recipient, you are hereby 
notified that any review, retransmission, dissemination, distribution, copying 
or other use of, or taking of any action in reliance upon this information is 
strictly prohibited. If you have received this communication in error, please 
contact the sender and delete the material from your computer.





Re: Spark Compatibility with Spring Boot 3.x

2023-10-05 Thread Sean Owen
I think we already updated this in Spark 4. However for now you would have
to also include a JAR with the jakarta.* classes instead.
You are welcome to try Spark 4 now by building from master, but it's far
from release.

On Thu, Oct 5, 2023 at 11:53 AM Ahmed Albalawi
 wrote:

> Hello team,
>
> We are in the process of upgrading one of our apps to Spring Boot 3.x
> while using Spark, and we have encountered an issue with Spark
> compatibility, specifically with Jakarta Servlet. Spring Boot 3.x uses
> Jakarta Servlet while Spark uses Javax Servlet. Can we get some guidance on
> how to upgrade to Spring Boot 3.x while continuing to use Spark.
>
> The specific error is listed below:
>
> java.lang.NoClassDefFoundError: javax/servlet/Servlet
> at org.apache.spark.ui.SparkUI$.create(SparkUI.scala:239)
> at org.apache.spark.SparkContext.(SparkContext.scala:503)
> at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2888)
> at org.apache.spark.SparkContext.getOrCreate(SparkContext.scala)
>
> The error comes up when we try to run a mvn clean install, and the issue is 
> in our test cases. This issue happens specifically when we build our spark 
> session. The line of code it traces down to is as follows:
>
> *session = 
> SparkSession.builder().sparkContext(SparkContext.getOrCreate(sparkConf)).getOrCreate();*
>
> What we have tried:
>
> - We noticed according to this post 
> ,
>  there are no compatible versions of spark using version 5 of the Jakarta 
> Servlet API
>
> - We've tried 
> 
>  using the maven shade plugin to use jakarta instead of javax, but are 
> running into some other issues with this.
> - We've also looked at the following 
> 
>  to use jakarta 4.x with jersey 2.x and still have an issue with the servlet
>
>
> Please let us know if there are any solutions to this issue. Thanks!
>
>
> --
> *Ahmed Albalawi*
>
> Senior Associate Software Engineer • EP2 Tech - CuRE
>
> 571-668-3911 •  1680 Capital One Dr.
> --
>
> The information contained in this e-mail may be confidential and/or
> proprietary to Capital One and/or its affiliates and may only be used
> solely in performance of work or services for Capital One. The information
> transmitted herewith is intended only for use by the individual or entity
> to which it is addressed. If the reader of this message is not the intended
> recipient, you are hereby notified that any review, retransmission,
> dissemination, distribution, copying or other use of, or taking of any
> action in reliance upon this information is strictly prohibited. If you
> have received this communication in error, please contact the sender and
> delete the material from your computer.
>
>
>
>
>


Re: [PySpark Structured Streaming] How to tune .repartition(N) ?

2023-10-05 Thread Mich Talebzadeh
The fact that you have 60 partitions or brokers in kaka  is not directly
correlated  to Spark Structured Streaming (SSS) executors by itself. See
below.

Spark starts with 200 partitions. However, by default, Spark/PySpark
creates partitions that are equal to the number of CPU cores in the node,
the so called vcores. So it depends on the number of nodes you are using in
your spark cluster.

Without doing a PoC you would not need to worry about repartition(10) in
your writeStream. I suggest that for now you remove that parameter and
observe the spark processing through Spark GUI (default port 4040) and in
particular the page on Structured Streaming". Your sink is Delta Lake which
is no different from any other data warehouses such as Google BigQuery.

My general advice, the usual thing to watch  from Spark GUI

Processing Time (Process Rate)  + Reserved Capacity < Batch Interval (Batch
Duration)

If your sink ( Delta Lake) has an issue absorbing data in a timely manner
as per above formulae, you will see the defect on the Processing Rate

Batch Interval, i.e. the rate at which the upstream source sends messages
through Kafka. We can start by assuming that the rate of increase in the
number of messages processed (processing time) will require an additional
reserved capacity. We can anticipate a heuristic 70% (~1SD) increase in the
processing time so in theory you  should be able to handle all this work
below the batch interval.

The parameter which I think many deploy is
spark.streaming.backpressure.enabled
> (spark.conf.set("spark.streaming.backpressure.enabled", "true"). The
central idea is that if a component is struggling to keep up, it should
communicate to upstream components and get them to reduce the load. In the
context of Spark Streaming, the receiver is the upstream component which
gets notified if the executors cannot keep up. There are a number of
occasions this will  (not just necessarily the spike in the incoming
messages). For example:

   - Streaming Source: Unexpected short burst of incoming messages in
   source system
   - YARN: Lost Spark executors due to node(s) failure
   - External Sink System: High load on external systems such as Delta
   Lake, BigQuery etc

Without backpressure, microbatches queue up over time and the scheduling
delay increases (check Operation Duration from GUI).

The next parameter I think of is sparkStreamingBackpressurePidMinRate. It is
 the total records per second. It relies on
spark.streaming.kafka.maxRatePerPartition, (not set), which is the maximum
rate (number of records per second) at which messages will be read from
each Kafka partition.

So  sparkStreamingBackpressurePidMinRate starts with

n (total number of kafka partitions)
* spark.streaming.kafka.maxRatePerPartition * Batch Interval

spark.streaming.kafka.maxRatePerPartition is used to control the maximum
rate of data ingestion from Kafka per partition. Kafka topics can have
multiple partitions, and Spark Streaming processes data in parallel by
reading from these partitions.
If you set spark.streaming.kafka.maxRatePerPartition to 1000, Spark
Streaming will consume data from each Kafka partition at a rate of up to
1000 messages per second.

So in your case if you set it goes something like

60 * 1000 * Batch Interval (in seconds)

Of course I stand corrected.

HTH

Mich Talebzadeh,
Distinguished Technologist, Solutions Architect & Engineer
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Thu, 5 Oct 2023 at 05:54, Shao Yang Hong
 wrote:

> Hi all on user@spark:
>
> We are looking for advice and suggestions on how to tune the
> .repartition() parameter.
>
> We are using Spark Streaming on our data pipeline to consume messages
> and persist them to a Delta Lake
> (https://delta.io/learn/getting-started/).
>
> We read messages from a Kafka topic, then add a generated date column
> as a daily partitioning, and save these records to Delta Lake. We have
> 60 Kafka partitions on the Kafka topic, 15 Spark executor instances
> (so 4 Kafka partitions per executor).
>
> How then, should we use .repartition()? Should we omit this parameter?
> Or set it to 15? or 4?
>
> Our code looks roughly like the below:
>
> ```
> df = (
> spark.readStream.format("kafka")
> .option("kafka.bootstrap.servers", os.environ["KAFKA_BROKERS"])
> .option("subscribe", os.environ["KAFKA_TOPIC"])
> .load()
> )
>
> table = (
> df.select(
> from_protobuf(
> "value", "table", "/opt/protobuf-desc/table.desc"
>   

Re: [PySpark Structured Streaming] How to tune .repartition(N) ?

2023-10-05 Thread Perez
You can try the 'optimize' command of delta lake. That will help you for
sure. It merges small files. Also, it depends on the file format. If you
are working with Parquet then still small files should not cause any issues.

P.

On Thu, Oct 5, 2023 at 10:55 AM Shao Yang Hong
 wrote:

> Hi Raghavendra,
>
> Yes, we are trying to reduce the number of files in delta as well (the
> small file problem [0][1]).
>
> We already have a scheduled app to compact files, but the number of
> files is still large, at 14K files per day.
>
> [0]: https://docs.delta.io/latest/optimizations-oss.html#language-python
> [1]:
> https://delta.io/blog/2023-01-25-delta-lake-small-file-compaction-optimize/
>
> On Thu, Oct 5, 2023 at 12:53 PM Raghavendra Ganesh
>  wrote:
> >
> > Hi,
> > What is the purpose for which you want to use repartition() .. to reduce
> the number of files in delta?
> > Also note that there is an alternative option of using coalesce()
> instead of repartition().
> > --
> > Raghavendra
> >
> >
> > On Thu, Oct 5, 2023 at 10:15 AM Shao Yang Hong <
> shaoyang.h...@ninjavan.co.invalid> wrote:
> >>
> >> Hi all on user@spark:
> >>
> >> We are looking for advice and suggestions on how to tune the
> >> .repartition() parameter.
> >>
> >> We are using Spark Streaming on our data pipeline to consume messages
> >> and persist them to a Delta Lake
> >> (https://delta.io/learn/getting-started/).
> >>
> >> We read messages from a Kafka topic, then add a generated date column
> >> as a daily partitioning, and save these records to Delta Lake. We have
> >> 60 Kafka partitions on the Kafka topic, 15 Spark executor instances
> >> (so 4 Kafka partitions per executor).
> >>
> >> How then, should we use .repartition()? Should we omit this parameter?
> >> Or set it to 15? or 4?
> >>
> >> Our code looks roughly like the below:
> >>
> >> ```
> >> df = (
> >> spark.readStream.format("kafka")
> >> .option("kafka.bootstrap.servers", os.environ["KAFKA_BROKERS"])
> >> .option("subscribe", os.environ["KAFKA_TOPIC"])
> >> .load()
> >> )
> >>
> >> table = (
> >> df.select(
> >> from_protobuf(
> >> "value", "table", "/opt/protobuf-desc/table.desc"
> >> ).alias("msg")
> >> )
> >> .withColumn("uuid", col("msg.uuid"))
> >> # etc other columns...
> >>
> >> # generated column for daily partitioning in Delta Lake
> >> .withColumn(CREATED_DATE,
> >> date_format(from_unixtime("msg.logged_at"), "-MM-dd"))
> >> .drop("msg")
> >> )
> >>
> >> query = (
> >> table
> >> .repartition(10).writeStream
> >> .queryName(APP_NAME)
> >> .outputMode("append")
> >> .format("delta")
> >> .partitionBy(CREATED_DATE)
> >> .option("checkpointLocation", os.environ["CHECKPOINT"])
> >> .start(os.environ["DELTA_PATH"])
> >> )
> >>
> >> query.awaitTermination()
> >> spark.stop()
> >> ```
> >>
> >> Any advice would be appreciated.
> >>
> >> --
> >> Best Regards,
> >> Shao Yang HONG
> >>
> >> -
> >> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >>
>
>
> --
> Best Regards,
> Shao Yang HONG
> Software Engineer, Pricing, Tech
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Connection pool shut down in Spark Iceberg Streaming Connector

2023-10-05 Thread Prashant Sharma
Hi Sanket, more details might help here.

How does your spark configuration look like?

What exactly was done when this happened?

On Thu, 5 Oct, 2023, 2:29 pm Agrawal, Sanket,
 wrote:

> Hello Everyone,
>
>
>
> We are trying to stream the changes in our Iceberg tables stored in AWS
> S3. We are achieving this through Spark-Iceberg Connector and using JAR
> files for Spark-AWS. Suddenly we have started receiving error “Connection
> pool shut down”.
>
>
>
> Spark Version: 3.4.1
>
> Iceberg: 1.3.1
>
>
>
> Any help or guidance would of great help.
>
>
>
> Thank You,
>
> Sanket A.
>
>
>
> This message (including any attachments) contains confidential information
> intended for a specific individual and purpose, and is protected by law. If
> you are not the intended recipient, you should delete this message and any
> disclosure, copying, or distribution of this message, or the taking of any
> action based on it, by you is strictly prohibited.
>
> Deloitte refers to a Deloitte member firm, one of its related entities, or
> Deloitte Touche Tohmatsu Limited ("DTTL"). Each Deloitte member firm is a
> separate legal entity and a member of DTTL. DTTL does not provide services
> to clients. Please see www.deloitte.com/about to learn more.
>
> v.E.1
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org


Re: [PySpark Structured Streaming] How to tune .repartition(N) ?

2023-10-04 Thread Shao Yang Hong
Hi Raghavendra,

Yes, we are trying to reduce the number of files in delta as well (the
small file problem [0][1]).

We already have a scheduled app to compact files, but the number of
files is still large, at 14K files per day.

[0]: https://docs.delta.io/latest/optimizations-oss.html#language-python
[1]: https://delta.io/blog/2023-01-25-delta-lake-small-file-compaction-optimize/

On Thu, Oct 5, 2023 at 12:53 PM Raghavendra Ganesh
 wrote:
>
> Hi,
> What is the purpose for which you want to use repartition() .. to reduce the 
> number of files in delta?
> Also note that there is an alternative option of using coalesce() instead of 
> repartition().
> --
> Raghavendra
>
>
> On Thu, Oct 5, 2023 at 10:15 AM Shao Yang Hong 
>  wrote:
>>
>> Hi all on user@spark:
>>
>> We are looking for advice and suggestions on how to tune the
>> .repartition() parameter.
>>
>> We are using Spark Streaming on our data pipeline to consume messages
>> and persist them to a Delta Lake
>> (https://delta.io/learn/getting-started/).
>>
>> We read messages from a Kafka topic, then add a generated date column
>> as a daily partitioning, and save these records to Delta Lake. We have
>> 60 Kafka partitions on the Kafka topic, 15 Spark executor instances
>> (so 4 Kafka partitions per executor).
>>
>> How then, should we use .repartition()? Should we omit this parameter?
>> Or set it to 15? or 4?
>>
>> Our code looks roughly like the below:
>>
>> ```
>> df = (
>> spark.readStream.format("kafka")
>> .option("kafka.bootstrap.servers", os.environ["KAFKA_BROKERS"])
>> .option("subscribe", os.environ["KAFKA_TOPIC"])
>> .load()
>> )
>>
>> table = (
>> df.select(
>> from_protobuf(
>> "value", "table", "/opt/protobuf-desc/table.desc"
>> ).alias("msg")
>> )
>> .withColumn("uuid", col("msg.uuid"))
>> # etc other columns...
>>
>> # generated column for daily partitioning in Delta Lake
>> .withColumn(CREATED_DATE,
>> date_format(from_unixtime("msg.logged_at"), "-MM-dd"))
>> .drop("msg")
>> )
>>
>> query = (
>> table
>> .repartition(10).writeStream
>> .queryName(APP_NAME)
>> .outputMode("append")
>> .format("delta")
>> .partitionBy(CREATED_DATE)
>> .option("checkpointLocation", os.environ["CHECKPOINT"])
>> .start(os.environ["DELTA_PATH"])
>> )
>>
>> query.awaitTermination()
>> spark.stop()
>> ```
>>
>> Any advice would be appreciated.
>>
>> --
>> Best Regards,
>> Shao Yang HONG
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>


-- 
Best Regards,
Shao Yang HONG
Software Engineer, Pricing, Tech

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: [PySpark Structured Streaming] How to tune .repartition(N) ?

2023-10-04 Thread Raghavendra Ganesh
Hi,
What is the purpose for which you want to use repartition() .. to reduce
the number of files in delta?
Also note that there is an alternative option of using coalesce() instead
of repartition().
--
Raghavendra


On Thu, Oct 5, 2023 at 10:15 AM Shao Yang Hong
 wrote:

> Hi all on user@spark:
>
> We are looking for advice and suggestions on how to tune the
> .repartition() parameter.
>
> We are using Spark Streaming on our data pipeline to consume messages
> and persist them to a Delta Lake
> (https://delta.io/learn/getting-started/).
>
> We read messages from a Kafka topic, then add a generated date column
> as a daily partitioning, and save these records to Delta Lake. We have
> 60 Kafka partitions on the Kafka topic, 15 Spark executor instances
> (so 4 Kafka partitions per executor).
>
> How then, should we use .repartition()? Should we omit this parameter?
> Or set it to 15? or 4?
>
> Our code looks roughly like the below:
>
> ```
> df = (
> spark.readStream.format("kafka")
> .option("kafka.bootstrap.servers", os.environ["KAFKA_BROKERS"])
> .option("subscribe", os.environ["KAFKA_TOPIC"])
> .load()
> )
>
> table = (
> df.select(
> from_protobuf(
> "value", "table", "/opt/protobuf-desc/table.desc"
> ).alias("msg")
> )
> .withColumn("uuid", col("msg.uuid"))
> # etc other columns...
>
> # generated column for daily partitioning in Delta Lake
> .withColumn(CREATED_DATE,
> date_format(from_unixtime("msg.logged_at"), "-MM-dd"))
> .drop("msg")
> )
>
> query = (
> table
> .repartition(10).writeStream
> .queryName(APP_NAME)
> .outputMode("append")
> .format("delta")
> .partitionBy(CREATED_DATE)
> .option("checkpointLocation", os.environ["CHECKPOINT"])
> .start(os.environ["DELTA_PATH"])
> )
>
> query.awaitTermination()
> spark.stop()
> ```
>
> Any advice would be appreciated.
>
> --
> Best Regards,
> Shao Yang HONG
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Seeking Guidance on Spark on Kubernetes Secrets Configuration

2023-10-01 Thread Jon Rodríguez Aranguren
Dear Jörn Franke, Jayabindu Singh and Spark Community members,

Thank you profoundly for your initial insights. I feel it's necessary to
provide more precision on our setup to facilitate a deeper understanding.

We're interfacing with S3 Compatible storages, but our operational context
is somewhat distinct. Our infrastructure doesn't lean on conventional cloud
providers like AWS. Instead, we've architected our environment on
On-Premise Kubernetes distributions, specifically k0s and Openshift.

Our objective extends beyond just handling S3 keys. We're orchestrating a
solution that integrates Azure SPNs, API Credentials, and other sensitive
credentials, intending to make Kubernetes' native secrets our central
management hub. The aspiration is to have a universally deployable JAR, one
that can function unmodified across different ecosystems like EMR,
Databricks (on both AWS and Azure), etc. Platforms like Databricks have
already made strides in this direction, allowing secrets to be woven
directly into the Spark Conf through mechanisms like
{{secret_scope/secret_name}}, which are resolved dynamically.

The spark-on-k8s-operator's user guide suggests the feasibility of mounting
secrets. However, a gap exists in our understanding of how to subsequently
access these mounted secret values within the Spark application's context.

Here lies my inquiry: is the spark-on-k8s-operator currently equipped to
support this level of integration? If it does, any elucidation on the
method or best practices would be pivotal for our project. Alternatively,
if you could point me to resources or community experts who have tackled
similar challenges, it would be of immense assistance.

Thank you for bearing with the intricacies of our query, and I appreciate
your continued guidance in this endeavor.

Warm regards,

Jon Rodríguez Aranguren.

El sáb, 30 sept 2023 a las 23:19, Jayabindu Singh ()
escribió:

> Hi Jon,
>
> Using IAM as suggested by Jorn is the best approach.
> We recently moved our spark workload from HDP to Spark on K8 and utilizing
> IAM.
> It will save you from secret management headaches and also allows a lot
> more flexibility on access control and option to allow access to multiple
> S3 buckets in the same pod.
> We have implemented this across Azure, Google and AWS. Azure does require
> some extra work to make it work.
>
> On Sat, Sep 30, 2023 at 12:05 PM Jörn Franke  wrote:
>
>> Don’t use static iam (s3) credentials. It is an outdated insecure method
>> - even AWS recommend against using this for anything (cf eg
>> https://docs.aws.amazon.com/cli/latest/userguide/cli-authentication-user.html
>> ).
>> It is almost a guarantee to get your data stolen and your account
>> manipulated.
>>
>> If you need to use kubernetes (which has its own very problematic
>> security issues) then assign AWS IAM roles with minimal permissions to the
>> pods (for EKS it means using OIDC, cf
>> https://docs.aws.amazon.com/eks/latest/userguide/service_IAM_role.html).
>>
>> Am 30.09.2023 um 03:41 schrieb Jon Rodríguez Aranguren <
>> jon.r.arangu...@gmail.com>:
>>
>> 
>> Dear Spark Community Members,
>>
>> I trust this message finds you all in good health and spirits.
>>
>> I'm reaching out to the collective expertise of this esteemed community
>> with a query regarding Spark on Kubernetes. As a newcomer, I have always
>> admired the depth and breadth of knowledge shared within this forum, and it
>> is my hope that some of you might have insights on a specific challenge I'm
>> facing.
>>
>> I am currently trying to configure multiple Kubernetes secrets, notably
>> multiple S3 keys, at the SparkConf level for a Spark application. My
>> objective is to understand the best approach or methods to ensure that
>> these secrets can be smoothly accessed by the Spark application.
>>
>> If any of you have previously encountered this scenario or possess
>> relevant insights on the matter, your guidance would be highly beneficial.
>>
>> Thank you for your time and consideration. I'm eager to learn from the
>> experiences and knowledge present within this community.
>>
>> Warm regards,
>> Jon
>>
>>


Re: Seeking Guidance on Spark on Kubernetes Secrets Configuration

2023-10-01 Thread Jörn Franke
There is nowadays more a trend to move away from static credentials/certificates that are stored in a secret vault. The issue is that the rotation of them is complex, once they are leaked they can be abused, making minimal permissions feasible is cumbersome etc. That is why keyless approaches are used for A2A access (workload identity federation was mentioned). E.g. in AWS EKS you would build this on oidc (https://docs.aws.amazon.com/eks/latest/userguide/enable-iam-roles-for-service-accounts.html) and configure this instead of using secrets. Similar approaches exist in other clouds and even on-premise (eg SPIFFE https://spiffe.io/).If this will become the standard will be difficult to say - for sure they seem to more easier to manage.Since you seem to have a Kubernetes setup which means per cloud/data Centre a lot of extra work, infrastructure cost and security issues, workload Identity federation may ease this compared to a secret store.Am 01.10.2023 um 08:27 schrieb Jon Rodríguez Aranguren :Dear Jörn Franke, Jayabindu Singh and Spark Community members,Thank you profoundly for your initial insights. I feel it's necessary to provide more precision on our setup to facilitate a deeper understanding.We're interfacing with S3 Compatible storages, but our operational context is somewhat distinct. Our infrastructure doesn't lean on conventional cloud providers like AWS. Instead, we've architected our environment on On-Premise Kubernetes distributions, specifically k0s and Openshift.Our objective extends beyond just handling S3 keys. We're orchestrating a solution that integrates Azure SPNs, API Credentials, and other sensitive credentials, intending to make Kubernetes' native secrets our central management hub. The aspiration is to have a universally deployable JAR, one that can function unmodified across different ecosystems like EMR, Databricks (on both AWS and Azure), etc. Platforms like Databricks have already made strides in this direction, allowing secrets to be woven directly into the Spark Conf through mechanisms like {{secret_scope/secret_name}}, which are resolved dynamically.The spark-on-k8s-operator's user guide suggests the feasibility of mounting secrets. However, a gap exists in our understanding of how to subsequently access these mounted secret values within the Spark application's context.Here lies my inquiry: is the spark-on-k8s-operator currently equipped to support this level of integration? If it does, any elucidation on the method or best practices would be pivotal for our project. Alternatively, if you could point me to resources or community experts who have tackled similar challenges, it would be of immense assistance.Thank you for bearing with the intricacies of our query, and I appreciate your continued guidance in this endeavor.Warm regards,Jon Rodríguez Aranguren.El sáb, 30 sept 2023 a las 23:19, Jayabindu Singh () escribió:Hi Jon,Using IAM as suggested by Jorn is the best approach.We recently moved our spark workload from HDP to Spark on K8 and utilizing IAM.It will save you from secret management headaches and also allows a lot more flexibility on access control and option to allow access to multiple S3 buckets in the same pod. We have implemented this across Azure, Google and AWS. Azure does require some extra work to make it work.On Sat, Sep 30, 2023 at 12:05 PM Jörn Franke  wrote:Don’t use static iam (s3) credentials. It is an outdated insecure method - even AWS recommend against using this for anything (cf eg https://docs.aws.amazon.com/cli/latest/userguide/cli-authentication-user.html).It is almost a guarantee to get your data stolen and your account manipulated. If you need to use kubernetes (which has its own very problematic security issues) then assign AWS IAM roles with minimal permissions to the pods (for EKS it means using OIDC, cf https://docs.aws.amazon.com/eks/latest/userguide/service_IAM_role.html).Am 30.09.2023 um 03:41 schrieb Jon Rodríguez Aranguren :Dear Spark Community Members,I trust this message finds you all in good health and spirits.I'm reaching out to the collective expertise of this esteemed community with a query regarding Spark on Kubernetes. As a newcomer, I have always admired the depth and breadth of knowledge shared within this forum, and it is my hope that some of you might have insights on a specific challenge I'm facing.I am currently trying to configure multiple Kubernetes secrets, notably multiple S3 keys, at the SparkConf level for a Spark application. My objective is to understand the best approach or methods to ensure that these secrets can be smoothly accessed by the Spark application.If any of you have previously encountered this scenario or possess relevant insights on the matter, your guidance would be highly beneficial.Thank you for your time and consideration. I'm eager to learn from the experiences and knowledge present within this community.Warm regards,Jon




Re: Seeking Guidance on Spark on Kubernetes Secrets Configuration

2023-10-01 Thread Jörn Franke
With oidc sth comparable is possible: https://docs.aws.amazon.com/eks/latest/userguide/enable-iam-roles-for-service-accounts.htmlAm 01.10.2023 um 11:13 schrieb Mich Talebzadeh :It seems that workload identity is not available on AWS. Workload Identity replaces the need to use Metadata concealment on exposed storage such as s3 and gcs. The sensitive metadata protected by metadata concealment is also protected by Workload Identity.Both Google Cloud Kubernetes (GKE) and Azure Kubernetes Service support Workload Identity. Taking notes from Google Cloud:  "Workload Identity is the recommended way for your workloads running on Google Kubernetes Engine (GKE) to access Google Cloud services in a secure and manageable way."HTH

Mich Talebzadeh,Distinguished Technologist, Solutions Architect & EngineerLondonUnited Kingdom

   view my Linkedin profile https://en.everybodywiki.com/Mich_Talebzadeh

 Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction
of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from such
loss, damage or destruction.  

On Sun, 1 Oct 2023 at 06:36, Jayabindu Singh  wrote:Hi Jon,Using IAM as suggested by Jorn is the best approach.We recently moved our spark workload from HDP to Spark on K8 and utilizing IAM.It will save you from secret management headaches and also allows a lot more flexibility on access control and option to allow access to multiple S3 buckets in the same pod. We have implemented this across Azure, Google and AWS. Azure does require some extra work to make it work.On Sat, Sep 30, 2023 at 12:05 PM Jörn Franke  wrote:Don’t use static iam (s3) credentials. It is an outdated insecure method - even AWS recommend against using this for anything (cf eg https://docs.aws.amazon.com/cli/latest/userguide/cli-authentication-user.html).It is almost a guarantee to get your data stolen and your account manipulated. If you need to use kubernetes (which has its own very problematic security issues) then assign AWS IAM roles with minimal permissions to the pods (for EKS it means using OIDC, cf https://docs.aws.amazon.com/eks/latest/userguide/service_IAM_role.html).Am 30.09.2023 um 03:41 schrieb Jon Rodríguez Aranguren :Dear Spark Community Members,I trust this message finds you all in good health and spirits.I'm reaching out to the collective expertise of this esteemed community with a query regarding Spark on Kubernetes. As a newcomer, I have always admired the depth and breadth of knowledge shared within this forum, and it is my hope that some of you might have insights on a specific challenge I'm facing.I am currently trying to configure multiple Kubernetes secrets, notably multiple S3 keys, at the SparkConf level for a Spark application. My objective is to understand the best approach or methods to ensure that these secrets can be smoothly accessed by the Spark application.If any of you have previously encountered this scenario or possess relevant insights on the matter, your guidance would be highly beneficial.Thank you for your time and consideration. I'm eager to learn from the experiences and knowledge present within this community.Warm regards,Jon




Re: Seeking Guidance on Spark on Kubernetes Secrets Configuration

2023-10-01 Thread Mich Talebzadeh
It seems that workload identity
 is not
available on AWS. Workload Identity replaces the need to use Metadata
concealment on exposed storage such as s3 and gcs. The sensitive metadata
protected by metadata concealment is also protected by Workload Identity.

Both Google Cloud Kubernetes (GKE
)
and Azure Kubernetes Servi
ce
support Workload Identity. Taking notes from Google Cloud:  "Workload
Identity is the recommended way for your workloads running on Google
Kubernetes Engine (GKE) to access Google Cloud services in a secure and
manageable way."


HTH


Mich Talebzadeh,
Distinguished Technologist, Solutions Architect & Engineer
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Sun, 1 Oct 2023 at 06:36, Jayabindu Singh  wrote:

> Hi Jon,
>
> Using IAM as suggested by Jorn is the best approach.
> We recently moved our spark workload from HDP to Spark on K8 and utilizing
> IAM.
> It will save you from secret management headaches and also allows a lot
> more flexibility on access control and option to allow access to multiple
> S3 buckets in the same pod.
> We have implemented this across Azure, Google and AWS. Azure does require
> some extra work to make it work.
>
> On Sat, Sep 30, 2023 at 12:05 PM Jörn Franke  wrote:
>
>> Don’t use static iam (s3) credentials. It is an outdated insecure method
>> - even AWS recommend against using this for anything (cf eg
>> https://docs.aws.amazon.com/cli/latest/userguide/cli-authentication-user.html
>> ).
>> It is almost a guarantee to get your data stolen and your account
>> manipulated.
>>
>> If you need to use kubernetes (which has its own very problematic
>> security issues) then assign AWS IAM roles with minimal permissions to the
>> pods (for EKS it means using OIDC, cf
>> https://docs.aws.amazon.com/eks/latest/userguide/service_IAM_role.html).
>>
>> Am 30.09.2023 um 03:41 schrieb Jon Rodríguez Aranguren <
>> jon.r.arangu...@gmail.com>:
>>
>> 
>> Dear Spark Community Members,
>>
>> I trust this message finds you all in good health and spirits.
>>
>> I'm reaching out to the collective expertise of this esteemed community
>> with a query regarding Spark on Kubernetes. As a newcomer, I have always
>> admired the depth and breadth of knowledge shared within this forum, and it
>> is my hope that some of you might have insights on a specific challenge I'm
>> facing.
>>
>> I am currently trying to configure multiple Kubernetes secrets, notably
>> multiple S3 keys, at the SparkConf level for a Spark application. My
>> objective is to understand the best approach or methods to ensure that
>> these secrets can be smoothly accessed by the Spark application.
>>
>> If any of you have previously encountered this scenario or possess
>> relevant insights on the matter, your guidance would be highly beneficial.
>>
>> Thank you for your time and consideration. I'm eager to learn from the
>> experiences and knowledge present within this community.
>>
>> Warm regards,
>> Jon
>>
>>


Re: Seeking Guidance on Spark on Kubernetes Secrets Configuration

2023-09-30 Thread Jayabindu Singh
Hi Jon,

Using IAM as suggested by Jorn is the best approach.
We recently moved our spark workload from HDP to Spark on K8 and utilizing
IAM.
It will save you from secret management headaches and also allows a lot
more flexibility on access control and option to allow access to multiple
S3 buckets in the same pod.
We have implemented this across Azure, Google and AWS. Azure does require
some extra work to make it work.

On Sat, Sep 30, 2023 at 12:05 PM Jörn Franke  wrote:

> Don’t use static iam (s3) credentials. It is an outdated insecure method -
> even AWS recommend against using this for anything (cf eg
> https://docs.aws.amazon.com/cli/latest/userguide/cli-authentication-user.html
> ).
> It is almost a guarantee to get your data stolen and your account
> manipulated.
>
> If you need to use kubernetes (which has its own very problematic security
> issues) then assign AWS IAM roles with minimal permissions to the pods (for
> EKS it means using OIDC, cf
> https://docs.aws.amazon.com/eks/latest/userguide/service_IAM_role.html).
>
> Am 30.09.2023 um 03:41 schrieb Jon Rodríguez Aranguren <
> jon.r.arangu...@gmail.com>:
>
> 
> Dear Spark Community Members,
>
> I trust this message finds you all in good health and spirits.
>
> I'm reaching out to the collective expertise of this esteemed community
> with a query regarding Spark on Kubernetes. As a newcomer, I have always
> admired the depth and breadth of knowledge shared within this forum, and it
> is my hope that some of you might have insights on a specific challenge I'm
> facing.
>
> I am currently trying to configure multiple Kubernetes secrets, notably
> multiple S3 keys, at the SparkConf level for a Spark application. My
> objective is to understand the best approach or methods to ensure that
> these secrets can be smoothly accessed by the Spark application.
>
> If any of you have previously encountered this scenario or possess
> relevant insights on the matter, your guidance would be highly beneficial.
>
> Thank you for your time and consideration. I'm eager to learn from the
> experiences and knowledge present within this community.
>
> Warm regards,
> Jon
>
>


Re: Seeking Guidance on Spark on Kubernetes Secrets Configuration

2023-09-30 Thread Jörn Franke
Don’t use static iam (s3) credentials. It is an outdated insecure method - even 
AWS recommend against using this for anything (cf eg 
https://docs.aws.amazon.com/cli/latest/userguide/cli-authentication-user.html).
It is almost a guarantee to get your data stolen and your account manipulated. 

If you need to use kubernetes (which has its own very problematic security 
issues) then assign AWS IAM roles with minimal permissions to the pods (for EKS 
it means using OIDC, cf 
https://docs.aws.amazon.com/eks/latest/userguide/service_IAM_role.html).

> Am 30.09.2023 um 03:41 schrieb Jon Rodríguez Aranguren 
> :
> 
> 
> Dear Spark Community Members,
> 
> I trust this message finds you all in good health and spirits.
> 
> I'm reaching out to the collective expertise of this esteemed community with 
> a query regarding Spark on Kubernetes. As a newcomer, I have always admired 
> the depth and breadth of knowledge shared within this forum, and it is my 
> hope that some of you might have insights on a specific challenge I'm facing.
> 
> I am currently trying to configure multiple Kubernetes secrets, notably 
> multiple S3 keys, at the SparkConf level for a Spark application. My 
> objective is to understand the best approach or methods to ensure that these 
> secrets can be smoothly accessed by the Spark application.
> 
> If any of you have previously encountered this scenario or possess relevant 
> insights on the matter, your guidance would be highly beneficial.
> 
> Thank you for your time and consideration. I'm eager to learn from the 
> experiences and knowledge present within this community.
> 
> Warm regards,
> Jon


Re: Inquiry about Processing Speed

2023-09-28 Thread Jack Goodson
Hi Haseeb,

I think the user mailing list is what you're looking for, people are
usually pretty active on here if you present a direct question about apache
spark. I've linked below the community guidelines which says which mailing
lists are for what etc

https://spark.apache.org/community.html

There's a few resources below for cluster management and code performance
tweaks but if you write declaratively in Spark the planning engine does a
pretty good job of optimising jobs, it's hard to answer without a specific
problem presented, hope the docs get you started

https://spark.apache.org/docs/latest/cluster-overview.html

https://spark.apache.org/docs/latest/tuning.html

https://spark.apache.org/docs/latest/sql-performance-tuning.html

On Thu, Sep 28, 2023 at 3:22 PM Haseeb Khalid 
wrote:

> Dear Support Team,
>
> I hope this message finds you well. My name is Haseeb Khalid, and I am
> reaching out to discuss a scenario related to processing speed in Apache
> Spark.
>
> I have been utilizing these technologies in our projects, and we have
> encountered a specific use case where we are seeking to optimize processing
> speed. Given the critical nature of this requirement, I would greatly
> appreciate the opportunity to discuss this with a knowledgeable
> representative from your team.
>
> I am particularly interested in understanding any best practices,
> configuration tweaks, or architectural considerations that can be employed
> to enhance processing speed in our specific scenario.
>
> Would it be possible to schedule a call or exchange emails to delve deeper
> into this matter? I am available at your convenience and can accommodate
> any preferred mode of communication.
>
> I genuinely value the expertise of the Apache Spark communities and
> believe that your insights will be instrumental in achieving our objectives.
>
> Thank you very much for your time and consideration. I look forward to
> hearing from you soon.
>
> --
>
> Thanks & Best Regards,
>
> *Haseeb Khalid*
>
> *Senior Data Analyst*
>
> +92 306 4436 790
>
>
>


Re: Inquiry about Processing Speed

2023-09-27 Thread Deepak Goel
Hi

"Processing Speed" can be at a software level (Code Optimization) and at a
hardware level (Capacity Planning)

Deepak
"The greatness of a nation can be judged by the way its animals are treated
- Mahatma Gandhi"

+91 73500 12833
deic...@gmail.com

Facebook: https://www.facebook.com/deicool
LinkedIn: www.linkedin.com/in/deicool

"Plant a Tree, Go Green"

Make In India : http://www.makeinindia.com/home


On Thu, Sep 28, 2023 at 7:53 AM Haseeb Khalid 
wrote:

> Dear Support Team,
>
> I hope this message finds you well. My name is Haseeb Khalid, and I am
> reaching out to discuss a scenario related to processing speed in Apache
> Spark.
>
> I have been utilizing these technologies in our projects, and we have
> encountered a specific use case where we are seeking to optimize processing
> speed. Given the critical nature of this requirement, I would greatly
> appreciate the opportunity to discuss this with a knowledgeable
> representative from your team.
>
> I am particularly interested in understanding any best practices,
> configuration tweaks, or architectural considerations that can be employed
> to enhance processing speed in our specific scenario.
>
> Would it be possible to schedule a call or exchange emails to delve deeper
> into this matter? I am available at your convenience and can accommodate
> any preferred mode of communication.
>
> I genuinely value the expertise of the Apache Spark communities and
> believe that your insights will be instrumental in achieving our objectives.
>
> Thank you very much for your time and consideration. I look forward to
> hearing from you soon.
>
> --
>
> Thanks & Best Regards,
>
> *Haseeb Khalid*
>
> *Senior Data Analyst*
>
> +92 306 4436 790
>
>
>


Re: Urgent: Seeking Guidance on Kafka Slow Consumer and Data Skew Problem

2023-09-22 Thread Karthick
Hi All,

It will be helpful if we gave any pointers to the problem addressed.

Thanks
Karthick.

On Wed, Sep 20, 2023 at 3:03 PM Gowtham S  wrote:

> Hi Spark Community,
>
> Thank you for bringing up this issue. We've also encountered the same
> challenge and are actively working on finding a solution. It's reassuring
> to know that we're not alone in this.
>
> If you have any insights or suggestions regarding how to address this
> problem, please feel free to share them.
>
> Looking forward to hearing from others who might have encountered similar
> issues.
>
>
> Thanks and regards,
> Gowtham S
>
>
> On Tue, 19 Sept 2023 at 17:23, Karthick 
> wrote:
>
>> Subject: Seeking Guidance on Kafka Slow Consumer and Data Skew Problem
>>
>> Dear Spark Community,
>>
>> I recently reached out to the Apache Flink community for assistance with
>> a critical issue we are facing in our IoT platform, which relies on Apache
>> Kafka and real-time data processing. We received some valuable insights and
>> suggestions from the Apache Flink community, and now, we would like to seek
>> your expertise and guidance on the same problem.
>>
>> In our IoT ecosystem, we are dealing with data streams from thousands of
>> devices, each uniquely identified. To maintain data integrity and ordering,
>> we have configured a Kafka topic with ten partitions, ensuring that each
>> device's data is directed to its respective partition based on its unique
>> identifier. While this architectural choice has been effective in
>> maintaining data order, it has unveiled a significant challenge:
>>
>> *Slow Consumer and Data Skew Problem:* When a single device experiences
>> processing delays, it acts as a bottleneck within the Kafka partition,
>> leading to delays in processing data from other devices sharing the same
>> partition. This issue severely affects the efficiency and scalability of
>> our entire data processing pipeline.
>>
>> Here are some key details:
>>
>> - Number of Devices: 1000 (with potential growth)
>> - Target Message Rate: 1000 messages per second (with expected growth)
>> - Kafka Partitions: 10 (some partitions are overloaded)
>> - We are planning to migrate from Apache Storm to Apache Flink/Spark.
>>
>> We are actively seeking guidance on the following aspects:
>>
>> *1. Independent Device Data Processing*: We require a strategy that
>> guarantees one device's processing speed does not affect other devices in
>> the same Kafka partition. In other words, we need a solution that ensures
>> the independent processing of each device's data.
>>
>> *2. Custom Partitioning Strategy:* We are looking for a custom
>> partitioning strategy to distribute the load evenly across Kafka
>> partitions. Currently, we are using Murmur hashing with the device's unique
>> identifier, but we are open to exploring alternative partitioning
>> strategies.
>>
>> *3. Determining Kafka Partition Count:* We seek guidance on how to
>> determine the optimal number of Kafka partitions to handle the target
>> message rate efficiently.
>>
>> *4. Handling Data Skew:* Strategies or techniques for handling data skew
>> within Apache Flink.
>>
>> We believe that many in your community may have faced similar challenges
>> or possess valuable insights into addressing them. Your expertise and
>> experiences can greatly benefit our team and the broader community dealing
>> with real-time data processing.
>>
>> If you have any knowledge, solutions, or references to open-source
>> projects, libraries, or community-contributed solutions that align with our
>> requirements, we would be immensely grateful for your input.
>>
>> We appreciate your prompt attention to this matter and eagerly await your
>> responses and insights. Your support will be invaluable in helping us
>> overcome this critical challenge.
>>
>> Thank you for your time and consideration.
>>
>> Thanks & regards,
>> Karthick.
>>
>


Re: Parallel write to different partitions

2023-09-21 Thread Shrikant Prasad
Found this issue reported earlier but was bulk closed:
https://issues.apache.org/jira/browse/SPARK-27030

Regards,
Shrikant

On Fri, 22 Sep 2023 at 12:03 AM, Shrikant Prasad 
wrote:

> Hi all,
>
> We have multiple spark jobs running in parallel trying to write into same
> hive table but each job writing into different partition. This was working
> fine with Spark 2.3 and Hadoop 2.7.
>
> But after upgrading to Spark 3.2 and Hadoop 3.2.2, these parallel jobs are
> failing with FileNotFound exceptions for files under
> /warehouse/db/table/temporary/0/ directory.
>
> It seems earlier the temporary dir was created under the partition being
> written but now its created directly under the table directory which is
> causing concurrency issues with multiple jobs trying to cleanup the same
> temporary directory.
>
> Is there a way now to achieve parallel writes to different partitions of
> same table? Also any insight into what caused the change in behavior of
> temporary dir creation will be helpful.
>
> Thanks and regards,
> Shrikant
>


Re: Need to split incoming data into PM on time column and find the top 5 by volume of data

2023-09-21 Thread Mich Talebzadeh
In general you can probably do all this in spark-sql by reading in Hive
table through a DF in Pyspark, then creating a TempView on that DF, select
PM data through CAST() function and then use a windowing function to select
the top 5 with DENSE_RANK()

#Read  Hive table as a DataFrame
df = spark.read.table("hive.sample_data")
#  Create a view on the DataFrame
df.createOrReplaceTempView("tmp")
 sqltext = f"""
   SELECT incoming_ip, total_volume
FROM ( SELECT incoming_ip, SUM(volume) AS total_volume, DENSE_RANK() OVER (
ORDER BY SUM(volume) DESC) AS rank FROM tmp WHERE CAST(time_in AS TIME)
BETWEEN '12:00:00' AND '23:59:59' GROUP BY incoming_ip ) ranked_ips WHERE
rank <= 5;
"""
spark.sql(sqltext).show(5,False)

HTH

Mich Talebzadeh,
Distinguished Technologist, Solutions Architect & Engineer
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Thu, 21 Sept 2023 at 18:03, ashok34...@yahoo.com.INVALID
 wrote:

> Hello gurus,
>
> I have a Hive table created as below (there are more columns)
>
> CREATE TABLE hive.sample_data ( incoming_ip STRING, time_in TIMESTAMP,
> volume INT );
>
> Data is stored in that table
>
> In PySpark, I want to  select the top 5 incoming IP addresses with the
> highest total volume of data transferred during the PM hours. PM hours are
> decided by the column time_in with values like '00:45:00', '11:35:00',
> '18:25:00'
>
> Any advice is appreciated.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: PySpark 3.5.0 on PyPI

2023-09-20 Thread Kezhi Xiong
Oh, I saw it now. Thanks!

On Wed, Sep 20, 2023 at 1:04 PM Sean Owen  wrote:

> [ External sender. Exercise caution. ]
>
> I think the announcement mentioned there were some issues with pypi and
> the upload size this time. I am sure it's intended to be there when
> possible.
>
> On Wed, Sep 20, 2023, 3:00 PM Kezhi Xiong 
> wrote:
>
>> Hi,
>>
>> Are there any plans to upload PySpark 3.5.0 to PyPI (
>> https://pypi.org/project/pyspark/)? It's still 3.4.1.
>>
>> Thanks,
>> Kezhi
>>
>>
>>


Re: PySpark 3.5.0 on PyPI

2023-09-20 Thread Sean Owen
I think the announcement mentioned there were some issues with pypi and the
upload size this time. I am sure it's intended to be there when possible.

On Wed, Sep 20, 2023, 3:00 PM Kezhi Xiong  wrote:

> Hi,
>
> Are there any plans to upload PySpark 3.5.0 to PyPI (
> https://pypi.org/project/pyspark/)? It's still 3.4.1.
>
> Thanks,
> Kezhi
>
>
>


Re: Discriptency sample standard deviation pyspark and Excel

2023-09-20 Thread Sean Owen
This has turned into a big thread for a simple thing and has been answered
3 times over now.

Neither is better, they just calculate different things. That the 'default'
is sample stddev is just convention.
stddev_pop is the simple standard deviation of a set of numbers
stddev_samp is used when the set of numbers is a sample from a notional
larger population, and you estimate the stddev of the population from the
sample.

They only differ in the denominator. Neither is more efficient at all or
more/less sensitive to outliers.

On Wed, Sep 20, 2023 at 3:06 AM Mich Talebzadeh 
wrote:

> Spark uses the sample standard deviation stddev_samp by default, whereas
> *Hive* uses population standard deviation stddev_pop as default.
>
> My understanding is that spark uses sample standard deviation by default
> because
>
>- It is more commonly used.
>- It is more efficient to calculate.
>- It is less sensitive to outliers. (data points that differ
>significantly from other observations in a dataset. They can be caused by a
>variety of factors, such as measurement errors or edge events.)
>
> The sample standard deviation is less sensitive to outliers because it
> divides by N-1 instead of N. This means that a single outlier will have a
> smaller impact on the sample standard deviation than it would on the
> population standard deviation.
>
> HTH
>
> Mich Talebzadeh,
> Distinguished Technologist, Solutions Architect & Engineer
> London
> United Kingdom
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Tue, 19 Sept 2023 at 21:50, Sean Owen  wrote:
>
>> Pyspark follows SQL databases here. stddev is stddev_samp, and sample
>> standard deviation is the calculation with the Bessel correction, n-1 in
>> the denominator. stddev_pop is simply standard deviation, with n in the
>> denominator.
>>
>> On Tue, Sep 19, 2023 at 7:13 AM Helene Bøe 
>> wrote:
>>
>>> Hi!
>>>
>>>
>>>
>>> I am applying the stddev function (so actually stddev_samp), however
>>> when comparing with the sample standard deviation in Excel the resuls do
>>> not match.
>>>
>>> I cannot find in your documentation any more specifics on how the sample
>>> standard deviation is calculated, so I cannot compare the difference toward
>>> excel, which uses
>>>
>>> .
>>>
>>> I am trying to avoid using Excel at all costs, but if the stddev_samp
>>> function is not calculating the standard deviation correctly I have a
>>> problem.
>>>
>>> I hope you can help me resolve this issue.
>>>
>>>
>>>
>>> Kindest regards,
>>>
>>>
>>>
>>> *Helene Bøe*
>>> *Graduate Project Engineer*
>>> Recycling Process & Support
>>>
>>> M: +47 980 00 887
>>> helene.b...@hydro.com
>>> 
>>>
>>> Norsk Hydro ASA
>>> Drammensveien 264
>>> NO-0283 Oslo, Norway
>>> www.hydro.com
>>> 
>>>
>>>
>>> NOTICE: This e-mail transmission, and any documents, files or previous
>>> e-mail messages attached to it, may contain confidential or privileged
>>> information. If you are not the intended recipient, or a person responsible
>>> for delivering it to the intended recipient, you are hereby notified that
>>> any disclosure, copying, distribution or use of any of the information
>>> contained in or attached to this message is STRICTLY PROHIBITED. If you
>>> have received this transmission in error, please immediately notify the
>>> sender and delete the e-mail and attached documents. Thank you.
>>>
>>


Re: Urgent: Seeking Guidance on Kafka Slow Consumer and Data Skew Problem

2023-09-20 Thread Gowtham S
Hi Spark Community,

Thank you for bringing up this issue. We've also encountered the same
challenge and are actively working on finding a solution. It's reassuring
to know that we're not alone in this.

If you have any insights or suggestions regarding how to address this
problem, please feel free to share them.

Looking forward to hearing from others who might have encountered similar
issues.


Thanks and regards,
Gowtham S


On Tue, 19 Sept 2023 at 17:23, Karthick  wrote:

> Subject: Seeking Guidance on Kafka Slow Consumer and Data Skew Problem
>
> Dear Spark Community,
>
> I recently reached out to the Apache Flink community for assistance with a
> critical issue we are facing in our IoT platform, which relies on Apache
> Kafka and real-time data processing. We received some valuable insights and
> suggestions from the Apache Flink community, and now, we would like to seek
> your expertise and guidance on the same problem.
>
> In our IoT ecosystem, we are dealing with data streams from thousands of
> devices, each uniquely identified. To maintain data integrity and ordering,
> we have configured a Kafka topic with ten partitions, ensuring that each
> device's data is directed to its respective partition based on its unique
> identifier. While this architectural choice has been effective in
> maintaining data order, it has unveiled a significant challenge:
>
> *Slow Consumer and Data Skew Problem:* When a single device experiences
> processing delays, it acts as a bottleneck within the Kafka partition,
> leading to delays in processing data from other devices sharing the same
> partition. This issue severely affects the efficiency and scalability of
> our entire data processing pipeline.
>
> Here are some key details:
>
> - Number of Devices: 1000 (with potential growth)
> - Target Message Rate: 1000 messages per second (with expected growth)
> - Kafka Partitions: 10 (some partitions are overloaded)
> - We are planning to migrate from Apache Storm to Apache Flink/Spark.
>
> We are actively seeking guidance on the following aspects:
>
> *1. Independent Device Data Processing*: We require a strategy that
> guarantees one device's processing speed does not affect other devices in
> the same Kafka partition. In other words, we need a solution that ensures
> the independent processing of each device's data.
>
> *2. Custom Partitioning Strategy:* We are looking for a custom
> partitioning strategy to distribute the load evenly across Kafka
> partitions. Currently, we are using Murmur hashing with the device's unique
> identifier, but we are open to exploring alternative partitioning
> strategies.
>
> *3. Determining Kafka Partition Count:* We seek guidance on how to
> determine the optimal number of Kafka partitions to handle the target
> message rate efficiently.
>
> *4. Handling Data Skew:* Strategies or techniques for handling data skew
> within Apache Flink.
>
> We believe that many in your community may have faced similar challenges
> or possess valuable insights into addressing them. Your expertise and
> experiences can greatly benefit our team and the broader community dealing
> with real-time data processing.
>
> If you have any knowledge, solutions, or references to open-source
> projects, libraries, or community-contributed solutions that align with our
> requirements, we would be immensely grateful for your input.
>
> We appreciate your prompt attention to this matter and eagerly await your
> responses and insights. Your support will be invaluable in helping us
> overcome this critical challenge.
>
> Thank you for your time and consideration.
>
> Thanks & regards,
> Karthick.
>


Re: Discriptency sample standard deviation pyspark and Excel

2023-09-20 Thread Mich Talebzadeh
Spark uses the sample standard deviation stddev_samp by default, whereas
*Hive* uses population standard deviation stddev_pop as default.

My understanding is that spark uses sample standard deviation by default
because

   - It is more commonly used.
   - It is more efficient to calculate.
   - It is less sensitive to outliers. (data points that differ
   significantly from other observations in a dataset. They can be caused by a
   variety of factors, such as measurement errors or edge events.)

The sample standard deviation is less sensitive to outliers because it
divides by N-1 instead of N. This means that a single outlier will have a
smaller impact on the sample standard deviation than it would on the
population standard deviation.

HTH

Mich Talebzadeh,
Distinguished Technologist, Solutions Architect & Engineer
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Tue, 19 Sept 2023 at 21:50, Sean Owen  wrote:

> Pyspark follows SQL databases here. stddev is stddev_samp, and sample
> standard deviation is the calculation with the Bessel correction, n-1 in
> the denominator. stddev_pop is simply standard deviation, with n in the
> denominator.
>
> On Tue, Sep 19, 2023 at 7:13 AM Helene Bøe 
> wrote:
>
>> Hi!
>>
>>
>>
>> I am applying the stddev function (so actually stddev_samp), however when
>> comparing with the sample standard deviation in Excel the resuls do not
>> match.
>>
>> I cannot find in your documentation any more specifics on how the sample
>> standard deviation is calculated, so I cannot compare the difference toward
>> excel, which uses
>>
>> .
>>
>> I am trying to avoid using Excel at all costs, but if the stddev_samp
>> function is not calculating the standard deviation correctly I have a
>> problem.
>>
>> I hope you can help me resolve this issue.
>>
>>
>>
>> Kindest regards,
>>
>>
>>
>> *Helene Bøe*
>> *Graduate Project Engineer*
>> Recycling Process & Support
>>
>> M: +47 980 00 887
>> helene.b...@hydro.com
>> 
>>
>> Norsk Hydro ASA
>> Drammensveien 264
>> NO-0283 Oslo, Norway
>> www.hydro.com
>> 
>>
>>
>> NOTICE: This e-mail transmission, and any documents, files or previous
>> e-mail messages attached to it, may contain confidential or privileged
>> information. If you are not the intended recipient, or a person responsible
>> for delivering it to the intended recipient, you are hereby notified that
>> any disclosure, copying, distribution or use of any of the information
>> contained in or attached to this message is STRICTLY PROHIBITED. If you
>> have received this transmission in error, please immediately notify the
>> sender and delete the e-mail and attached documents. Thank you.
>>
>


Re: Discriptency sample standard deviation pyspark and Excel

2023-09-19 Thread Mich Talebzadeh
Hi Helen,

Assuming you want to calculate stddev_samp,  Spark correctly points  STDDEV
to STDDEV_SAMP.

In below replace sales with your table name and AMOUNT_SOLD with the column
you want to do the calculation

SELECT

SQRT((SUM(POWER(AMOUNT_SOLD,2))-(COUNT(1)*POWER(AVG(AMOUNT_SOLD),2)))/(COUNT(1)-1))
AS MYSTDDEV,
STDDEV(amount_sold) AS STDDEV,
STDDEV_SAMP(amount_sold) AS STDDEV_SAMP,
STDDEV_POP(amount_sold) AS STDDEV_POP
fromsales;

for me it returned

++-++-+--+
|  mystddev  |   stddev|stddev_samp |
stddev_pop  |
++-++-+--+
| 260.7270919450411  | 260.7270722861637   | 260.7270722861637  |
260.72704617042166  |
++-++-+--+

HTH

Mich Talebzadeh,
Distinguished Technologist, Solutions Architect & Engineer
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Tue, 19 Sept 2023 at 13:14, Helene Bøe 
wrote:

> Hi!
>
>
>
> I am applying the stddev function (so actually stddev_samp), however when
> comparing with the sample standard deviation in Excel the resuls do not
> match.
>
> I cannot find in your documentation any more specifics on how the sample
> standard deviation is calculated, so I cannot compare the difference toward
> excel, which uses
>
> .
>
> I am trying to avoid using Excel at all costs, but if the stddev_samp
> function is not calculating the standard deviation correctly I have a
> problem.
>
> I hope you can help me resolve this issue.
>
>
>
> Kindest regards,
>
>
>
> *Helene Bøe*
> *Graduate Project Engineer*
> Recycling Process & Support
>
> M: +47 980 00 887
> helene.b...@hydro.com
> 
>
> Norsk Hydro ASA
> Drammensveien 264
> NO-0283 Oslo, Norway
> www.hydro.com
> 
>
>
> NOTICE: This e-mail transmission, and any documents, files or previous
> e-mail messages attached to it, may contain confidential or privileged
> information. If you are not the intended recipient, or a person responsible
> for delivering it to the intended recipient, you are hereby notified that
> any disclosure, copying, distribution or use of any of the information
> contained in or attached to this message is STRICTLY PROHIBITED. If you
> have received this transmission in error, please immediately notify the
> sender and delete the e-mail and attached documents. Thank you.
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org

<    1   2   3   4   5   6   7   8   9   10   >