help needed with SPARK-45598 and SPARK-45769

2023-11-09 Thread Maksym M
Greetings,

tl;dr there must have been a regression in spark *connect*'s ability to 
retrieve data, more details in linked issues

https://issues.apache.org/jira/browse/SPARK-45598
https://issues.apache.org/jira/browse/SPARK-45769

we have projects that depend on spark connect 3.5 and we'd appreciate any 
suggestions on what could be wrong and how to resolve it.

happy to contribute!

best regards,
maksym



Storage Partition Joins only works for buckets?

2023-11-08 Thread Arwin Tio
Hey team,

I was reading through the Storage Partition Join SPIP 
(https://docs.google.com/document/d/1foTkDSM91VxKgkEcBMsuAvEjNybjja-uHk-r3vtXWFE/edit#heading=h.82w8qxfl2uwl)
 but it seems like it only supports buckets, not partitions. Is that true? And 
if so does anybody have an intuition for why - is it simply a bad idea?

Thanks,

Arwin



Re: Unsubscribe

2023-11-08 Thread Xin Zhang
Unsubscribe


--
Email:josseph.zh...@gmail.com


Unsubscribe

2023-11-07 Thread Kiran Kumar Dusi
Unsubscribe


unsubscribe

2023-11-07 Thread Kalhara Gurugamage
unsubscribe

Sent from my phone
-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org


unsubscribe

2023-11-07 Thread Suraj Choubey
unsubscribe


Re: [ SPARK SQL ]: UPPER in WHERE condition is not working in Apache Spark 3.5.0 for Mysql ENUM Column

2023-11-07 Thread Suyash Ajmera
Any update on this?


On Fri, 13 Oct, 2023, 12:56 pm Suyash Ajmera, 
wrote:

> This issue is related to the CharVarcharCodegenUtils readSidePadding method.
>
> It appends white space while reading ENUM data from MySQL,
>
> which causes issues when querying and when writing the same data to Cassandra.
>
> On Thu, 12 Oct, 2023, 7:46 pm Suyash Ajmera, 
> wrote:
>
>> I have upgraded my spark job from spark 3.3.1 to spark 3.5.0, I am
>> querying to Mysql Database and applying
>>
>> `*UPPER(col) = UPPER(value)*` in the subsequent sql query. It is working
>> as expected in spark 3.3.1 , but not working with 3.5.0.
>>
>> Where Condition ::  `*UPPER(vn) = 'ERICSSON' AND (upper(st) = 'OPEN' OR
>> upper(st) = 'REOPEN' OR upper(st) = 'CLOSED')*`
>>
>> The *st *column is ENUM in the database and it is causing the issue.
>>
>> Below is the Physical Plan of *FILTER* phase :
>>
>> For 3.3.1 :
>>
>> +- Filter ((upper(vn#11) = ERICSSON) AND (((upper(st#42) = OPEN) OR
>> (upper(st#42) = REOPEN)) OR (upper(st#42) = CLOSED)))
>>
>> For 3.5.0 :
>>
>> +- Filter ((upper(vn#11) = ERICSSON) AND (((upper(staticinvoke(class
>> org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils, StringType,
>> readSidePadding, st#42, 13, true, false, true)) = OPEN) OR
>> (upper(staticinvoke(class
>> org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils, StringType,
>> readSidePadding, st#42, 13, true, false, true)) = REOPEN)) OR
>> (upper(staticinvoke(class
>> org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils, StringType,
>> readSidePadding, st#42, 13, true, false, true)) = CLOSED)))
>>
>> -
>>
>> I debugged it and found that Spark added a property in version 3.4.0,
>> i.e. **spark.sql.readSideCharPadding**, which has the default value **true**.
>>
>> Link to the JIRA : https://issues.apache.org/jira/browse/SPARK-40697
>>
>> A new method was added to the class **CharVarcharCodegenUtils**:
>>
>> public static UTF8String readSidePadding(UTF8String inputStr, int limit) {
>>   int numChars = inputStr.numChars();
>>   if (numChars == limit) {
>>     return inputStr;
>>   } else if (numChars < limit) {
>>     return inputStr.rpad(limit, SPACE);
>>   } else {
>>     return inputStr;
>>   }
>> }
>>
>>
>> **This method is appending some whitespace padding to the ENUM values
>> while reading and causing the Issue.**
>>
>> ---
>>
>> When I am removing the UPPER function from the where condition the
>> **FILTER** Phase looks like this :
>>
>>  +- Filter (((staticinvoke(class
>> org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils,
>>  StringType, readSidePadding, st#42, 13, true, false, true) = OPEN
>> ) OR (staticinvoke(class
>> org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils, StringType,
>> readSidePadding, st#42, 13, true, false, true) = REOPEN   )) OR
>> (staticinvoke(class
>> org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils, StringType,
>> readSidePadding, st#42, 13, true, false, true) = CLOSED   ))
>>
>>
>> **You can see it has added some white space after the value and the query
>> runs fine giving the correct result.**
>>
>> But with the UPPER function I am not getting the data.
>>
>> --
>>
>> I have also tried to disable this Property *spark.sql.readSideCharPadding
>> = false* with following cases :
>>
>> 1. With Upper function in where clause :
>>It is not pushing the filters to Database and the *query works fine*.
>>
>>
>>   +- Filter (((upper(st#42) = OPEN) OR (upper(st#42) = REOPEN)) OR
>> (upper(st#42) = CLOSED))
>>
>> 2. But when I am removing the upper function
>>
>>  *It is pushing the filter to MySQL with the white spaces and I am not
>> getting the data. (THIS IS CAUSING A VERY BIG ISSUE)*
>>
>>   PushedFilters: [*IsNotNull(vn), *EqualTo(vn,ERICSSON),
>> *Or(Or(EqualTo(st,OPEN ),EqualTo(st,REOPEN
>> )),EqualTo(st,CLOSED   ))]
>>
>> I cannot move this filter to JDBC read query , also I can't remove this
>> UPPER function in the where clause.
>>
>>
>> 
>>
>> Also I found same data getting written to CASSANDRA with *PADDING .*
>>
>
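
The padding behaviour described in this thread can be imitated in a few lines of plain Python. This is only a sketch of the quoted Java method, not Spark code: `read_side_padding` is a hypothetical stand-in, the limit of 13 is taken from the physical plan above, and `rstrip` stands in for a SQL-side trim, which is an untested assumption as a workaround.

```python
def read_side_padding(value: str, limit: int) -> str:
    """Imitate the quoted readSidePadding: right-pad with spaces up to `limit`."""
    if len(value) < limit:
        return value.ljust(limit)
    # Equal to or longer than the limit: returned unchanged, as in the Java code.
    return value


padded = read_side_padding("OPEN", 13)

# The padded value is 13 characters long, so a plain equality test fails ...
print(len(padded))
print(padded.upper() == "OPEN")

# ... while trimming the padding before comparing matches again.
print(padded.upper().rstrip() == "OPEN")
```

This shows why `UPPER(st) = 'OPEN'` stops matching once the column value carries trailing spaces, while a literal that was padded to the same width still matches.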


org.apache.ranger.authorization.hive.authorizer.RangerHiveAuthorizerFactory ClassNotFoundException

2023-11-07 Thread Yi Zheng
Hi,


The problem I’ve encountered is this: after starting “spark-shell”, the first 
time I run “spark.sql("select * from test.test_3 ").show(false)”, it throws 
“ERROR session.SessionState: Error setting up authorization: 
java.lang.ClassNotFoundException: 
org.apache.ranger.authorization.hive.authorizer.RangerHiveAuthorizerFactory

org.apache.hadoop.hive.ql.metadata.HiveException: 
java.lang.ClassNotFoundException: 
org.apache.ranger.authorization.hive.authorizer.RangerHiveAuthorizerFactory”. 
When I run the same command a second time, no error is thrown and the correct 
result is returned. In summary, whenever a new Spark session is established, 
the first Spark SQL command always throws the error, while subsequent Spark 
SQL commands do not. I suspect some configuration is not set correctly, but I 
couldn’t figure out what the cause might be.


Below is some background information. Please let me know if additional 
information is needed. Thank you.

Modules and version:

  *   CDH:6.3.2
  *   Zookeeper:
  *   HDFS:
  *   Spark:2.4.0
  *   Yarn:
  *   Hive:2.1.1
  *   Ranger:2.1.0

Complete error message:

[root@poc6-node1 conf]# spark-shell

Setting default log level to "WARN".

To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use 
setLogLevel(newLevel).

23/11/07 11:16:41 WARN cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: 
Attempted to request executors before the AM has registered!

23/11/07 11:16:41 WARN lineage.LineageWriter: Lineage directory 
/var/log/spark/lineage doesn't exist or is not writable. Lineage for this 
application will be disabled.

Spark context Web UI available at

Spark context available as 'sc' (master = yarn, app id =).

Spark session available as 'spark'.

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.0-cdh6.3.2
      /_/



Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_181)

Type in expressions to have them evaluated.

Type :help for more information.



scala> spark.sql("select * from test.test_3 ").show(false)

23/11/07 11:17:30 WARN lineage.LineageWriter: Lineage directory 
/var/log/spark/lineage doesn't exist or is not writable. Lineage for this 
application will be disabled.

23/11/07 11:17:35 ERROR session.SessionState: Error setting up authorization: 
java.lang.ClassNotFoundException: 
org.apache.ranger.authorization.hive.authorizer.RangerHiveAuthorizerFactory

org.apache.hadoop.hive.ql.metadata.HiveException: 
java.lang.ClassNotFoundException: 
org.apache.ranger.authorization.hive.authorizer.RangerHiveAuthorizerFactory

at org.apache.hadoop.hive.ql.metadata.HiveUtils.getAuthorizeProviderManager(HiveUtils.java:385)
at org.apache.hadoop.hive.ql.session.SessionState.setupAuth(SessionState.java:873)
at org.apache.hadoop.hive.ql.session.SessionState.getAuthenticator(SessionState.java:1559)
at org.apache.hadoop.hive.ql.session.SessionState.getUserFromAuthenticator(SessionState.java:1239)
at org.apache.hadoop.hive.ql.metadata.Table.getEmptyTable(Table.java:181)
at org.apache.hadoop.hive.ql.metadata.Table.<init>(Table.java:123)
at org.apache.spark.sql.hive.client.HiveClientImpl$.toHiveTable(HiveClientImpl.scala:927)
at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$getPartitions$1.apply(HiveClientImpl.scala:670)
at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$getPartitions$1.apply(HiveClientImpl.scala:669)
at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$withHiveState$1.apply(HiveClientImpl.scala:283)
at org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:221)
at org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:220)
at org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:266)
at org.apache.spark.sql.hive.client.HiveClientImpl.getPartitions(HiveClientImpl.scala:669)
at org.apache.spark.sql.hive.client.HiveClient$class.getPartitions(HiveClient.scala:210)
at org.apache.spark.sql.hive.client.HiveClientImpl.getPartitions(HiveClientImpl.scala:84)
at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$listPartitions$1.apply(HiveExternalCatalog.scala:1232)
at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$listPartitions$1.apply(HiveExternalCatalog.scala:1230)
at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:99)
at org.apache.spark.sql.hive.HiveExternalCatalog.listPartitions(HiveExternalCatalog.scala:1230)
at org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener.listPartitions(ExternalCatalogWithListener.scala:254)
at 

Re: Spark master shuts down when one of zookeeper dies

2023-11-07 Thread Mich Talebzadeh
Hi,

Spark standalone mode does not use or rely on ZooKeeper by default. The
Spark master and workers communicate directly with each other without using
ZooKeeper. However, it appears that in your case you are relying on
ZooKeeper to provide high availability for your standalone cluster. By
configuring Spark to use ZooKeeper for leader election, you can ensure that
there is always a Spark master running, even if one of the ZooKeeper
servers goes down.

To use ZooKeeper for high availability in Spark standalone mode, you need
to configure the following properties:

spark.deploy.recoveryMode: Set to ZOOKEEPER to enable high availability
spark.deploy.zookeeper.url: The ZooKeeper cluster URL

Now the Spark master shuts down when a Zookeeper instance is down because
it loses its leadership. Zookeeper uses a leader election algorithm to
ensure that there is always a single leader in the cluster. When a
Zookeeper instance goes down, the remaining Zookeeper instances will elect
a new leader.

The original master that was down never comes up because it has lost its
state. The Spark master stores its state in Zookeeper. When the Zookeeper
instance that the master was connected to goes down, the master loses its
state. This means that the master cannot restart without losing data.

To avoid this problem, you can run multiple Spark masters in high
availability mode. This means that you will have at least two Spark masters
running at all times. When a Zookeeper instance goes down, the remaining
Spark masters will continue to run and serve applications. As stated, to
run Spark masters in high availability mode, you will need to configure the
spark.deploy.recoveryMode property to ZOOKEEPER. You will also need to
configure the spark.deploy.zookeeper.url property to point to your
Zookeeper cluster.
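
As a rough illustration of the settings named above, the following Python sketch assembles the JVM options that would go into SPARK_DAEMON_JAVA_OPTS in spark-env.sh on each master, plus the multi-master URL that applications and workers would use. The host names and ports are hypothetical placeholders, and the helper functions are made up for this example; only the property names and the comma-separated spark:// URL form come from the Spark standalone documentation.

```python
def zk_recovery_opts(zk_hosts, zk_dir="/spark"):
    """Build the -D options for ZooKeeper-based recovery of a standalone master."""
    return " ".join([
        "-Dspark.deploy.recoveryMode=ZOOKEEPER",
        "-Dspark.deploy.zookeeper.url=" + ",".join(zk_hosts),
        # Directory in ZooKeeper where recovery state is kept (Spark's default is /spark).
        "-Dspark.deploy.zookeeper.dir=" + zk_dir,
    ])


def ha_master_url(masters):
    """Build a master URL that lists every master, so clients can find the current leader."""
    return "spark://" + ",".join(masters)


# Hypothetical hosts, for illustration only.
zk = ["zk1:2181", "zk2:2181", "zk3:2181"]
print('SPARK_DAEMON_JAVA_OPTS="' + zk_recovery_opts(zk) + '"')
print(ha_master_url(["master1:7077", "master2:7077"]))
```

Every master must be started with the same ZooKeeper settings. A master process that has exited still has to be restarted by something outside Spark (for example a service supervisor), after which it rejoins as a standby.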

HTH,

Mich Talebzadeh,
Distinguished Technologist, Solutions Architect & Engineer
London
United Kingdom

Mich Talebzadeh (Ph.D.) | LinkedIn


https://en.everybodywiki.com/Mich_Talebzadeh



Disclaimer: Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.








On Mon, 6 Nov 2023 at 15:19, Kaustubh Ghode  wrote:

> I am using spark-3.4.1 I have a setup with three ZooKeeper servers, Spark
> master shuts down when a Zookeeper instance is down a new master is elected
> as leader and the cluster is up. But the original master that was down
> never comes up. can you please help me with this issue?
>
> Stackoverflow link:- https://stackoverflow.com/questions/77431515
>
> Thanks,
> Kaustubh
>


unsubscribe

2023-11-07 Thread Kelvin Qin
unsubscribe

[ANNOUNCE] Apache Kyuubi released 1.8.0

2023-11-06 Thread Cheng Pan
Hi all,

The Apache Kyuubi community is pleased to announce that
Apache Kyuubi 1.8.0 has been released!

Apache Kyuubi is a distributed and multi-tenant gateway to provide
serverless SQL on data warehouses and lakehouses.

Kyuubi provides a pure SQL gateway through Thrift JDBC/ODBC interface
for end-users to manipulate large-scale data with pre-programmed and
extensible Spark SQL engines.

We are aiming to make Kyuubi an "out-of-the-box" tool for data warehouses
and lakehouses.

This "out-of-the-box" model minimizes the barriers and costs for end-users
to use Spark, Flink, and other computing engines at the client side.

At the server-side, Kyuubi server and engine's multi-tenant architecture
provides the administrators a way to achieve computing resource isolation,
data security, high availability, high client concurrency, etc.

The full release notes and download links are available at:
Release Notes: https://kyuubi.apache.org/release/1.8.0.html

To learn more about Apache Kyuubi, please see
https://kyuubi.apache.org/

Kyuubi Resources:
- Issue: https://github.com/apache/kyuubi/issues
- Mailing list: d...@kyuubi.apache.org

We would like to thank all contributors of the Kyuubi community
who made this release possible!

Thanks,
On behalf of Apache Kyuubi community




unsubscribe

2023-11-06 Thread Stefan Hagedorn



Spark master shuts down when one of zookeeper dies

2023-11-06 Thread Kaustubh Ghode
I am using spark-3.4.1 with a setup of three ZooKeeper servers. The Spark
master shuts down when a ZooKeeper instance goes down; a new master is elected
as leader and the cluster stays up. But the original master that went down
never comes back up. Can you please help me with this issue?

Stackoverflow link:- https://stackoverflow.com/questions/77431515

Thanks,
Kaustubh


How to configure authentication from a pySpark client to a Spark Connect server ?

2023-11-05 Thread Xiaolong Wang
Hi,

Our company is currently introducing the Spark Connect server to
production.

Most of the issues have been solved, yet I don't know how to configure
authentication from a pySpark client to the Spark Connect server.

I noticed that there are some interceptor configs on the Scala client side,
where users can call the following code:

val spark = SparkSession.builder().remote(host).interceptor(...)

to configure a client-side interceptor, and on the server side there is a
config called

spark.connect.grpc.interceptor.classes

I'm wondering if there is any way to pass some authentication information
on the pySpark side. If not, are there any plans on the roadmap to support this?
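
One option that may cover the question above is the Spark Connect connection string, which, as far as I know, accepts `token` and `use_ssl` parameters after the `/;` separator; the token is then sent by the client as a bearer token. The sketch below only builds such a string. The host name and token are hypothetical, `connect_url` is a helper made up for this example, and validating the token on the server (for example in a class listed under spark.connect.grpc.interceptor.classes) is assumed to be handled separately.

```python
from urllib.parse import quote


def connect_url(host, port=15002, token=None, use_ssl=False):
    """Build an sc:// connection string with optional TLS and bearer token."""
    params = []
    if use_ssl:
        params.append("use_ssl=true")
    if token is not None:
        # Percent-encode the token so characters such as ';' or '=' cannot break the parameter list.
        params.append("token=" + quote(token, safe=""))
    url = f"sc://{host}:{port}"
    if params:
        url += "/;" + ";".join(params)
    return url


# Hypothetical endpoint and token, for illustration only.
print(connect_url("connect.example.com", token="my-secret", use_ssl=True))
```

The resulting string would be passed to `SparkSession.builder.remote(...)` in PySpark.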


[Spark SQL] [Bug] Adding `checkpoint()` causes "column [...] cannot be resolved" error

2023-11-05 Thread Robin Zimmerman
Hi all,

Wondering if anyone has run into this as I can't find any similar issues in
JIRA, mailing list archives, Stack Overflow, etc. I had a query that was
running successfully, but the query planning time was extremely long (4+
hours). To fix this I added `checkpoint()` calls earlier in the code to
truncate the query plan. This worked to improve the performance, but now I
am getting the error "A column or function parameter with name
`B`.`JOIN_KEY` cannot be resolved." Nothing else in the query changed
besides the `checkpoint()` calls. The only thing I can surmise is that this
is related to a very complex nested query plan where the same table is used
multiple times upstream. The general flow is something like this:

```py
df = spark.sql("...")
df = df.checkpoint()
df.createOrReplaceTempView("df")

df2 = spark.sql("SELECT  JOIN df ...")
df2.createOrReplaceTempView("df2")

# Error happens here: A column or function parameter with name
# `a`.`join_key` cannot be resolved. Did you mean one of the following?
# [`b`.`join_key`, `a`.`col1`, `b`.`col2`]
spark.sql("""
SELECT *
FROM  (
SELECT
a.join_key,
a.col1,
b.col2
FROM df2 b
LEFT JOIN df a ON b.join_key = a.join_key
)
""")
```

In the actual code df and df2 are very complex multi-level nested views
built upon other views. If I checkpoint all of the dataframes in the query
right before I run it the error goes away. Unfortunately I have not been
able to put together a minimal reproducible example.

Any ideas?

Thanks,
Robin


Re: Parser error when running PySpark on Windows connecting to GCS

2023-11-04 Thread Mich Talebzadeh
General

os.path.join inserts a backslash on Windows because the backslash is the
Windows path separator. However, GCS paths, like those of any Hadoop
Compatible File System (HCFS), use forward slashes as on Linux. This can
cause problems if you pass a Windows-style path to a Spark job, *because
Spark assumes that all paths are Linux paths*.

A way to avoid this problem is to build the path with forward slashes
explicitly before passing it to Spark. Note that os.path.normpath does not
help here: on Windows it converts forward slashes to backslashes, and on any
platform it collapses the double slash in gs://.

*In Python*

import posixpath
# example
path = "gs://etcbucket/data-file"
# posixpath.join always uses "/" regardless of the host OS
metadata_path = posixpath.join(path, "metadata")
# Pass metadata_path to Spark

*In Scala*

import org.apache.hadoop.fs.Path
val path = "gs://etcbucket/data-file"
// Hadoop's Path keeps the URI scheme and uses "/" as the separator
val metadataPath = new Path(path, "metadata").toString
// Pass metadataPath to Spark

HTH

Mich Talebzadeh,
Distinguished Technologist, Solutions Architect & Engineer
London
United Kingdom



 view my Linkedin profile


 https://en.everybodywiki.com/Mich_Talebzadeh


Disclaimer: Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.







On Sat, 4 Nov 2023 at 12:28, Richard Smith
 wrote:

> Hi All,
>
> I've just encountered and worked around a problem that is pretty obscure
> and unlikely to affect many people, but I thought I'd better report it
> anyway
>
> All the data I'm using is inside Google Cloud Storage buckets (path starts
> with gs://) and I'm running Spark 3.5.0 locally (for testing, real thing is
> on serverless Dataproc) on a Windows 10 laptop. The job fails when reading
> metadata via the machine learning scripts.
>
> The error is *org.apache.hadoop.shaded.com.google.re2j.PatternSyntaxException:
> error parsing regexp: invalid escape sequence: '\m'*
>
> I tracked it down to *site-packages/pyspark/ml/util.py* line 578
>
> metadataPath = os.path.join(path,"metadata")
>
> which seems innocuous but what's happening is because I'm on Windows,
> os.path.join is appending double backslash, whilst the gcs path uses
> forward slashes like Linux.
>
> I hacked the code to explicitly use forward slash if path contains gs: and
> the job now runs successfully.
>
> Richard
>


Parser error when running PySpark on Windows connecting to GCS

2023-11-04 Thread Richard Smith

Hi All,

I've just encountered and worked around a problem that is pretty obscure 
and unlikely to affect many people, but I thought I'd better report it 
anyway


All the data I'm using is inside Google Cloud Storage buckets (path 
starts with gs://) and I'm running Spark 3.5.0 locally (for testing, 
real thing is on serverless Dataproc) on a Windows 10 laptop. The job 
fails when reading metadata via the machine learning scripts.


The error is 
/org.apache.hadoop.shaded.com.google.re2j.PatternSyntaxException: error 
parsing regexp: invalid escape sequence: '\m'/


I tracked it down to /site-packages/pyspark/ml/util.py/ line 578

metadataPath = os.path.join(path,"metadata")

which seems innocuous but what's happening is because I'm on Windows, 
os.path.join is appending double backslash, whilst the gcs path uses 
forward slashes like Linux.


I hacked the code to explicitly use forward slash if path contains gs: 
and the job now runs successfully.


Richard
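
The workaround described above could look roughly like the following sketch. It is not the actual patch to pyspark/ml/util.py: `join_path` is a hypothetical helper, and the rule "anything that starts with scheme:// is joined with forward slashes" is an assumption that generalises the gs: check.

```python
import os
import posixpath
import re

# Matches a leading URI scheme such as gs://, s3a:// or hdfs://.
_SCHEME_RE = re.compile(r"^[A-Za-z][A-Za-z0-9+.\-]*://")


def join_path(base, *parts):
    """Join with "/" for URI-style paths, and with the OS separator otherwise."""
    if _SCHEME_RE.match(base):
        return posixpath.join(base, *parts)
    return os.path.join(base, *parts)


# Joined with forward slashes on every OS, including Windows.
print(join_path("gs://etcbucket/model", "metadata"))
```

Local paths still go through os.path.join, so behaviour on a local file system is unchanged.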


Re: Data analysis issues

2023-11-02 Thread Mich Talebzadeh
Hi,

Your mileage may vary, so to speak. Whether the data you analyze in Spark
through RStudio can be seen by whoever operates Spark's back-end depends on
how you deploy Spark and RStudio. If you deploy both on your own premises or
in a private cloud environment, the data will only be accessible to the
roles that have access to your environment. However, if you use a managed
Spark service such as Google Dataproc or Amazon EMR, the data may be
accessible to the service's back-end, because managed Spark services
typically process and store your data on the provider's infrastructure. Try
using encryption combined with RBAC (who can access what) to protect your
data privacy. Also beware of the security risks associated with any
third-party libraries you deploy.

HTH

Mich Talebzadeh,
Distinguished Technologist, Solutions Architect & Engineer
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Thu, 2 Nov 2023 at 22:46, Jauru Lin  wrote:

> Hello all,
>
> I have a question about Apache Spark,
> I would like to ask if I use Rstudio to connect to Spark to analyze data,
> will the data I use be seen by Spark's back-end personnel?
>
> Hope someone can solve my problem.
> Thanks!
>


Re: Spark / Scala conflict

2023-11-02 Thread Harry Jamison
Thanks Alonso,
I think this gives me some ideas.

My code is written in Python, and I use spark-submit to submit it.
I am not sure what code is written in Scala. Maybe the Phoenix driver, based on 
the stack trace?
How do I tell which version of Scala it was compiled against?

Is there a jar that I need to add to the spark or hbase classpath?




On Thursday, November 2, 2023 at 01:38:21 AM PDT, Aironman DirtDiver 
 wrote: 





The error message "Caused by: java.lang.ClassNotFoundException: 
scala.Product$class" indicates that the Spark job is trying to load a class that 
is not available in the classpath. This can happen if the Spark job, or a 
library it uses, was compiled with a different version of Scala than the one 
used to run the job.

You have mentioned that you are using Spark 3.5.0, which is compatible with 
Scala 2.12. However, you have also mentioned that you have tried Scala versions 
2.10, 2.11, 2.12, and 2.13. This suggests that you may have multiple versions 
of Scala installed on your system.

To resolve the issue, you need to make sure that the Spark job is compiled and 
run with the same version of Scala. You can do this by setting the 
SPARK_SCALA_VERSION environment variable to the desired Scala version before 
starting the Spark job.

For example, to compile the Spark job with Scala 2.12, you would run the 
following command:

SPARK_SCALA_VERSION=2.12 sbt compile

To run the Spark job with Scala 2.12, you would run the following command:

SPARK_SCALA_VERSION=2.12 spark-submit spark-job.jar

If you are using Databricks, you can set the Scala version for the Spark 
cluster in the cluster creation settings.

Once you have ensured that the Spark job is compiled and run with the same 
version of Scala, the error should be resolved.

Here are some additional tips for troubleshooting Scala version conflicts:

* Make sure that you are using the correct version of the Spark libraries. 
The Spark libraries must be compiled with the same version of Scala as the 
Spark job.
* If you are using a third-party library, make sure that it is compatible 
with the version of Scala that you are using.
* Check the Spark logs for any ClassNotFoundExceptions. The logs may 
indicate the specific class that is missing from the classpath.
* Use a tool like sbt dependencyTree to view the dependencies of your 
Spark job. This can help you to identify any conflicting dependencies.
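
For the question of how to tell which Scala version a jar targets, one rough check is the naming convention that many Scala libraries follow: a `_2.12` or `_2.13` suffix in front of the library version. The sketch below is a heuristic under that assumption only. `scala_binary_version` and `mismatched` are hypothetical helpers, and a jar without the suffix (such as phoenix-spark-5.0.0-HBase-2.0.jar from this thread) cannot be classified this way. For such jars the missing class is a hint in itself: `scala.Product$class` only exists in Scala 2.11 and earlier, so a jar that references it was built for one of those versions.

```python
import re

# Scala binary version suffix, e.g. "_2.12" in spark-sql_2.12-3.5.0.jar.
_SCALA_SUFFIX_RE = re.compile(r"_(2\.\d+|3)(?=-)")


def scala_binary_version(jar_name):
    """Return the Scala binary version encoded in a jar name, or None if there is none."""
    match = _SCALA_SUFFIX_RE.search(jar_name)
    return match.group(1) if match else None


def mismatched(jar_names, expected):
    """List the jars whose Scala suffix differs from the expected binary version."""
    return [
        name for name in jar_names
        if scala_binary_version(name) not in (None, expected)
    ]


# Hypothetical jar list, for illustration only.
jars = [
    "spark-sql_2.12-3.5.0.jar",
    "some-connector_2.11-1.0.0.jar",
    "phoenix-spark-5.0.0-HBase-2.0.jar",
]
print(mismatched(jars, "2.12"))
```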

On Thu, 2 Nov 2023 at 5:39, Harry Jamison wrote:
> I am getting the error below when I try to run a spark job connecting to 
> phoenix.  It seems like I have the incorrect scala version that some part of 
> the code is expecting.
> 
> I am using spark 3.5.0, and I have copied these phoenix jars into the spark 
> lib
> phoenix-server-hbase-2.5-5.1.3.jar  
> phoenix-spark-5.0.0-HBase-2.0.jar
> 
> I have tried scala 2.10, 2.11, 2.12, and 2.13
> I do not see the scala version used in the logs so I am not 100% sure that it 
> is using the version I expect that it should be.
> 
> 
> Here is the exception that I am getting
> 
> 2023-11-01T16:13:00,391 INFO  [Thread-4] handler.ContextHandler: Started o.s.j.s.ServletContextHandler@15cd3b2a{/static/sql,null,AVAILABLE,@Spark}
> Traceback (most recent call last):
>   File "/hadoop/spark/spark-3.5.0-bin-hadoop3/copy_tables.py", line 10, in <module>
>     .option("zkUrl", "namenode:2181").load()
>   File "/hadoop/spark/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 314, in load
>   File "/hadoop/spark/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
>   File "/hadoop/spark/spark/python/lib/pyspark.zip/pyspark/errors/exceptions/captured.py", line 179, in deco
>   File "/hadoop/spark/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/protocol.py", line 326, in get_return_value
> py4j.protocol.Py4JJavaError: An error occurred while calling o28.load.
> : java.lang.NoClassDefFoundError: scala/Product$class
>     at org.apache.phoenix.spark.PhoenixRelation.<init>(PhoenixRelation.scala:29)
>     at org.apache.phoenix.spark.DefaultSource.createRelation(DefaultSource.scala:29)
>     at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:346)
>     at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:229)
>     at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:211)
>     at scala.Option.getOrElse(Option.scala:189)
>     at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
>     at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:172)
>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at 

RE: jackson-databind version mismatch

2023-11-02 Thread moshik.vitas
Thanks for replying,

 

The issue was an import of spring-boot-dependencies in the dependencyManagement 
section of my pom, which forced an invalid jar version.

I removed this section and got valid Spark dependencies.

 

Regards,
Moshik Vitas

 

From: Bjørn Jørgensen  
Sent: Thursday, 2 November 2023 10:40
To: eab...@163.com
Cc: user @spark ; Saar Barhoom ; 
moshik.vi...@veeva.com
Subject: Re: jackson-databind version mismatch

 

[SPARK-43225][BUILD][SQL] Remove jackson-core-asl and jackson-mapper-asl from 
pre-built distribution  

 

On Thu, 2 Nov 2023 at 09:15, Bjørn Jørgensen <bjornjorgen...@gmail.com> wrote:

Spark 3.5.0 removed jackson-core-asl and jackson-mapper-asl; those have the 
groupId org.codehaus.jackson. 

 

The other jackson-* artifacts have groupIds under com.fasterxml.jackson. 

 

 

On Thu, 2 Nov 2023 at 01:43, eab...@163.com <eab...@163.com> wrote:

Hi,

Please check the versions of jar files starting with "jackson-". Make sure 
all versions are consistent.  jackson jar list in spark-3.3.0:



2022/06/10  04:37      75,714 jackson-annotations-2.13.3.jar
2022/06/10  04:37     374,895 jackson-core-2.13.3.jar
2022/06/10  04:37     232,248 jackson-core-asl-1.9.13.jar
2022/06/10  04:37   1,536,542 jackson-databind-2.13.3.jar
2022/06/10  04:37      52,020 jackson-dataformat-yaml-2.13.3.jar
2022/06/10  04:37     121,201 jackson-datatype-jsr310-2.13.3.jar
2022/06/10  04:37     780,664 jackson-mapper-asl-1.9.13.jar
2022/06/10  04:37     458,981 jackson-module-scala_2.12-2.13.3.jar



Spark 3.3.0 uses Jackson version 2.13.3, while Spark 3.5.0 uses Jackson version 
2.15.2. I think you can remove the lower version of Jackson package to keep the 
versions consistent.
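
The consistency check suggested above can be scripted against a list of jar file names. The following is a minimal sketch, assuming the usual `artifact-version.jar` naming; `jackson_versions` is a hypothetical helper, and the legacy `*-asl` jars (org.codehaus.jackson 1.x) are skipped because they are versioned independently of the 2.x line.

```python
import re
from collections import defaultdict

# artifact name, then "-<numeric version>.jar" at the end of the file name
_JAR_RE = re.compile(r"^(jackson-[a-z0-9_.\-]+?)-(\d+(?:\.\d+)+)\.jar$")


def jackson_versions(jar_names):
    """Group Jackson 2.x artifacts by version; more than one key means a mismatch."""
    by_version = defaultdict(list)
    for name in jar_names:
        match = _JAR_RE.match(name)
        if match is None:
            continue  # not a Jackson jar, or an unexpected naming scheme
        artifact, version = match.groups()
        if artifact.endswith("-asl"):
            continue  # legacy org.codehaus.jackson artifact, versioned separately
        by_version[version].append(artifact)
    return dict(by_version)


# Hypothetical classpath listing, for illustration only.
jars = [
    "jackson-core-2.15.2.jar",
    "jackson-annotations-2.15.2.jar",
    "jackson-databind-2.10.5.jar",
    "jackson-core-asl-1.9.13.jar",
]
for version, artifacts in sorted(jackson_versions(jars).items()):
    print(version, artifacts)
```

If the result has more than one version, the older artifacts are the ones to track down, for example with the build tool's dependency tree.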

eabour

 

From:   moshik.vi...@veeva.com.INVALID

Date: 2023-11-01 15:03

To:   user@spark.apache.org

CC:   'Saar Barhoom'

Subject: jackson-databind version mismatch

Hi Spark team,

 

After upgrading Spark from 3.2.1 to 3.4.1, we got the following issue:

java.lang.NoSuchMethodError: 'com.fasterxml.jackson.core.JsonGenerator com.fasterxml.jackson.databind.ObjectMapper.createGenerator(java.io.OutputStream, com.fasterxml.jackson.core.JsonEncoding)'
at org.apache.spark.util.JsonProtocol$.toJsonString(JsonProtocol.scala:75)
at org.apache.spark.SparkThrowableHelper$.getMessage(SparkThrowableHelper.scala:74)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$7(SQLExecution.scala:127)
at scala.Option.map(Option.scala:230)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4165)
at org.apache.spark.sql.Dataset.head(Dataset.scala:3161)
at org.apache.spark.sql.Dataset.take(Dataset.scala:3382)
at org.apache.spark.sql.Dataset.takeAsList(Dataset.scala:3405)
at com.crossix.safemine.cloud.utils.DebugRDDLogger.showDataset(DebugRDDLogger.java:84)
at com.crossix.safemine.cloud.components.statistics.spark.StatisticsTransformer.getFillRateCountsWithSparkQuery(StatisticsTransformer.java:122)
at com.crossix.safemine.cloud.components.statistics.spark.StatisticsTransformer.calculateStatistics(StatisticsTransformer.java:61)
at com.crossix.safemine.cloud.components.statistics.spark.SparkFileStatistics.execute(SparkFileStatistics.java:102)
at com.crossix.safemine.cloud.StatisticsFlow.calculateAllStatistics(StatisticsFlow.java:146)
at com.crossix.safemine.cloud.StatisticsFlow.runStatistics(StatisticsFlow.java:119)
at com.crossix.safemine.cloud.StatisticsFlow.initialFileStatistics(StatisticsFlow.java:77)
at com.crossix.safemine.cloud.SMCFlow.process(SMCFlow.java:221)
at com.crossix.safemine.cloud.SMCFlow.execute(SMCFlow.java:132)
at com.crossix.safemine.cloud.SMCFlow.run(SMCFlow.java:91)



I see that that spark package contains the dependency:

com.fasterxml.jackson.core:jackson-databind:jar:2.10.5:compile

 

But jackson-databind 2.10.5 does not contain 
ObjectMapper.createGenerator(java.io.OutputStream, 

Data analysis issues

2023-11-02 Thread Jauru Lin
Hello all,

I have a question about Apache Spark. If I use RStudio to connect to
Spark to analyze data, can the data I use be seen by the people who
operate the Spark back end?

I hope someone can help.
Thanks!


Re: Re: jackson-databind version mismatch

2023-11-02 Thread eab...@163.com
Hi,
But in fact, it does have those packages.

 D:\02_bigdata\spark-3.5.0-bin-hadoop3\jars 

2023/09/09  10:08        75,567 jackson-annotations-2.15.2.jar
2023/09/09  10:08       549,207 jackson-core-2.15.2.jar
2023/09/09  10:08       232,248 jackson-core-asl-1.9.13.jar
2023/09/09  10:08     1,620,088 jackson-databind-2.15.2.jar
2023/09/09  10:08        54,630 jackson-dataformat-yaml-2.15.2.jar
2023/09/09  10:08       122,937 jackson-datatype-jsr310-2.15.2.jar
2023/09/09  10:08       780,664 jackson-mapper-asl-1.9.13.jar
2023/09/09  10:08       513,968 jackson-module-scala_2.12-2.15.2.jar



eabour
 
From: Bjørn Jørgensen
Date: 2023-11-02 16:40
To: eab...@163.com
CC: user @spark; Saar Barhoom; moshik.vitas
Subject: Re: jackson-databind version mismatch
[SPARK-43225][BUILD][SQL] Remove jackson-core-asl and jackson-mapper-asl from 
pre-built distribution

On Thu, 2 Nov 2023 at 09:15, Bjørn Jørgensen wrote:
Spark 3.5.0 removed jackson-core-asl and jackson-mapper-asl; those are
under the groupId org.codehaus.jackson.

The other jackson-* artifacts are under com.fasterxml.jackson.* groupIds
(for example com.fasterxml.jackson.core).


On Thu, 2 Nov 2023 at 01:43, eab...@163.com wrote:
Hi,
Please check the versions of the jar files whose names start with
"jackson-" and make sure they are all consistent. The Jackson jars in
spark-3.3.0:

2022/06/10  04:37        75,714 jackson-annotations-2.13.3.jar
2022/06/10  04:37       374,895 jackson-core-2.13.3.jar
2022/06/10  04:37       232,248 jackson-core-asl-1.9.13.jar
2022/06/10  04:37     1,536,542 jackson-databind-2.13.3.jar
2022/06/10  04:37        52,020 jackson-dataformat-yaml-2.13.3.jar
2022/06/10  04:37       121,201 jackson-datatype-jsr310-2.13.3.jar
2022/06/10  04:37       780,664 jackson-mapper-asl-1.9.13.jar
2022/06/10  04:37       458,981 jackson-module-scala_2.12-2.13.3.jar

Spark 3.3.0 uses Jackson 2.13.3, while Spark 3.5.0 uses Jackson 2.15.2.
I think you can remove the lower-version Jackson jars to keep the
versions consistent.
eabour
 

Re: jackson-databind version mismatch

2023-11-02 Thread Bjørn Jørgensen
[SPARK-43225][BUILD][SQL] Remove jackson-core-asl and jackson-mapper-asl
from pre-built distribution 

On Thu, 2 Nov 2023 at 09:15, Bjørn Jørgensen wrote:

> Spark 3.5.0 removed jackson-core-asl and jackson-mapper-asl; those are
> under the groupId org.codehaus.jackson.
>
> The other jackson-* artifacts are under com.fasterxml.jackson.* groupIds
> (for example com.fasterxml.jackson.core).
>
>
> On Thu, 2 Nov 2023 at 01:43, eab...@163.com wrote:
>
>> Hi,
>> Please check the versions of the jar files whose names start with
>> "jackson-" and make sure they are all consistent. The Jackson jars in
>> spark-3.3.0:
>>
>> 2022/06/10  04:37        75,714 jackson-annotations-2.13.3.jar
>> 2022/06/10  04:37       374,895 jackson-core-2.13.3.jar
>> 2022/06/10  04:37       232,248 jackson-core-asl-1.9.13.jar
>> 2022/06/10  04:37     1,536,542 jackson-databind-2.13.3.jar
>> 2022/06/10  04:37        52,020 jackson-dataformat-yaml-2.13.3.jar
>> 2022/06/10  04:37       121,201 jackson-datatype-jsr310-2.13.3.jar
>> 2022/06/10  04:37       780,664 jackson-mapper-asl-1.9.13.jar
>> 2022/06/10  04:37       458,981 jackson-module-scala_2.12-2.13.3.jar
>>
>> Spark 3.3.0 uses Jackson 2.13.3, while Spark 3.5.0 uses Jackson 2.15.2.
>> I think you can remove the lower-version Jackson jars to keep the
>> versions consistent.
>> eabour

Re: Spark / Scala conflict

2023-11-02 Thread Aironman DirtDiver
The error message "Caused by: java.lang.ClassNotFoundException:
scala.Product$class" indicates that the Spark job is trying to load a class
that is not on the classpath. This happens when some code on the classpath
was compiled against a different Scala version than the one used to run the
job. Classes named like scala.Product$class exist only in Scala 2.11 and
earlier, so a jar built for Scala 2.11 or older (the stack trace points at
the phoenix-spark classes) is being loaded into a newer Scala runtime.

You mentioned that you are using Spark 3.5.0, which is built for Scala 2.12
(a Scala 2.13 build is also published). You also mentioned trying Scala
2.10, 2.11, 2.12, and 2.13, which suggests that several Scala versions are
installed on your system.

To resolve the issue, make sure that the job and every jar it loads are
built for the same Scala version that Spark itself runs on. The
SPARK_SCALA_VERSION environment variable only tells Spark's launch scripts
which Scala build directory to use; it does not recompile the job or its
dependencies.

For example, if you build the job with sbt against a Scala 2.12 build of
Spark, pin the Scala version in build.sbt:

scalaVersion := "2.12.18"

Then submit the resulting jar as usual:

spark-submit spark-job.jar

If you are using Databricks, you can set the Scala version for the Spark
cluster in the cluster creation settings.

Once you have ensured that the Spark job is compiled and run with the same
version of Scala, the error should be resolved.

Here are some additional tips for troubleshooting Scala version conflicts:

   - Make sure that you are using the correct version of the Spark
   libraries. The Spark libraries must be compiled with the same version of
   Scala as the Spark job.
   - If you are using a third-party library, make sure that it is
   compatible with the version of Scala that you are using.
   - Check the Spark logs for any ClassNotFoundExceptions. The logs may
   indicate the specific class that is missing from the classpath.
   - Use a tool like sbt dependencyTree to view the dependencies of your
   Spark job. This can help you to identify any conflicting dependencies.



Re: jackson-databind version mismatch

2023-11-02 Thread Bjørn Jørgensen
Spark 3.5.0 removed jackson-core-asl and jackson-mapper-asl; those are
under the groupId org.codehaus.jackson.

The other jackson-* artifacts are under com.fasterxml.jackson.* groupIds
(for example com.fasterxml.jackson.core).


On Thu, 2 Nov 2023 at 01:43, eab...@163.com wrote:

> Hi,
> Please check the versions of the jar files whose names start with
> "jackson-" and make sure they are all consistent. The Jackson jars in
> spark-3.3.0:
>
> 2022/06/10  04:37        75,714 jackson-annotations-2.13.3.jar
> 2022/06/10  04:37       374,895 jackson-core-2.13.3.jar
> 2022/06/10  04:37       232,248 jackson-core-asl-1.9.13.jar
> 2022/06/10  04:37     1,536,542 jackson-databind-2.13.3.jar
> 2022/06/10  04:37        52,020 jackson-dataformat-yaml-2.13.3.jar
> 2022/06/10  04:37       121,201 jackson-datatype-jsr310-2.13.3.jar
> 2022/06/10  04:37       780,664 jackson-mapper-asl-1.9.13.jar
> 2022/06/10  04:37       458,981 jackson-module-scala_2.12-2.13.3.jar
>
> Spark 3.3.0 uses Jackson 2.13.3, while Spark 3.5.0 uses Jackson 2.15.2.
> I think you can remove the lower-version Jackson jars to keep the
> versions consistent.
> eabour

Spark / Scala conflict

2023-11-01 Thread Harry Jamison
I am getting the error below when I try to run a Spark job that connects
to Phoenix. It seems that I have a different Scala version from the one
that some part of the code expects.

I am using Spark 3.5.0, and I have copied these Phoenix jars into the Spark
lib directory:
phoenix-server-hbase-2.5-5.1.3.jar
phoenix-spark-5.0.0-HBase-2.0.jar

I have tried Scala 2.10, 2.11, 2.12, and 2.13.
I do not see the Scala version in the logs, so I am not 100% sure that it
is using the version I expect.


Here is the exception that I am getting:

2023-11-01T16:13:00,391 INFO  [Thread-4] handler.ContextHandler: Started o.s.j.s.ServletContextHandler@15cd3b2a{/static/sql,null,AVAILABLE,@Spark}
Traceback (most recent call last):
  File "/hadoop/spark/spark-3.5.0-bin-hadoop3/copy_tables.py", line 10, in <module>
    .option("zkUrl", "namenode:2181").load()
  File "/hadoop/spark/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 314, in load
  File "/hadoop/spark/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
  File "/hadoop/spark/spark/python/lib/pyspark.zip/pyspark/errors/exceptions/captured.py", line 179, in deco
  File "/hadoop/spark/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o28.load.
: java.lang.NoClassDefFoundError: scala/Product$class
    at org.apache.phoenix.spark.PhoenixRelation.<init>(PhoenixRelation.scala:29)
    at org.apache.phoenix.spark.DefaultSource.createRelation(DefaultSource.scala:29)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:346)
    at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:229)
    at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:211)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:172)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.ClassNotFoundException: scala.Product$class
    at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
    at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:527)
    ... 20 more





Re: jackson-databind version mismatch

2023-11-01 Thread eab...@163.com
Hi,
Please check the versions of the jar files whose names start with
"jackson-" and make sure they are all consistent. The Jackson jars in
spark-3.3.0:

2022/06/10  04:37        75,714 jackson-annotations-2.13.3.jar
2022/06/10  04:37       374,895 jackson-core-2.13.3.jar
2022/06/10  04:37       232,248 jackson-core-asl-1.9.13.jar
2022/06/10  04:37     1,536,542 jackson-databind-2.13.3.jar
2022/06/10  04:37        52,020 jackson-dataformat-yaml-2.13.3.jar
2022/06/10  04:37       121,201 jackson-datatype-jsr310-2.13.3.jar
2022/06/10  04:37       780,664 jackson-mapper-asl-1.9.13.jar
2022/06/10  04:37       458,981 jackson-module-scala_2.12-2.13.3.jar

Spark 3.3.0 uses Jackson 2.13.3, while Spark 3.5.0 uses Jackson 2.15.2.
I think you can remove the lower-version Jackson jars to keep the
versions consistent.
eabour
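
The consistency check suggested in this message can be scripted. Below is a minimal sketch, assuming jar names follow the "<artifact>-<version>.jar" pattern seen in the listing; the function names and family labels are hypothetical. It keeps the old Jackson 1.x "-asl" artifacts separate, since their 1.9.13 version is expected to differ from the 2.x line.

```python
import re
from collections import defaultdict

# "<artifact>-<version>.jar", e.g. jackson-module-scala_2.12-2.13.3.jar
JAR_PATTERN = re.compile(r"^(jackson-.+)-(\d+(?:\.\d+)+)\.jar$")


def group_jackson_jars(filenames):
    """Map (family, version) -> sorted artifact names for jackson-* jars."""
    groups = defaultdict(list)
    for filename in filenames:
        match = JAR_PATTERN.match(filename)
        if not match:
            continue  # not a jackson jar, or no recognizable version
        artifact, version = match.groups()
        # The *-asl artifacts are the old Jackson 1.x line (org.codehaus.jackson).
        family = "codehaus-1.x" if artifact.endswith("-asl") else "fasterxml-2.x"
        groups[(family, version)].append(artifact)
    return {key: sorted(names) for key, names in groups.items()}


def mixed_versions(filenames, family="fasterxml-2.x"):
    """Return the sorted versions seen for one family; more than one means a mix."""
    return sorted({v for (f, v) in group_jackson_jars(filenames) if f == family})
```

Calling mixed_versions(os.listdir(jars_dir)) on the Spark jars directory, or on the jars bundled into an application, should return a single version when the 2.x artifacts are consistent. Two or more entries point at the kind of mix discussed in this thread.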
 


Fixed byte array issue

2023-11-01 Thread KhajaAsmath Mohammed
Hi,

I am facing a fixed byte array issue when reading a Spark DataFrame.
Setting spark.sql.parquet.enableVectorizedReader = false solves the issue,
but it causes a significant performance hit. Is there any resolution for
this?
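
One way to limit the performance cost is to disable the vectorized reader only around the reads that need it. This is a minimal sketch of that idea, not a fix for the underlying issue: scoped_conf and DictConf are hypothetical names, and DictConf is only a stand-in so the sketch runs without a cluster. It assumes the conf object offers get(key, default), set(key, value) and unset(key), as spark.conf does in PySpark.

```python
from contextlib import contextmanager

VECTORIZED_KEY = "spark.sql.parquet.enableVectorizedReader"


@contextmanager
def scoped_conf(conf, key, value):
    """Temporarily set `key` on a Spark-style runtime conf, then restore it."""
    previous = conf.get(key, None)
    conf.set(key, value)
    try:
        yield conf
    finally:
        # Put back whatever was there before, or clear the key if it was unset.
        if previous is None:
            conf.unset(key)
        else:
            conf.set(key, previous)


class DictConf:
    """Minimal stand-in for `spark.conf`, so the sketch runs without a cluster."""

    def __init__(self):
        self._values = {}

    def get(self, key, default=None):
        return self._values.get(key, default)

    def set(self, key, value):
        self._values[key] = value

    def unset(self, key):
        self._values.pop(key, None)
```

With a real session this would be used as "with scoped_conf(spark.conf, VECTORIZED_KEY, "false"):" around the affected read. Because Spark evaluates lazily, the action that materializes the data (a write, collect, or count) most likely has to run inside the with block for the setting to apply.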

Thanks,
Asmath


jackson-databind version mismatch

2023-11-01 Thread moshik.vitas
Hi Spark team,

 

After upgrading Spark from 3.2.1 to 3.4.1, I got the following error:

java.lang.NoSuchMethodError: 'com.fasterxml.jackson.core.JsonGenerator com.fasterxml.jackson.databind.ObjectMapper.createGenerator(java.io.OutputStream, com.fasterxml.jackson.core.JsonEncoding)'
    at org.apache.spark.util.JsonProtocol$.toJsonString(JsonProtocol.scala:75)
    at org.apache.spark.SparkThrowableHelper$.getMessage(SparkThrowableHelper.scala:74)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$7(SQLExecution.scala:127)
    at scala.Option.map(Option.scala:230)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4165)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:3161)
    at org.apache.spark.sql.Dataset.take(Dataset.scala:3382)
    at org.apache.spark.sql.Dataset.takeAsList(Dataset.scala:3405)
    at com.crossix.safemine.cloud.utils.DebugRDDLogger.showDataset(DebugRDDLogger.java:84)
    at com.crossix.safemine.cloud.components.statistics.spark.StatisticsTransformer.getFillRateCountsWithSparkQuery(StatisticsTransformer.java:122)
    at com.crossix.safemine.cloud.components.statistics.spark.StatisticsTransformer.calculateStatistics(StatisticsTransformer.java:61)
    at com.crossix.safemine.cloud.components.statistics.spark.SparkFileStatistics.execute(SparkFileStatistics.java:102)
    at com.crossix.safemine.cloud.StatisticsFlow.calculateAllStatistics(StatisticsFlow.java:146)
    at com.crossix.safemine.cloud.StatisticsFlow.runStatistics(StatisticsFlow.java:119)
    at com.crossix.safemine.cloud.StatisticsFlow.initialFileStatistics(StatisticsFlow.java:77)
    at com.crossix.safemine.cloud.SMCFlow.process(SMCFlow.java:221)
    at com.crossix.safemine.cloud.SMCFlow.execute(SMCFlow.java:132)
    at com.crossix.safemine.cloud.SMCFlow.run(SMCFlow.java:91)



I see that the Spark package contains the dependency:
com.fasterxml.jackson.core:jackson-databind:jar:2.10.5:compile

But jackson-databind 2.10.5 does not contain
ObjectMapper.createGenerator(java.io.OutputStream, com.fasterxml.jackson.core.JsonEncoding).
It was added in 2.11.0.

Trying to upgrade jackson-databind fails with:
com.fasterxml.jackson.databind.JsonMappingException: Scala module 2.10.5 requires Jackson Databind version >= 2.10.0 and < 2.11.0

According to the Spark 3.3.0 release notes: "Upgrade Jackson to 2.13.3",
but the Spark 3.4.1 package contains Jackson 2.10.5
(https://spark.apache.org/releases/spark-release-3-3-0.html).

What am I missing?
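
The constraint in the JsonMappingException above can be read as a range check on the minor version line. The sketch below only models the rule as that message states it (module 2.10.5 accepts databind >= 2.10.0 and < 2.11.0); that it applies the same way to other minor lines is an assumption, and the function names are hypothetical.

```python
def parse_version(text):
    """Turn '2.10.5' into (2, 10, 5); only plain numeric versions are handled."""
    return tuple(int(part) for part in text.split("."))


def databind_range(scala_module_version):
    """Return (inclusive lower, exclusive upper) databind versions for a module."""
    major, minor = parse_version(scala_module_version)[:2]
    return (major, minor, 0), (major, minor + 1, 0)


def is_compatible(scala_module_version, databind_version):
    """True if databind falls inside the range the Scala module accepts."""
    lower, upper = databind_range(scala_module_version)
    return lower <= parse_version(databind_version) < upper
```

Under this reading, is_compatible("2.10.5", "2.13.3") is false, which matches the failed upgrade: bumping jackson-databind alone is not enough, and the Scala module has to move to the same minor line as databind.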

 

--
Moshik Vitas
Senior Software Developer, Crossix
Veeva Systems
m: +972-54-5326-400
moshik.vi...@veeva.com

 



Elasticity and scalability for Spark in Kubernetes

2023-10-30 Thread Mich Talebzadeh
I have been thinking about elasticity and autoscaling for Spark in the
context of Kubernetes. My experience with Kubernetes and Spark on the
so-called autopilot has not been that great. This is mainly because in
autopilot you let the vendor's default configuration decide the choice of
nodes. Autopilot assumes that you can scale horizontally if the resources
are not there. However, this does not account for the case where it starts
a k8s node of 4GB, which is totally inadequate for a Spark job with
moderate loads. The driver pod simply fails to be created and autopilot
starts building the cluster again, causing a delay. Sure, it can start
with a larger node size and it might get there eventually, at a considerable
delay.
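
To make the node-size point concrete, here is a rough sizing sketch. It assumes the documented Spark default for memory overhead (10% of the heap, with a floor of 384 MiB; the default factor is 0.40 for non-JVM jobs on Kubernetes). The node's allocatable memory is a made-up parameter, since the real figure depends on the vendor and on what else runs on the node.

```python
MIN_OVERHEAD_MIB = 384  # documented floor for spark.driver.memoryOverhead


def pod_memory_request_mib(heap_mib, overhead_factor=0.10):
    """Heap plus overhead, where overhead = max(factor * heap, 384 MiB)."""
    overhead = max(int(heap_mib * overhead_factor), MIN_OVERHEAD_MIB)
    return heap_mib + overhead


def fits_on_node(heap_mib, node_allocatable_mib, overhead_factor=0.10):
    """True if one driver (or executor) pod fits in the node's allocatable memory."""
    return pod_memory_request_mib(heap_mib, overhead_factor) <= node_allocatable_mib
```

For example, a driver with a 4096 MiB heap asks for 4096 + 409 = 4505 MiB, which cannot be scheduled on a 4GB node however much of that node is allocatable. A 2048 MiB heap asks for 2048 + 384 = 2432 MiB and fits only if the node leaves that much free.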

Vertical elasticity refers to the ability of a single application instance
to scale its resources up or down. This can be done by adjusting the amount
of memory, CPU, or storage allocated to the application.

Horizontal autoscaling refers to the ability to automatically add or remove
application instances based on the workload. This is typically done by
monitoring the application's performance metrics, such as CPU utilization,
memory usage, or request latency.

Vertical elasticity


   - Memory: The amount of memory allocated to each Spark executor.
   - CPU: The number of CPU cores allocated to each Spark executor.
   - Storage: The amount of storage allocated to each Spark executor.


Horizontal autoscaling


   - Minimum number of executors: The minimum number of executors that
   should be running at any given time.
   - Maximum number of executors: The maximum number of executors that can
   be running at any given time.
   - Target CPU utilization: The desired CPU utilization for the cluster.
   - Target memory utilization: The desired memory utilization for the
   cluster.
   - Target request latency: The desired request latency for the
   application.


For example, in Python I would have these:


# Setting the horizontal autoscaling parameters

spark.conf.set('spark.dynamicAllocation.enabled', 'true')
spark.conf.set('spark.dynamicAllocation.minExecutors', min_instances)
spark.conf.set('spark.dynamicAllocation.maxExecutors', max_instances)
spark.conf.set('spark.dynamicAllocation.targetExecutorIdleTime', 30)
spark.conf.set('spark.dynamicAllocation.initialExecutors', 4)
spark.conf.set('spark.dynamicAllocation.targetRequestLatency', 100)

I have also set the following properties, which are not strictly
necessary for horizontal autoscaling, but which can be helpful:

   - target_memory_utilization: This property specifies the desired memory
   utilization for the application cluster.
   - target_request_latency: This property specifies the desired request
   latency for the application cluster.


spark.conf.set('target_request_latency', 100)
spark.conf.set('target_memory_utilization', 60)

Anyway, this is a sample of the parameters that I use in spark-submit:

spark-submit --verbose \
   --properties-file ${property_file} \
   --master k8s://https://$KUBERNETES_MASTER_IP:443 \
   --deploy-mode cluster \
   --name $APPNAME \
   --py-files $CODE_DIRECTORY_CLOUD/spark_on_gke.zip \
   --conf spark.kubernetes.namespace=$NAMESPACE \
   --conf spark.network.timeout=300 \
   --conf spark.kubernetes.allocation.batch.size=3 \
   --conf spark.kubernetes.allocation.batch.delay=1 \
   --conf spark.kubernetes.driver.container.image=${IMAGEDRIVER} \
   --conf spark.kubernetes.executor.container.image=${IMAGEDRIVER} \
   --conf spark.kubernetes.driver.pod.name=$APPNAME \
   --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark-bq \
   --conf spark.driver.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" \
   --conf spark.executor.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" \
   --conf spark.dynamicAllocation.enabled=true \
   --conf spark.dynamicAllocation.shuffleTracking.enabled=true \
   --conf spark.dynamicAllocation.shuffleTracking.timeout=20s \
   --conf spark.dynamicAllocation.executorIdleTimeout=30s \
   --conf spark.dynamicAllocation.cachedExecutorIdleTimeout=40s \
   --conf spark.dynamicAllocation.minExecutors=0 \
   --conf spark.dynamicAllocation.maxExecutors=20 \
   --conf spark.driver.cores=3 \
   --conf spark.executor.cores=3 \
   --conf spark.driver.memory=1024m \
   --conf spark.executor.memory=1024m \
   $CODE_DIRECTORY_CLOUD/${APPLICATION}

Note that I have kept the memory low (both the driver and executor) to move
the submit job from Pending to Running state. This is by no means optimal,
but I would like to explore ideas on it with the other members.
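
To put rough numbers on the memory point: the pod that Kubernetes has to schedule is larger than spark.executor.memory, because Spark adds a memory overhead on top of the heap. A back-of-the-envelope sketch, assuming the documented defaults as I understand them (overhead factor 0.10 for JVM jobs and 0.40 for non-JVM jobs on Kubernetes, with a 384 MiB floor). The function name is made up for illustration, and any explicit spark.executor.memoryOverhead or spark.executor.pyspark.memory setting is ignored:

```python
MIN_OVERHEAD_MIB = 384  # floor applied to the memory overhead (assumed default)


def pod_memory_request_mib(heap_mib, is_pyspark):
    """Approximate the memory request of one driver or executor pod in MiB."""
    factor = 0.40 if is_pyspark else 0.10
    overhead = max(int(heap_mib * factor), MIN_OVERHEAD_MIB)
    return heap_mib + overhead


print(pod_memory_request_mib(1024, is_pyspark=True))   # 1433
print(pod_memory_request_mib(1024, is_pyspark=False))  # 1408
```

Under these assumptions, a PySpark driver plus one executor at 1024m each already asks for roughly 2.8 GiB before any system pods are counted, which would be a tight fit on a 4GB node.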

Thanks

Mich Talebzadeh,
Distinguished Technologist, Solutions Architect & Engineer
London
United Kingdom


 

Re: Re: Running Spark Connect Server in Cluster Mode on Kubernetes

2023-10-29 Thread Nagatomi Yasukazu
Hi, eabour

Thank you for the insights.

Based on the information you provided, along with the PR
[SPARK-42371][CONNECT] that adds the "./sbin/start-connect-server.sh" script,
I'll experiment with launching the Spark Connect Server in Cluster Mode on
Kubernetes.

[SPARK-42371][CONNECT] Add scripts to start and stop Spark Connect server
https://github.com/apache/spark/pull/39928

I'll keep you updated on the progress in this thread.

> ALL

If anyone has successfully launched the Spark Connect Server in Cluster
Mode on an on-premises Kubernetes, I'd greatly appreciate it if you could
share your experience or any relevant information.

Any related insights are also very welcome!

Best regards,
Yasukazu

On Thu, Oct 19, 2023 at 16:11, eab...@163.com wrote:

> Hi,
> I have found three important classes:
>
>1. *org.apache.spark.sql.connect.service.SparkConnectServer* : the 
> ./sbin/start-connect-server.sh
>script uses the SparkConnectServer class as its main class. Its main function
>uses SparkSession.builder.getOrCreate() to create a local session, and
>starts SparkConnectService.
>2. *org.apache.spark.sql.connect.SparkConnectPlugin* : To enable Spark
>Connect, simply make sure that the appropriate JAR is available in the
>CLASSPATH and the driver plugin is configured to load this class.
>3. *org.apache.spark.sql.connect.SimpleSparkConnectService* : A simple
>main class method to start the spark connect server as a service for client
>tests.
>
>
>So, I believe that by configuring the spark.plugins and starting the
> Spark cluster on Kubernetes, clients can utilize sc://ip:port to connect
> to the remote server.
>Let me give it a try.
>
> --
> eabour
>
>
> *From:* eab...@163.com
> *Date:* 2023-10-19 14:28
> *To:* Nagatomi Yasukazu ; user @spark
> 
> *Subject:* Re: Re: Running Spark Connect Server in Cluster Mode on
> Kubernetes
> Hi all,
>
> Has the spark connect server running on k8s functionality been implemented?
>
> --
>
>
> *From:* Nagatomi Yasukazu 
> *Date:* 2023-09-05 17:51
> *To:* user 
> *Subject:* Re: Running Spark Connect Server in Cluster Mode on Kubernetes
> Dear Spark Community,
>
> I've been exploring the capabilities of the Spark Connect Server and
> encountered an issue when trying to launch it in a cluster deploy mode with
> Kubernetes as the master.
>
> While initiating the `start-connect-server.sh` script with the `--conf`
> parameter for `spark.master` and `spark.submit.deployMode`, I was met with
> an error message:
>
> ```
> Exception in thread "main" org.apache.spark.SparkException: Cluster deploy
> mode is not applicable to Spark Connect server.
> ```
>
> This error message can be traced back to Spark's source code here:
>
> https://github.com/apache/spark/blob/6c885a7cf57df328b03308cff2eed814bda156e4/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala#L307
>
> Given my observations, I'm curious about the Spark Connect Server roadmap:
>
> Is there a plan or current conversation to enable Kubernetes as a master
> in Spark Connect Server's cluster deploy mode?
>
> I have tried to gather information from existing JIRA tickets, but have
> not been able to get a definitive answer:
>
> https://issues.apache.org/jira/browse/SPARK-42730
> https://issues.apache.org/jira/browse/SPARK-39375
> https://issues.apache.org/jira/browse/SPARK-44117
>
> Any thoughts, updates, or references to similar conversations or
> initiatives would be greatly appreciated.
>
> Thank you for your time and expertise!
>
> Best regards,
> Yasukazu
>
> On Tue, Sep 5, 2023 at 12:09, Nagatomi Yasukazu wrote:
>
>> Hello Mich,
>> Thank you for your questions. Here are my responses:
>>
>> > 1. What investigation have you done to show that it is running in local
>> mode?
>>
>> I have verified through the History Server's Environment tab that:
>> - "spark.master" is set to local[*]
>> - "spark.app.id" begins with local-xxx
>> - "spark.submit.deployMode" is set to local
>>
>>
>> > 2. who has configured this kubernetes cluster? Is it supplied by a
>> cloud vendor?
>>
>> Our Kubernetes cluster was set up in an on-prem environment using RKE2(
>> https://docs.rke2.io/ ).
>>
>>
>> > 3. Confirm that you have configured Spark Connect Server correctly for
>> cluster mode. Make sure you specify the cluster manager (e.g., Kubernetes)
>> and other relevant Spark configurations in your Spark job submission.
>>
>> Based on the Spark Connect documentation I've read, there doesn't seem to
>> be any specific settings for cluster mode related to the Spark Connect
>> Server.
>>
>> Configuration - Spark 3.4.1 Documentation
>> https://spark.apache.org/docs/3.4.1/configuration.html#spark-connect
>>
>> Quickstart: Spark Connect — PySpark 3.4.1 documentation
>>
>> https://spark.apache.org/docs/latest/api/python/getting_started/quickstart_connect.html
>>
>> Spark Connect Overview - Spark 3.4.1 Documentation
>> https://spark.apache.org/docs/latest/spark-connect-overview.html
>>
>> The 

Re: Spark join produce duplicate rows in resultset

2023-10-27 Thread Meena Rajani
Thanks all:

Patrick's suggestion to select rev.* and I.* cleared up the confusion. The item
table actually brought 4 rows, hence the final result set had 4 rows.

Regards,
Meena

On Sun, Oct 22, 2023 at 10:13 AM Bjørn Jørgensen 
wrote:

> also remove the space in rev. scode
>
> On Sun, Oct 22, 2023 at 19:08, Sadha Chilukoori <
> sage.quoti...@gmail.com> wrote:
>
>> Hi Meena,
>>
>> I'm asking to clarify, are the *on *& *and* keywords optional in the
>> join conditions?
>>
>> Please try this snippet, and see if it helps
>>
>> select rev.* from rev
>> inner join customer c
>> on rev.custumer_id =c.id
>> inner join product p
>> on rev.sys = p.sys
>> and rev.prin = p.prin
>> and rev.scode= p.bcode
>>
>> left join item I
>> on rev.sys = I.sys
>> and rev.custumer_id = I.custumer_id
>> and rev. scode = I.scode;
>>
>> Thanks,
>> Sadha
>>
>> On Sat, Oct 21, 2023 at 3:21 PM Meena Rajani 
>> wrote:
>>
>>> Hello all:
>>>
>>> I am using spark sql to join two tables. To my surprise I am
>>> getting redundant rows. What could be the cause.
>>>
>>>
>>> select rev.* from rev
>>> inner join customer c
>>> on rev.custumer_id =c.id
>>> inner join product p
>>> rev.sys = p.sys
>>> rev.prin = p.prin
>>> rev.scode= p.bcode
>>>
>>> left join item I
>>> on rev.sys = i.sys
>>> rev.custumer_id = I.custumer_id
>>> rev. scode = I.scode
>>>
>>> where rev.custumer_id = '123456789'
>>>
>>> The first part of the code brings one row
>>>
>>> select rev.* from rev
>>> inner join customer c
>>> on rev.custumer_id =c.id
>>> inner join product p
>>> rev.sys = p.sys
>>> rev.prin = p.prin
>>> rev.scode= p.bcode
>>>
>>>
>>> The  item has two rows which have common attributes  and the* final
>>> join should result in 2 rows. But I am seeing 4 rows instead.*
>>>
>>> left join item I
>>> on rev.sys = i.sys
>>> rev.custumer_id = I.custumer_id
>>> rev. scode = I.scode
>>>
>>>
>>>
>>> Regards,
>>> Meena
>>>
>>>
>>>
>
> --
> Bjørn Jørgensen
> Vestre Aspehaug 4, 6010 Ålesund
> Norge
>
> +47 480 94 297
>


Re: [Structured Streaming] Joins after aggregation don't work in streaming

2023-10-27 Thread Andrzej Zera
Hi, thank you very much for an update!

Thanks,
Andrzej

On 2023/10/27 01:50:35 Jungtaek Lim wrote:

> Hi, we are aware of your ticket and plan to look into it. We can't say
> about ETA but just wanted to let you know that we are going to look into
> it. Thanks for reporting!
>
> Thanks,
> Jungtaek Lim (HeartSaVioR)
>
> On Fri, Oct 27, 2023 at 5:22 AM Andrzej Zera 
> wrote:
>
>> Hey All,
>>
>> I'm trying to reproduce the following streaming operation: "Time window
>> aggregation in separate streams followed by stream-stream join". According
>> to documentation, this should be possible in Spark 3.5.0 but I had no
>> success despite different tries.
>>
>> Here is a documentation snippet I'm trying to reproduce:
>> https://github.com/apache/spark/blob/261b281e6e57be32eb28bf4e50bea24ed22a9f21/docs/structured-streaming-programming-guide.md?plain=1#L1939-L1995
>>
>> I created an issue with more details but no one responded yet:
>> https://issues.apache.org/jira/browse/SPARK-45637
>>
>> Thank you!
>> Andrzej
>>
>


Re: [Structured Streaming] Joins after aggregation don't work in streaming

2023-10-26 Thread Jungtaek Lim
Hi, we are aware of your ticket and plan to look into it. We can't say
about ETA but just wanted to let you know that we are going to look into
it. Thanks for reporting!

Thanks,
Jungtaek Lim (HeartSaVioR)

On Fri, Oct 27, 2023 at 5:22 AM Andrzej Zera  wrote:

> Hey All,
>
> I'm trying to reproduce the following streaming operation: "Time window
> aggregation in separate streams followed by stream-stream join". According
> to documentation, this should be possible in Spark 3.5.0 but I had no
> success despite different tries.
>
> Here is a documentation snippet I'm trying to reproduce:
> https://github.com/apache/spark/blob/261b281e6e57be32eb28bf4e50bea24ed22a9f21/docs/structured-streaming-programming-guide.md?plain=1#L1939-L1995
>
> I created an issue with more details but no one responded yet:
> https://issues.apache.org/jira/browse/SPARK-45637
>
> Thank you!
> Andrzej
>


[Structured Streaming] Joins after aggregation don't work in streaming

2023-10-26 Thread Andrzej Zera
Hey All,

I'm trying to reproduce the following streaming operation: "Time window
aggregation in separate streams followed by stream-stream join". According
to documentation, this should be possible in Spark 3.5.0 but I had no
success despite different tries.

Here is a documentation snippet I'm trying to reproduce:
https://github.com/apache/spark/blob/261b281e6e57be32eb28bf4e50bea24ed22a9f21/docs/structured-streaming-programming-guide.md?plain=1#L1939-L1995

I created an issue with more details but no one responded yet:
https://issues.apache.org/jira/browse/SPARK-45637

Thank you!
Andrzej


[Resolved] Re: spark.stop() cannot stop spark connect session

2023-10-25 Thread eab...@163.com
Hi all.
I read source code at spark/python/pyspark/sql/connect/session.py at master 
· apache/spark (github.com) and the comment for the "stop" method is described 
as follows:
def stop(self) -> None:
    # Stopping the session will only close the connection to the current session (and
    # the life cycle of the session is maintained by the server),
    # whereas the regular PySpark session immediately terminates the Spark Context
    # itself, meaning that stopping all Spark sessions.
    # It is controversial to follow the existing the regular Spark session's behavior
    # specifically in Spark Connect the Spark Connect server is designed for
    # multi-tenancy - the remote client side cannot just stop the server and stop
    # other remote clients being used from other users.
 
So, that's how it was designed.


eabour
 
From: eab...@163.com
Date: 2023-10-20 15:56
To: user @spark
Subject: spark.stop() cannot stop spark connect session
Hi,
my code:
from pyspark.sql import SparkSession
spark = SparkSession.builder.remote("sc://172.29.190.147").getOrCreate()

import pandas as pd
# Create a pandas DataFrame
pdf = pd.DataFrame({
"name": ["Alice", "Bob", "Charlie"],
"age": [25, 30, 35],
"gender": ["F", "M", "M"]
})

# Convert the pandas DataFrame to a Spark DataFrame
sdf = spark.createDataFrame(pdf)

# Show the Spark DataFrame
sdf.show()

spark.stop()

After spark.stop(), running sdf.show() throws 
pyspark.errors.exceptions.connect.SparkConnectException: [NO_ACTIVE_SESSION] No 
active Spark session found. Please create a new Spark session before running 
the code. Visit the Spark web UI at http://172.29.190.147:4040/connect/ to 
check if the current session is still running and has not been stopped yet.
1 session(s) are online, running 0 Request(s)
Session Statistics (1)
User | Session ID | Start Time | Finish Time | Duration | Total Execute
- | 29f05cde-8f8b-418d-95c0-8dbbbfb556d2 | 2023/10/20 15:30:04 | - | 14 minutes 49 seconds | 2


eabour


spark schema conflict behavior records being silently dropped

2023-10-24 Thread Carlos Aguni
hi all,

i noticed a weird behavior when spark parses nested json with a schema
conflict.

i also just noticed that spark "fixed" this in the most recent release
3.5.0, but since i'm working with these AWS services:
* EMR 6: spark 3.3.*, spark 3.4.*
* Glue 3: spark 3.1.1
* Glue 4: spark 3.3.0
https://docs.aws.amazon.com/glue/latest/dg/release-notes.html
https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-release-app-versions-6.x.html

..we're still facing this issue company-wide.

the problem is that spark silently drops records (or even whole
files) when there's a schema conflict or empty string values (
https://kb.databricks.com/notebooks/json-reader-parses-value-as-null).

My whole concern here is that spark does not even emit a warning, error or
exception when such cases occur.

To reproduce:
i'm using Amazon Linux 2 with Python 3.7.16 and pyspark==3.4.1.

echo '{"Records":[{"evtid":"1","requestParameters":{"DescribeHostsRequest":{"MaxResults":500}}}]}' > one.json
echo '{"Records":[{"evtid":"2","requestParameters":{"lol":{},"lol2":{}}},{"evtid":"3","requestParameters":{"DescribeHostsRequest":""}}]}' > two.json

Output of command:
> spark.read.json(["one.json", "two.json"]).show()

using 3.1.0 and 3.3.0
+--------------+
|       Records|
+--------------+
|          null|
|[{1, {{500}}}]|
+--------------+
drops the second (two.json) file

using Spark 3.4.0
+--------------+
|       Records|
+--------------+
|          null|
|[{1, {{500}}}]|
+--------------+
it completely drops the second (two.json) file.

Spark 3.5.0
+--------------------+
|             Records|
+--------------------+
|[{2, {NULL}}, {3,...|
|      [{1, {{500}}}]|
+--------------------+
it reads both files but completely drops the "requestParameters" content of
all the records in the second (two.json) file.
{"evtid":"2","requestParameters":{}} <-- not good
{"evtid":"3","requestParameters":{}} <-- not good
{"evtid":"1","requestParameters":{"DescribeHostsRequest":{"MaxResults":500}}}

enabling spark.conf.set("spark.sql.legacy.json.allowEmptyString.enabled",
True) as suggested by
https://kb.databricks.com/notebooks/json-reader-parses-value-as-null in
spark 3.1 and 3.3 yields the same result seen in spark 3.5, which is not
ideal if one wants to later fetch the records as is.
the only solution i found for this was to explicitly enforce the schema
when reading.

that said,
does anyone know the exact thread or changelog where this issue was handled?
i've checked the links below but they were inconclusive:
https://spark.apache.org/docs/latest/sql-migration-guide.html
https://spark.apache.org/releases/spark-release-3-5-0.html

another question.
how would you guys handle this scenario?
I could not see a clue even after enabling full verbose.
I could maybe force spark to issue an exception when such a case is
encountered.

or maybe send those bad/broken records to another file or bucket (dlq-ish)
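
One way to approach the dlq-ish idea without relying on the reader would be to pre-check each file against the expected shape before handing it to spark. A minimal sketch in plain Python, assuming the only field that matters is requestParameters.DescribeHostsRequest and that it must be an object when present. The function names and the in-memory dead-letter list are hypothetical:

```python
import json


def is_well_formed(record):
    """Accept a record when DescribeHostsRequest is absent or a JSON object."""
    params = record.get("requestParameters", {})
    if not isinstance(params, dict):
        return False
    value = params.get("DescribeHostsRequest")
    return value is None or isinstance(value, dict)


def split_records(raw_json):
    """Split one file's Records into (good, dead_letter) lists."""
    good, dead_letter = [], []
    for record in json.loads(raw_json).get("Records", []):
        (good if is_well_formed(record) else dead_letter).append(record)
    return good, dead_letter


two_json = (
    '{"Records":[{"evtid":"2","requestParameters":{"lol":{},"lol2":{}}},'
    '{"evtid":"3","requestParameters":{"DescribeHostsRequest":""}}]}'
)
good, dead_letter = split_records(two_json)
print([r["evtid"] for r in good])         # ['2']
print([r["evtid"] for r in dead_letter])  # ['3']
```

In this sketch the record with the empty-string DescribeHostsRequest ends up in the dead-letter list, so it could be written to a separate file or bucket instead of disappearing silently.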

best regards,c.


Re: automatically/dinamically renew aws temporary token

2023-10-24 Thread Carlos Aguni
hi all,

thank you for your reply.

> Can’t you attach the cross account permission to the glue job role? Why
the detour via AssumeRole ?
yes Jorn, i also believe this is the best approach. but here we're dealing
with company policies and all the bureaucracy that comes along.
in parallel i'm trying to argue for that path. for now, even requesting an
increase of the session duration is a struggle.
but at the moment, since I was only allowed the AssumeRole approach i'm
figuring out a way through this path.

> https://github.com/zillow/aws-custom-credential-provider
thank you Pol. I'll take a look into the project.

regards,c.

On Mon, Oct 23, 2023 at 7:03 AM Pol Santamaria  wrote:

> Hi Carlos!
>
> Take a look at this project, it's 6 years old but the approach is still
> valid:
>
> https://github.com/zillow/aws-custom-credential-provider
>
> The credential provider gets called each time an S3 or Glue Catalog is
> accessed, and then you can decide whether to use a cached token or renew.
>
> Best,
>
> *Pol Santamaria*
>
>
> On Mon, Oct 23, 2023 at 8:08 AM Jörn Franke  wrote:
>
>> Can’t you attach the cross account permission to the glue job role? Why
>> the detour via AssumeRole ?
>>
>> Assumerole can make sense if you use an AWS IAM user and STS
>> authentication, but this would make no sense within AWS for cross-account
>> access as attaching the permissions to the Glue job role is more secure (no
>> need for static credentials, automatically renew permissions in shorter
>> time without any specific configuration in Spark).
>>
>> Have you checked with AWS support?
>>
>> On 22.10.2023 at 21:14, Carlos Aguni wrote:
>>
>> 
>> hi all,
>>
>> i've a scenario where I need to assume a cross account role to have S3
>> bucket access.
>>
>> the problem is that this role only allows for 1h time span (no
>> negotiation).
>>
>> that said.
>> does anyone know a way to tell spark to automatically renew the token
>> or to dinamically renew the token on each node?
>> i'm currently using spark on AWS glue.
>>
>> wonder what options do I have.
>>
>> regards,c.
>>
>>


Re: Maximum executors in EC2 Machine

2023-10-24 Thread Riccardo Ferrari
Hi,

I would refer to the Spark documentation to better understand the concepts
behind the cluster overview and submitting applications:

   - https://spark.apache.org/docs/latest/cluster-overview.html#cluster-manager-types
   - https://spark.apache.org/docs/latest/submitting-applications.html

When using local[*] you get as many worker threads as you have cores, all in
the same JVM that runs your driver, rather than separate executors. If you
want to test against a real cluster you can look into using standalone mode.

HTH,
Riccardo

On Mon, Oct 23, 2023 at 5:31 PM KhajaAsmath Mohammed <
mdkhajaasm...@gmail.com> wrote:

> Hi,
>
> I am running a spark job on an EC2 machine which has 40 cores. Driver
> and executor memory is 16 GB. I am using local[*] but I still get only one
> executor (the driver). Is there a way to get more executors with this config?
>
> I am not using yarn or mesos in this case. Only one machine which is
> enough for our work load but the data is increased.
>
> Thanks,
> Asmath
>


submitting tasks failed in Spark standalone mode due to missing failureaccess jar file

2023-10-24 Thread eab...@163.com
Hi Team.
I used Spark 3.5.0 to start a Spark cluster with start-master.sh and 
start-worker.sh. When I run ./bin/spark-shell --master 
spark://LAPTOP-TC4A0SCV.:7077 I get these error logs: 
```
23/10/24 12:00:46 ERROR TaskSchedulerImpl: Lost an executor 1 (already 
removed): Command exited with code 50
```
The worker's log for the finished executor:
```
Spark Executor Command: 
"/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.372.b07-1.el7_9.x86_64/jre/bin/java" 
"-cp" 
"/root/spark-3.5.0-bin-hadoop3/conf/:/root/spark-3.5.0-bin-hadoop3/jars/*" 
"-Xmx1024M" "-Dspark.driver.port=43765" "-Djava.net.preferIPv6Addresses=false" 
"-XX:+IgnoreUnrecognizedVMOptions" 
"--add-opens=java.base/java.lang=ALL-UNNAMED" 
"--add-opens=java.base/java.lang.invoke=ALL-UNNAMED" 
"--add-opens=java.base/java.lang.reflect=ALL-UNNAMED" 
"--add-opens=java.base/java.io=ALL-UNNAMED" 
"--add-opens=java.base/java.net=ALL-UNNAMED" 
"--add-opens=java.base/java.nio=ALL-UNNAMED" 
"--add-opens=java.base/java.util=ALL-UNNAMED" 
"--add-opens=java.base/java.util.concurrent=ALL-UNNAMED" 
"--add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED" 
"--add-opens=java.base/sun.nio.ch=ALL-UNNAMED" 
"--add-opens=java.base/sun.nio.cs=ALL-UNNAMED" 
"--add-opens=java.base/sun.security.action=ALL-UNNAMED" 
"--add-opens=java.base/sun.util.calendar=ALL-UNNAMED" 
"--add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED" 
"-Djdk.reflect.useDirectMethodHandle=false" 
"org.apache.spark.executor.CoarseGrainedExecutorBackend" "--driver-url" 
"spark://CoarseGrainedScheduler@172.29.190.147:43765" "--executor-id" "0" 
"--hostname" "172.29.190.147" "--cores" "6" "--app-id" 
"app-20231024120037-0001" "--worker-url" "spark://Worker@172.29.190.147:34707" 
"--resourceProfileId" "0"

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
23/10/24 12:00:39 INFO CoarseGrainedExecutorBackend: Started daemon with 
process name: 19535@LAPTOP-TC4A0SCV
23/10/24 12:00:39 INFO SignalUtils: Registering signal handler for TERM
23/10/24 12:00:39 INFO SignalUtils: Registering signal handler for HUP
23/10/24 12:00:39 INFO SignalUtils: Registering signal handler for INT
23/10/24 12:00:39 WARN Utils: Your hostname, LAPTOP-TC4A0SCV resolves to a 
loopback address: 127.0.1.1; using 172.29.190.147 instead (on interface eth0)
23/10/24 12:00:39 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another 
address
23/10/24 12:00:42 INFO CoarseGrainedExecutorBackend: Successfully registered 
with driver
23/10/24 12:00:42 INFO Executor: Starting executor ID 0 on host 172.29.190.147
23/10/24 12:00:42 INFO Executor: OS info Linux, 
5.15.123.1-microsoft-standard-WSL2, amd64
23/10/24 12:00:42 INFO Executor: Java version 1.8.0_372
23/10/24 12:00:42 INFO Utils: Successfully started service 
'org.apache.spark.network.netty.NettyBlockTransferService' on port 35227.
23/10/24 12:00:42 INFO NettyBlockTransferService: Server created on 
172.29.190.147:35227
23/10/24 12:00:42 INFO BlockManager: Using 
org.apache.spark.storage.RandomBlockReplicationPolicy for block replication 
policy
23/10/24 12:00:42 ERROR Inbox: An error happened while processing message in 
the inbox for Executor
java.lang.NoClassDefFoundError: 
org/sparkproject/guava/util/concurrent/internal/InternalFutureFailureAccess
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:756)
at 
java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:473)
at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:756)
at 
java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:473)
at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
at 

Contribution Recommendations

2023-10-23 Thread Phil Dakin
Per the "Contributing to Spark" guide, I am requesting guidance on selecting
a good ticket to take on. I've opened documentation/test PRs:

https://github.com/apache/spark/pull/43369
https://github.com/apache/spark/pull/43405

If you have recommendations for the next ticket, please let me know. The
"starter" tag in Jira is sparsely populated.


Maximum executors in EC2 Machine

2023-10-23 Thread KhajaAsmath Mohammed
Hi,

I am running a spark job on an EC2 machine which has 40 cores. Driver
and executor memory is 16 GB. I am using local[*] but I still get only one
executor (the driver). Is there a way to get more executors with this config?

I am not using yarn or mesos in this case. Only one machine, which was enough
for our workload, but the data has increased.

Thanks,
Asmath


Re: automatically/dinamically renew aws temporary token

2023-10-23 Thread Pol Santamaria
Hi Carlos!

Take a look at this project, it's 6 years old but the approach is still
valid:

https://github.com/zillow/aws-custom-credential-provider

The credential provider gets called each time an S3 or Glue Catalog is
accessed, and then you can decide whether to use a cached token or renew.
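
The cache-or-renew decision described above can be sketched in a few lines. This is plain Python for illustration only, not the linked project's code. The class name, the `fetch` callable standing in for the real AssumeRole call, and the 300 second renewal margin are all assumptions:

```python
import time


class CachedCredentials:
    """Cache temporary credentials and renew them shortly before they expire."""

    def __init__(self, fetch, lifetime_s=3600, margin_s=300, clock=time.time):
        self._fetch = fetch            # callable that performs the real AssumeRole call
        self._lifetime_s = lifetime_s  # 1h session limit mentioned in the thread
        self._margin_s = margin_s      # renew this many seconds before expiry
        self._clock = clock
        self._token = None
        self._expires_at = 0.0

    def get(self):
        """Return the cached token, renewing it when it is close to expiry."""
        now = self._clock()
        if self._token is None or now >= self._expires_at - self._margin_s:
            self._token = self._fetch()
            self._expires_at = now + self._lifetime_s
        return self._token


# Usage with a fake clock and a stand-in fetch function.
calls = []
fake_now = [0.0]


def fake_fetch():
    calls.append(fake_now[0])
    return f"token-{len(calls)}"


creds = CachedCredentials(fake_fetch, clock=lambda: fake_now[0])
first = creds.get()    # nothing cached yet, so this fetches
fake_now[0] = 1000.0
second = creds.get()   # well before expiry, served from the cache
fake_now[0] = 3400.0
third = creds.get()    # within 300s of expiry, so renewed
print(first, second, third)  # token-1 token-1 token-2
```

Because the provider is consulted on every access, a long-running job would pick up a fresh token on its own once the old one nears the 1h limit.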

Best,

*Pol Santamaria*


On Mon, Oct 23, 2023 at 8:08 AM Jörn Franke  wrote:

> Can’t you attach the cross account permission to the glue job role? Why
> the detour via AssumeRole ?
>
> Assumerole can make sense if you use an AWS IAM user and STS
> authentication, but this would make no sense within AWS for cross-account
> access as attaching the permissions to the Glue job role is more secure (no
> need for static credentials, automatically renew permissions in shorter
> time without any specific configuration in Spark).
>
> Have you checked with AWS support?
>
> On 22.10.2023 at 21:14, Carlos Aguni wrote:
>
> 
> hi all,
>
> i've a scenario where I need to assume a cross account role to have S3
> bucket access.
>
> the problem is that this role only allows for 1h time span (no
> negotiation).
>
> that said.
> does anyone know a way to tell spark to automatically renew the token
> or to dinamically renew the token on each node?
> i'm currently using spark on AWS glue.
>
> wonder what options do I have.
>
> regards,c.
>
>


Re: automatically/dinamically renew aws temporary token

2023-10-23 Thread Jörn Franke
Can’t you attach the cross account permission to the glue job role? Why the 
detour via AssumeRole ?

Assumerole can make sense if you use an AWS IAM user and STS authentication, 
but this would make no sense within AWS for cross-account access as attaching 
the permissions to the Glue job role is more secure (no need for static 
credentials, automatically renew permissions in shorter time without any 
specific configuration in Spark).

Have you checked with AWS support?

> On 22.10.2023 at 21:14, Carlos Aguni wrote:
> 
> 
> hi all,
> 
> i've a scenario where I need to assume a cross account role to have S3 bucket 
> access.
> 
> the problem is that this role only allows for 1h time span (no negotiation).
> 
> that said.
> does anyone know a way to tell spark to automatically renew the token
> or to dinamically renew the token on each node?
> i'm currently using spark on AWS glue.
> 
> wonder what options do I have.
> 
> regards,c.


Re: Spark join produce duplicate rows in resultset

2023-10-22 Thread Bjørn Jørgensen
also remove the space in rev. scode

On Sun, Oct 22, 2023 at 19:08, Sadha Chilukoori wrote:

> Hi Meena,
>
> I'm asking to clarify, are the *on *& *and* keywords optional in the join
> conditions?
>
> Please try this snippet, and see if it helps
>
> select rev.* from rev
> inner join customer c
> on rev.custumer_id =c.id
> inner join product p
> on rev.sys = p.sys
> and rev.prin = p.prin
> and rev.scode= p.bcode
>
> left join item I
> on rev.sys = I.sys
> and rev.custumer_id = I.custumer_id
> and rev. scode = I.scode;
>
> Thanks,
> Sadha
>
> On Sat, Oct 21, 2023 at 3:21 PM Meena Rajani 
> wrote:
>
>> Hello all:
>>
>> I am using spark sql to join two tables. To my surprise I am
>> getting redundant rows. What could be the cause.
>>
>>
>> select rev.* from rev
>> inner join customer c
>> on rev.custumer_id =c.id
>> inner join product p
>> rev.sys = p.sys
>> rev.prin = p.prin
>> rev.scode= p.bcode
>>
>> left join item I
>> on rev.sys = i.sys
>> rev.custumer_id = I.custumer_id
>> rev. scode = I.scode
>>
>> where rev.custumer_id = '123456789'
>>
>> The first part of the code brings one row
>>
>> select rev.* from rev
>> inner join customer c
>> on rev.custumer_id =c.id
>> inner join product p
>> rev.sys = p.sys
>> rev.prin = p.prin
>> rev.scode= p.bcode
>>
>>
>> The  item has two rows which have common attributes  and the* final join
>> should result in 2 rows. But I am seeing 4 rows instead.*
>>
>> left join item I
>> on rev.sys = i.sys
>> rev.custumer_id = I.custumer_id
>> rev. scode = I.scode
>>
>>
>>
>> Regards,
>> Meena
>>
>>
>>

-- 
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297


Re: Spark join produce duplicate rows in resultset

2023-10-22 Thread Sadha Chilukoori
Hi Meena,

I'm asking to clarify, are the *on *& *and* keywords optional in the join
conditions?

Please try this snippet, and see if it helps

select rev.* from rev
inner join customer c
on rev.custumer_id =c.id
inner join product p
on rev.sys = p.sys
and rev.prin = p.prin
and rev.scode= p.bcode

left join item I
on rev.sys = I.sys
and rev.custumer_id = I.custumer_id
and rev. scode = I.scode;

Thanks,
Sadha

On Sat, Oct 21, 2023 at 3:21 PM Meena Rajani  wrote:

> Hello all:
>
> I am using spark sql to join two tables. To my surprise I am
> getting redundant rows. What could be the cause.
>
>
> select rev.* from rev
> inner join customer c
> on rev.custumer_id =c.id
> inner join product p
> rev.sys = p.sys
> rev.prin = p.prin
> rev.scode= p.bcode
>
> left join item I
> on rev.sys = i.sys
> rev.custumer_id = I.custumer_id
> rev. scode = I.scode
>
> where rev.custumer_id = '123456789'
>
> The first part of the code brings one row
>
> select rev.* from rev
> inner join customer c
> on rev.custumer_id =c.id
> inner join product p
> rev.sys = p.sys
> rev.prin = p.prin
> rev.scode= p.bcode
>
>
> The  item has two rows which have common attributes  and the* final join
> should result in 2 rows. But I am seeing 4 rows instead.*
>
> left join item I
> on rev.sys = i.sys
> rev.custumer_id = I.custumer_id
> rev. scode = I.scode
>
>
>
> Regards,
> Meena
>
>
>


Re: Spark join produce duplicate rows in resultset

2023-10-22 Thread Patrick Tucci
Hi Meena,

It's not impossible, but it's unlikely that there's a bug in Spark SQL
randomly duplicating rows. The most likely explanation is there are more
records in the item table that match your sys/custumer_id/scode criteria
than you expect.

In your original query, try changing select rev.* to select I.*. This will
show you the records from item that the join produces. If the first part of
the code only returns one record, I expect you will see 4 distinct records
returned here.
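
To illustrate the check above, here is a minimal sketch. It uses Python's built-in sqlite3 module instead of Spark, and the table contents are invented for the example (only the table and column names come from the thread). The point is only that a single rev row joined to four matching item rows yields four identical rev.* rows, while selecting the item columns shows the four distinct records behind them.

```python
import sqlite3

# Hypothetical miniature tables: names follow the thread, the data is invented.
conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE rev  (custumer_id TEXT, sys TEXT, prin TEXT, scode TEXT);
CREATE TABLE item (sys TEXT, custumer_id TEXT, scode TEXT, note TEXT);
INSERT INTO rev VALUES ('123456789', 'S1', 'P1', 'A');
INSERT INTO item VALUES
  ('S1', '123456789', 'A', 'expected row 1'),
  ('S1', '123456789', 'A', 'expected row 2'),
  ('S1', '123456789', 'A', 'unexpected row 3'),
  ('S1', '123456789', 'A', 'unexpected row 4');
""")

# Same join shape as the last part of the query in the thread.
join = """
FROM rev
LEFT JOIN item i
  ON rev.sys = i.sys
 AND rev.custumer_id = i.custumer_id
 AND rev.scode = i.scode
WHERE rev.custumer_id = '123456789'
"""

rev_rows = conn.execute("SELECT rev.* " + join).fetchall()
item_rows = conn.execute("SELECT i.* " + join).fetchall()

# rev.* looks like duplicates; i.* reveals the distinct item rows behind them.
print(len(rev_rows), "rows from rev.*,", len(set(rev_rows)), "distinct")
print(len(item_rows), "rows from i.*,", len(set(item_rows)), "distinct")
```

If the same pattern shows up on the real tables, the join keys are probably not unique in item for that customer.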

Thanks,

Patrick


On Sun, Oct 22, 2023 at 1:29 AM Meena Rajani  wrote:

> Hello all:
>
> I am using spark sql to join two tables. To my surprise I am
> getting redundant rows. What could be the cause.
>
>
> select rev.* from rev
> inner join customer c
> on rev.custumer_id =c.id
> inner join product p
> rev.sys = p.sys
> rev.prin = p.prin
> rev.scode= p.bcode
>
> left join item I
> on rev.sys = i.sys
> rev.custumer_id = I.custumer_id
> rev. scode = I.scode
>
> where rev.custumer_id = '123456789'
>
> The first part of the code brings one row
>
> select rev.* from rev
> inner join customer c
> on rev.custumer_id =c.id
> inner join product p
> rev.sys = p.sys
> rev.prin = p.prin
> rev.scode= p.bcode
>
>
> The  item has two rows which have common attributes  and the* final join
> should result in 2 rows. But I am seeing 4 rows instead.*
>
> left join item I
> on rev.sys = i.sys
> rev.custumer_id = I.custumer_id
> rev. scode = I.scode
>
>
>
> Regards,
> Meena
>
>
>


automatically/dynamically renew aws temporary token

2023-10-22 Thread Carlos Aguni
hi all,

i've a scenario where I need to assume a cross account role to have S3 bucket 
access.

the problem is that this role only allows for 1h time span (no negotiation).

that said.
does anyone know a way to tell spark to automatically renew the token
or to dynamically renew the token on each node?
i'm currently using spark on AWS glue.

wonder what options do I have.

regards,c.


Spark join produce duplicate rows in resultset

2023-10-21 Thread Meena Rajani
Hello all:

I am using Spark SQL to join two tables. To my surprise I am
getting redundant rows. What could be the cause?


select rev.* from rev
inner join customer c
on rev.custumer_id =c.id
inner join product p
rev.sys = p.sys
rev.prin = p.prin
rev.scode= p.bcode

left join item I
on rev.sys = i.sys
rev.custumer_id = I.custumer_id
rev. scode = I.scode

where rev.custumer_id = '123456789'

The first part of the code returns one row:

select rev.* from rev
inner join customer c
on rev.custumer_id =c.id
inner join product p
rev.sys = p.sys
rev.prin = p.prin
rev.scode= p.bcode


The item table has two rows with common attributes, so the *final join
should result in 2 rows. But I am seeing 4 rows instead.*

left join item I
on rev.sys = i.sys
rev.custumer_id = I.custumer_id
rev. scode = I.scode



Regards,
Meena


Error when trying to get the data from Hive Materialized View

2023-10-21 Thread Siva Sankar Reddy
Hi Team ,

We do not get any error when retrieving data from a Hive table in PySpark,
but when reading from a Hive materialized view we get the error
scala.MatchError: MATERIALIZED_VIEW (of class
org.apache.hadoop.hive.metastore.TableType). Please let me know the
resolution for this.

Thanks


spark.stop() cannot stop spark connect session

2023-10-20 Thread eab...@163.com
Hi,
my code:
from pyspark.sql import SparkSession
spark = SparkSession.builder.remote("sc://172.29.190.147").getOrCreate()

import pandas as pd
# Create a pandas DataFrame
pdf = pd.DataFrame({
"name": ["Alice", "Bob", "Charlie"],
"age": [25, 30, 35],
"gender": ["F", "M", "M"]
})

# Convert the pandas DataFrame to a Spark DataFrame
sdf = spark.createDataFrame(pdf)

# Show the Spark DataFrame
sdf.show()

spark.stop()

After spark.stop(), executing sdf.show() throws 
pyspark.errors.exceptions.connect.SparkConnectException: [NO_ACTIVE_SESSION] No 
active Spark session found. Please create a new Spark session before running 
the code. Visit the Spark web UI at http://172.29.190.147:4040/connect/ to 
check if the current session is still running and has not been stopped yet.
1 session(s) are online, running 0 Request(s)
Session Statistics (1)
User: (empty)
Session ID: 29f05cde-8f8b-418d-95c0-8dbbbfb556d2
Start Time: 2023/10/20 15:30:04
Finish Time: (empty)
Duration: 14 minutes 49 seconds
Total Execute: 2


eabour


"Premature end of Content-Length" Error

2023-10-19 Thread Sandhya Bala
Hi all,

I am running into the following error with spark 2.4.8

Job aborted due to stage failure: Task 9 in stage 2.0 failed 4 times, most
> recent failure: Lost task 9.3 in stage 2.0 (TID 100, 10.221.8.73, executor
> 2): org.apache.http.ConnectionClosedException: Premature end of
> Content-Length delimited message body (expected: 106327; received: 3477


but my current code has neither hadoop-aws nor aws-java-sdk.

Any suggestions on what could be the problem?

Thanks,
Sandhya


Re: Re: Running Spark Connect Server in Cluster Mode on Kubernetes

2023-10-19 Thread eab...@163.com
Hi,
I have found three important classes:
org.apache.spark.sql.connect.service.SparkConnectServer: the 
./sbin/start-connect-server.sh script uses the SparkConnectServer class as its 
main class. Its main function calls SparkSession.builder.getOrCreate() to 
create a local session and then starts SparkConnectService.
org.apache.spark.sql.connect.SparkConnectPlugin: to enable Spark Connect, 
simply make sure that the appropriate JAR is available in the CLASSPATH and the 
driver plugin is configured to load this class.
org.apache.spark.sql.connect.SimpleSparkConnectService: a simple main class 
to start the Spark Connect server as a service for client tests. 

   So, I believe that by configuring spark.plugins and starting the Spark 
cluster on Kubernetes, clients can use sc://ip:port to connect to the 
remote server. 
   Let me give it a try.
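
A rough sketch of what that plan could look like from the Python side, under some assumptions: the host name in the usage note is a placeholder, 15002 is assumed to be the server's gRPC port (the documented default of spark.connect.grpc.binding.port), and the helper names are made up for this example. Whether the plugin route actually works in cluster mode on Kubernetes is exactly what is still to be tested.

```python
# Driver-side setting that asks Spark to load the Spark Connect plugin.
PLUGIN_CONF = {
    "spark.plugins": "org.apache.spark.sql.connect.SparkConnectPlugin",
}

# Assumed default of spark.connect.grpc.binding.port.
DEFAULT_CONNECT_PORT = 15002


def connect_url(host: str, port: int = DEFAULT_CONNECT_PORT) -> str:
    """Build the sc://host:port endpoint string a Spark Connect client uses."""
    return f"sc://{host}:{port}"


def open_session(url: str):
    """Open a remote session; requires pyspark with Spark Connect support."""
    # Imported lazily so the helpers above can be used without pyspark installed.
    from pyspark.sql import SparkSession

    return SparkSession.builder.remote(url).getOrCreate()
```

Usage would be something like open_session(connect_url("my-driver-service")), where my-driver-service stands for whatever Kubernetes service exposes the driver.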



eabour
 
From: eab...@163.com
Date: 2023-10-19 14:28
To: Nagatomi Yasukazu; user @spark
Subject: Re: Re: Running Spark Connect Server in Cluster Mode on Kubernetes
Hi all, 
Has the spark connect server running on k8s functionality been implemented?



 
From: Nagatomi Yasukazu
Date: 2023-09-05 17:51
To: user
Subject: Re: Running Spark Connect Server in Cluster Mode on Kubernetes
Dear Spark Community,

I've been exploring the capabilities of the Spark Connect Server and 
encountered an issue when trying to launch it in a cluster deploy mode with 
Kubernetes as the master.

While initiating the `start-connect-server.sh` script with the `--conf` 
parameter for `spark.master` and `spark.submit.deployMode`, I was met with an 
error message:

```
Exception in thread "main" org.apache.spark.SparkException: Cluster deploy mode 
is not applicable to Spark Connect server.
```

This error message can be traced back to Spark's source code here:
https://github.com/apache/spark/blob/6c885a7cf57df328b03308cff2eed814bda156e4/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala#L307

Given my observations, I'm curious about the Spark Connect Server roadmap:

Is there a plan or current conversation to enable Kubernetes as a master in 
Spark Connect Server's cluster deploy mode?

I have tried to gather information from existing JIRA tickets, but have not 
been able to get a definitive answer:

https://issues.apache.org/jira/browse/SPARK-42730
https://issues.apache.org/jira/browse/SPARK-39375
https://issues.apache.org/jira/browse/SPARK-44117

Any thoughts, updates, or references to similar conversations or initiatives 
would be greatly appreciated.

Thank you for your time and expertise!

Best regards,
Yasukazu

2023年9月5日(火) 12:09 Nagatomi Yasukazu :
Hello Mich,
Thank you for your questions. Here are my responses:

> 1. What investigation have you done to show that it is running in local mode?

I have verified through the History Server's Environment tab that:
- "spark.master" is set to local[*]
- "spark.app.id" begins with local-xxx
- "spark.submit.deployMode" is set to local


> 2. who has configured this kubernetes cluster? Is it supplied by a cloud 
> vendor?

Our Kubernetes cluster was set up in an on-prem environment using RKE2( 
https://docs.rke2.io/ ).


> 3. Confirm that you have configured Spark Connect Server correctly for 
> cluster mode. Make sure you specify the cluster manager (e.g., Kubernetes) 
> and other relevant Spark configurations in your Spark job submission.

Based on the Spark Connect documentation I've read, there doesn't seem to be 
any specific settings for cluster mode related to the Spark Connect Server.

Configuration - Spark 3.4.1 Documentation
https://spark.apache.org/docs/3.4.1/configuration.html#spark-connect

Quickstart: Spark Connect — PySpark 3.4.1 documentation
https://spark.apache.org/docs/latest/api/python/getting_started/quickstart_connect.html

Spark Connect Overview - Spark 3.4.1 Documentation
https://spark.apache.org/docs/latest/spark-connect-overview.html

The documentation only suggests running ./sbin/start-connect-server.sh 
--packages org.apache.spark:spark-connect_2.12:3.4.0, leaving me at a loss.


> 4. Can you provide a full spark submit command

Given the nature of Spark Connect, I don't use the spark-submit command. 
Instead, as per the documentation, I can execute workloads using only a Python 
script. For the Spark Connect Server, I have a Kubernetes manifest executing 
"/opt/spark/sbin/start-connect-server.sh --packages 
org.apache.spark:spark-connect_2.12:3.4.0".


> 5. Make sure that the Python client script connecting to Spark Connect Server 
> specifies the cluster mode explicitly, like using --master or --deploy-mode 
> flags when creating a SparkSession.

The Spark Connect Server operates as a Driver, so it isn't possible to specify 
the --master or --deploy-mode flags in the Python client script. If I try, I 
encounter a RuntimeError.

like this:
RuntimeError: Spark master cannot be configured with Spark Connect server; 
however, found URL for Spark Connect [sc://.../]


> 6. Ensure that you have allocated 

Re: Re: Running Spark Connect Server in Cluster Mode on Kubernetes

2023-10-19 Thread eab...@163.com
Hi all, 
Has the spark connect server running on k8s functionality been implemented?



 
From: Nagatomi Yasukazu
Date: 2023-09-05 17:51
To: user
Subject: Re: Running Spark Connect Server in Cluster Mode on Kubernetes
Dear Spark Community,

I've been exploring the capabilities of the Spark Connect Server and 
encountered an issue when trying to launch it in a cluster deploy mode with 
Kubernetes as the master.

While initiating the `start-connect-server.sh` script with the `--conf` 
parameter for `spark.master` and `spark.submit.deployMode`, I was met with an 
error message:

```
Exception in thread "main" org.apache.spark.SparkException: Cluster deploy mode 
is not applicable to Spark Connect server.
```

This error message can be traced back to Spark's source code here:
https://github.com/apache/spark/blob/6c885a7cf57df328b03308cff2eed814bda156e4/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala#L307

Given my observations, I'm curious about the Spark Connect Server roadmap:

Is there a plan or current conversation to enable Kubernetes as a master in 
Spark Connect Server's cluster deploy mode?

I have tried to gather information from existing JIRA tickets, but have not 
been able to get a definitive answer:

https://issues.apache.org/jira/browse/SPARK-42730
https://issues.apache.org/jira/browse/SPARK-39375
https://issues.apache.org/jira/browse/SPARK-44117

Any thoughts, updates, or references to similar conversations or initiatives 
would be greatly appreciated.

Thank you for your time and expertise!

Best regards,
Yasukazu

2023年9月5日(火) 12:09 Nagatomi Yasukazu :
Hello Mich,
Thank you for your questions. Here are my responses:

> 1. What investigation have you done to show that it is running in local mode?

I have verified through the History Server's Environment tab that:
- "spark.master" is set to local[*]
- "spark.app.id" begins with local-xxx
- "spark.submit.deployMode" is set to local


> 2. who has configured this kubernetes cluster? Is it supplied by a cloud 
> vendor?

Our Kubernetes cluster was set up in an on-prem environment using RKE2( 
https://docs.rke2.io/ ).


> 3. Confirm that you have configured Spark Connect Server correctly for 
> cluster mode. Make sure you specify the cluster manager (e.g., Kubernetes) 
> and other relevant Spark configurations in your Spark job submission.

Based on the Spark Connect documentation I've read, there doesn't seem to be 
any specific settings for cluster mode related to the Spark Connect Server.

Configuration - Spark 3.4.1 Documentation
https://spark.apache.org/docs/3.4.1/configuration.html#spark-connect

Quickstart: Spark Connect — PySpark 3.4.1 documentation
https://spark.apache.org/docs/latest/api/python/getting_started/quickstart_connect.html

Spark Connect Overview - Spark 3.4.1 Documentation
https://spark.apache.org/docs/latest/spark-connect-overview.html

The documentation only suggests running ./sbin/start-connect-server.sh 
--packages org.apache.spark:spark-connect_2.12:3.4.0, leaving me at a loss.


> 4. Can you provide a full spark submit command

Given the nature of Spark Connect, I don't use the spark-submit command. 
Instead, as per the documentation, I can execute workloads using only a Python 
script. For the Spark Connect Server, I have a Kubernetes manifest executing 
"/opt/spark/sbin/start-connect-server.sh --packages 
org.apache.spark:spark-connect_2.12:3.4.0".


> 5. Make sure that the Python client script connecting to Spark Connect Server 
> specifies the cluster mode explicitly, like using --master or --deploy-mode 
> flags when creating a SparkSession.

The Spark Connect Server operates as a Driver, so it isn't possible to specify 
the --master or --deploy-mode flags in the Python client script. If I try, I 
encounter a RuntimeError.

like this:
RuntimeError: Spark master cannot be configured with Spark Connect server; 
however, found URL for Spark Connect [sc://.../]


> 6. Ensure that you have allocated the necessary resources (CPU, memory etc) 
> to Spark Connect Server when running it on Kubernetes.

Resources are ample, so that shouldn't be the problem.


> 7. Review the environment variables and configurations you have set, 
> including the SPARK_NO_DAEMONIZE=1 variable. Ensure that these variables are 
> not conflicting with 

I'm unsure if SPARK_NO_DAEMONIZE=1 conflicts with cluster mode settings. But 
without it, the process goes to the background when executing 
start-connect-server.sh, causing the Pod to terminate prematurely.


> 8. Are you using the correct spark client version that is fully compatible 
> with your spark on the server?

Yes, I have verified that without using Spark Connect (e.g., using Spark 
Operator), Spark applications run as expected.

> 9. check the kubernetes error logs

The Kubernetes logs don't show any errors, and jobs are running in local mode.


> 10. Insufficient resources can lead to the application running in local mode

I wasn't aware that insufficient resources 

Re: hive: spark as execution engine. class not found problem

2023-10-17 Thread Vijay Shankar
UNSUBSCRIBE

On Tue, Oct 17, 2023 at 5:09 PM Amirhossein Kabiri <
amirhosseikab...@gmail.com> wrote:

> I used Ambari to config and install Hive and Spark. I want to insert into
> a hive table using Spark execution Engine but I face to this weird error.
> The error is:
>
> Job failed with java.lang.ClassNotFoundException:
> ive_20231017100559_301568f9-bdfa-4f7c-89a6-f69a65b30aaf:1
> 2023-10-17 10:07:42,972 ERROR [c4aeb932-743e-4736-b00f-6b905381fa03 main]
> status.SparkJobMonitor: Job failed with java.lang.ClassNotFoundException:
> ive_20231017100559_301568f9-bdfa-4f7c-89a6-f69a65b30aaf:1
> com.esotericsoftware.kryo.KryoException: Unable to find class:
> ive_20231017100559_301568f9-bdfa-4f7c-89a6-f69a65b30aaf:1
> Serialization trace:
> invertedWorkGraph (org.apache.hadoop.hive.ql.plan.SparkWork)
> at
> com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:160)
> at
> com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:133)
> at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:693)
> at
> org.apache.hadoop.hive.ql.exec.SerializationUtilities$KryoWithHooks.readClass(SerializationUtilities.java:181)
> at
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:118)
> at
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543)
> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:709)
> at
> org.apache.hadoop.hive.ql.exec.SerializationUtilities$KryoWithHooks.readObject(SerializationUtilities.java:206)
> at
> org.apache.hadoop.hive.ql.exec.spark.KryoSerializer.deserialize(KryoSerializer.java:60)
> at
> org.apache.hadoop.hive.ql.exec.spark.RemoteHiveSparkClient$JobStatusJob.call(RemoteHiveSparkClient.java:329)
> at
> org.apache.hive.spark.client.RemoteDriver$JobWrapper.call(RemoteDriver.java:378)
> at
> org.apache.hive.spark.client.RemoteDriver$JobWrapper.call(RemoteDriver.java:343)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.ClassNotFoundException:
> ive_20231017100559_301568f9-bdfa-4f7c-89a6-f69a65b30aaf:1
> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:348)
> at
> com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:154)
> ... 15 more
>
> 2023-10-17 10:07:43,067 INFO  [c4aeb932-743e-4736-b00f-6b905381fa03 main]
> reexec.ReOptimizePlugin: ReOptimization: retryPossible: false
> FAILED: Execution Error, return code 3 from
> org.apache.hadoop.hive.ql.exec.spark.SparkTask. Spark job failed during
> runtime. Please check stacktrace for the root cause.
>
> the weird part is Hive make this itself and asks me where to find it! I
> would appreciate any helps to solve and locate the problem.
>
> note: The Ambari, Hadoop, Hive, Zookeeper and Spark Works Well according
> to the Ambari service health check.
> note: Since I didnt find any spark specific hive-site.xml I added the
> following configs to the hive-site.xml file:
> 
>   hive.execution.engine
>   spark
> 
>
> 
>   hive.spark.warehouse.location
>   /tmp/spark/warehouse
> 
>
> 
>   hive.spark.sql.execution.mode
>   adaptive
> 
>
> 
>   hive.spark.sql.shuffle.partitions
>   200
> 
>
> 
>   hive.spark.sql.shuffle.partitions.pernode
>   2
> 
>
> 
>   hive.spark.sql.memory.fraction
>   0.6
> 
>
> 
>   hive.spark.sql.codegen.enabled
>   true
> 
>
> 
>   spark.sql.hive.hiveserver2.jdbc.url
>   jdbc:hive2://my.ambari.com:2181
> /;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2
> 
>
> 
>   spark.datasource.hive.warehouse.load.staging.dir
>   /tmp
> 
>
>
> 
>   spark.hadoop.hive.zookeeper.quorum
>   my.ambari.com:2181
> 
>
> 
>
> spark.datasource.hive.warehouse.write.path.strictColumnNamesMapping
>   true
> 
>
> 
>   spark.sql.hive.conf.list
>
> hive.vectorized.execution.filesink.arrow.native.enabled=true;hive.vectorized.execution.enabled=true
> 
>
> 
>   hive.spark.client.connect.timeout
>   3ms
> 
>
> 
>   hive.spark.client.server.connect.timeout
>   30ms
>
> 
> hive.hook.proto.base-directory
> /tmp/hive/hooks
>   
>   
> hive.spark.sql.shuffle.partitions
> 200
>   
>   
> hive.strict.managed.tables
> true
>   
>   
> hive.stats.fetch.partition.stats
> true
>   
>   
> hive.spark.sql.memory.fraction
> 0.6
>   
>   
> hive.spark.sql.execution.mode
> spark
>   
>   
> hive.spark.sql.codegen.enabled
> 

hive: spark as execution engine. class not found problem

2023-10-17 Thread Amirhossein Kabiri
I used Ambari to configure and install Hive and Spark. I want to insert into a
Hive table using the Spark execution engine, but I get this weird error. The
error is:

Job failed with java.lang.ClassNotFoundException: ive_20231017100559_301568f9-bdfa-4f7c-89a6-f69a65b30aaf:1
2023-10-17 10:07:42,972 ERROR [c4aeb932-743e-4736-b00f-6b905381fa03 main] status.SparkJobMonitor: Job failed with java.lang.ClassNotFoundException: ive_20231017100559_301568f9-bdfa-4f7c-89a6-f69a65b30aaf:1
com.esotericsoftware.kryo.KryoException: Unable to find class: ive_20231017100559_301568f9-bdfa-4f7c-89a6-f69a65b30aaf:1
Serialization trace:
invertedWorkGraph (org.apache.hadoop.hive.ql.plan.SparkWork)
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:160)
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:133)
    at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:693)
    at org.apache.hadoop.hive.ql.exec.SerializationUtilities$KryoWithHooks.readClass(SerializationUtilities.java:181)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:118)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:709)
    at org.apache.hadoop.hive.ql.exec.SerializationUtilities$KryoWithHooks.readObject(SerializationUtilities.java:206)
    at org.apache.hadoop.hive.ql.exec.spark.KryoSerializer.deserialize(KryoSerializer.java:60)
    at org.apache.hadoop.hive.ql.exec.spark.RemoteHiveSparkClient$JobStatusJob.call(RemoteHiveSparkClient.java:329)
    at org.apache.hive.spark.client.RemoteDriver$JobWrapper.call(RemoteDriver.java:378)
    at org.apache.hive.spark.client.RemoteDriver$JobWrapper.call(RemoteDriver.java:343)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: ive_20231017100559_301568f9-bdfa-4f7c-89a6-f69a65b30aaf:1
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:154)
    ... 15 more

2023-10-17 10:07:43,067 INFO  [c4aeb932-743e-4736-b00f-6b905381fa03 main] reexec.ReOptimizePlugin: ReOptimization: retryPossible: false
FAILED: Execution Error, return code 3 from org.apache.hadoop.hive.ql.exec.spark.SparkTask. Spark job failed during runtime. Please check stacktrace for the root cause.

The weird part is that Hive generates this name itself and then asks me where
to find it! I would appreciate any help locating and solving the problem.

Note: Ambari, Hadoop, Hive, ZooKeeper and Spark all work well according to
the Ambari service health check.
Note: since I didn't find any Spark-specific hive-site.xml, I added the
following configs to the hive-site.xml file:

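<property>
  <name>hive.execution.engine</name>
  <value>spark</value>
</property>
<property>
  <name>hive.spark.warehouse.location</name>
  <value>/tmp/spark/warehouse</value>
</property>
<property>
  <name>hive.spark.sql.execution.mode</name>
  <value>adaptive</value>
</property>
<property>
  <name>hive.spark.sql.shuffle.partitions</name>
  <value>200</value>
</property>
<property>
  <name>hive.spark.sql.shuffle.partitions.pernode</name>
  <value>2</value>
</property>
<property>
  <name>hive.spark.sql.memory.fraction</name>
  <value>0.6</value>
</property>
<property>
  <name>hive.spark.sql.codegen.enabled</name>
  <value>true</value>
</property>
<property>
  <name>spark.sql.hive.hiveserver2.jdbc.url</name>
  <value>jdbc:hive2://my.ambari.com:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2</value>
</property>
<property>
  <name>spark.datasource.hive.warehouse.load.staging.dir</name>
  <value>/tmp</value>
</property>
<property>
  <name>spark.hadoop.hive.zookeeper.quorum</name>
  <value>my.ambari.com:2181</value>
</property>
<property>
  <name>spark.datasource.hive.warehouse.write.path.strictColumnNamesMapping</name>
  <value>true</value>
</property>
<property>
  <name>spark.sql.hive.conf.list</name>
  <value>hive.vectorized.execution.filesink.arrow.native.enabled=true;hive.vectorized.execution.enabled=true</value>
</property>
<property>
  <name>hive.spark.client.connect.timeout</name>
  <value>3ms</value>
</property>
<property>
  <name>hive.spark.client.server.connect.timeout</name>
  <value>30ms</value>
</property>

<property>
  <name>hive.hook.proto.base-directory</name>
  <value>/tmp/hive/hooks</value>
</property>
<property>
  <name>hive.spark.sql.shuffle.partitions</name>
  <value>200</value>
</property>
<property>
  <name>hive.strict.managed.tables</name>
  <value>true</value>
</property>
<property>
  <name>hive.stats.fetch.partition.stats</name>
  <value>true</value>
</property>
<property>
  <name>hive.spark.sql.memory.fraction</name>
  <value>0.6</value>
</property>
<property>
  <name>hive.spark.sql.execution.mode</name>
  <value>spark</value>
</property>
<property>
  <name>hive.spark.sql.codegen.enabled</name>
  <value>true</value>
</property>
<property>
  <name>hive.heapsize</name>
  <value>2g</value>
</property>
<property>
  <name>hive.spark.sql.shuffle.partitions.pernode</name>
  <value>100</value>
</property>
<property>
  <name>hive.spark.warehouse.location</name>
  <value>/user/hive/warehouse</value>
</property>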



Re: Spark stand-alone mode

2023-10-17 Thread Ilango
Hi all,

Thanks a lot for your suggestions and knowledge sharing. I'd like to let you
know that I have completed setting up the standalone cluster, and a couple of
data science users have already been using it for the last two weeks. The
performance is really good: almost a 10X improvement compared to HPC local
mode. They tested it with some complex data science scripts using Spark and
other data science projects. The cluster is really stable and very
performant.

I enabled dynamic allocation and capped the memory and CPU accordingly in
spark-defaults.conf and in our Spark framework code. So it's been pretty
impressive for the last few weeks.
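
For anyone who wants to try something similar, below is a small sketch that renders a spark-defaults.conf fragment with dynamic allocation and resource caps. The numbers are placeholders, not the values used on this cluster, and the choice of shuffle tracking (rather than the external shuffle service) is an assumption for the example.

```python
# Hypothetical caps: the actual values used on the cluster are not stated here.
SETTINGS = {
    "spark.dynamicAllocation.enabled": "true",
    # Dynamic allocation needs either the external shuffle service or
    # shuffle tracking; shuffle tracking is assumed in this sketch.
    "spark.dynamicAllocation.shuffleTracking.enabled": "true",
    "spark.dynamicAllocation.maxExecutors": "8",
    "spark.executor.cores": "4",
    "spark.executor.memory": "16g",
    # Upper bound on total cores one application may take in standalone mode.
    "spark.cores.max": "32",
}


def render_spark_defaults(settings: dict) -> str:
    """Render settings as 'key  value' lines, the spark-defaults.conf format."""
    width = max(len(key) for key in settings)
    lines = [f"{key.ljust(width)}  {value}" for key, value in settings.items()]
    return "\n".join(lines) + "\n"


if __name__ == "__main__":
    print(render_spark_defaults(SETTINGS), end="")
```

The same keys can also be set per application on the SparkSession builder, which is roughly what capping in the framework code amounts to.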

Thank you so much!

Thanks,
Elango


On Tue, 19 Sep 2023 at 6:40 PM, Patrick Tucci 
wrote:

> Multiple applications can run at once, but you need to either configure
> Spark or your applications to allow that. In stand-alone mode, each
> application attempts to take all resources available by default. This
> section of the documentation has more details:
>
>
> https://spark.apache.org/docs/latest/spark-standalone.html#resource-scheduling
>
> Explicitly setting the resources per application limits the resources to
> the configured values for the lifetime of the application. You can use
> dynamic allocation to allow Spark to scale the resources up and down per
> application based on load, but the configuration is relatively more complex:
>
>
> https://spark.apache.org/docs/latest/job-scheduling.html#dynamic-resource-allocation
>
> On Mon, Sep 18, 2023 at 3:53 PM Ilango  wrote:
>
>>
>> Thanks all for your suggestions. Noted with thanks.
>> Just wanted share few more details about the environment
>> 1. We use NFS for data storage and data is in parquet format
>> 2. All HPC nodes are connected and already work as a cluster for Studio
>> workbench. I can setup password less SSH if it not exist already.
>> 3. We will stick with NFS for now and stand alone then may be will
>> explore HDFS and YARN.
>>
>> Can you please confirm whether multiple users can run spark jobs at the
>> same time?
>> If so I will start working on it and let you know how it goes
>>
>> Mich, the link to Hadoop is not working. Can you please check and let me
>> know the correct link. Would like to explore Hadoop option as well.
>>
>>
>>
>> Thanks,
>> Elango
>>
>> On Sat, Sep 16, 2023, 4:20 AM Bjørn Jørgensen 
>> wrote:
>>
>>> you need to setup ssh without password, use key instead.  How to
>>> connect without password using SSH (passwordless)
>>> 
>>>
>>> fre. 15. sep. 2023 kl. 20:55 skrev Mich Talebzadeh <
>>> mich.talebza...@gmail.com>:
>>>
 Hi,

 Can these 4 nodes talk to each other through ssh as trusted hosts (on
 top of the network that Sean already mentioned)? Otherwise you need to set
 it up. You can install a LAN if you have another free port at the back of
 your HPC nodes. They should

 You ought to try to set up a Hadoop cluster pretty easily. Check this
 old article of mine for Hadoop set-up.


 https://www.linkedin.com/pulse/diy-festive-season-how-install-configure-big-data-so-mich/?trackingId=z7n5tx7tQOGK9tcG9VClkw%3D%3D

 Hadoop will provide you with a common storage layer (HDFS) that these
 nodes will be able to share and talk. Yarn is your best bet as the resource
 manager with reasonably powerful hosts you have. However, for now the Stand
 Alone mode will do. Make sure that the Metastore you choose, (by default it
 will use Hive Metastore called Derby :( ) is something respectable like
 Postgres DB that can handle multiple concurrent spark jobs

 HTH


 Mich Talebzadeh,
 Distinguished Technologist, Solutions Architect & Engineer
 London
 United Kingdom


view my Linkedin profile
 


  https://en.everybodywiki.com/Mich_Talebzadeh



 *Disclaimer:* Use it at your own risk. Any and all responsibility for
 any loss, damage or destruction of data or any other property which may
 arise from relying on this email's technical content is explicitly
 disclaimed. The author will in no case be liable for any monetary damages
 arising from such loss, damage or destruction.




 On Fri, 15 Sept 2023 at 07:04, Ilango  wrote:

>
> Hi all,
>
> We have 4 HPC nodes and installed spark individually in all nodes.
>
> Spark is used as local mode(each driver/executor will have 8 cores and
> 65 GB) in Sparklyr/pyspark using Rstudio/Posit workbench. Slurm is used as
> scheduler.
>
> As this is local mode, we are facing performance issue(as only one
> executor) when it comes dealing with large datasets.
>
> Can I convert this 4 nodes into spark standalone cluster. We dont have
> hadoop so yarn mode is out of 

Re: Can not complete the read csv task

2023-10-14 Thread Khalid Mammadov
This command only defines a new DataFrame. To see some results you need to
run something like merged_spark_data.show() on a new line.

Regarding the error, I think it's a typical error that you get when you run
Spark on Windows. You can suppress it using the Winutils tool (Google it or
ask ChatGPT to see how).
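
As a starting point, here is a small check you could run before launching pyspark. It assumes the commonly reported cause of this UnsatisfiedLinkError, namely that winutils.exe and hadoop.dll are missing from the bin folder under HADOOP_HOME; the helper name is made up for this sketch and I have not verified this against your machine.

```python
import os
from pathlib import Path
from typing import List, Optional

# Native Hadoop files usually expected under %HADOOP_HOME%\bin on Windows.
REQUIRED_BINARIES = ("winutils.exe", "hadoop.dll")


def missing_hadoop_binaries(hadoop_home: Optional[str]) -> List[str]:
    """Return the required files that are absent from <hadoop_home>/bin."""
    if not hadoop_home:
        # HADOOP_HOME is not set at all, so nothing can be found.
        return list(REQUIRED_BINARIES)
    bin_dir = Path(hadoop_home) / "bin"
    return [name for name in REQUIRED_BINARIES if not (bin_dir / name).is_file()]


if __name__ == "__main__":
    missing = missing_hadoop_binaries(os.environ.get("HADOOP_HOME"))
    if missing:
        print("Missing under HADOOP_HOME/bin:", ", ".join(missing))
    else:
        print("winutils.exe and hadoop.dll found")
```

If something is reported missing, download binaries that match your Hadoop version, put them in that bin folder, and restart the shell so the environment variable is picked up.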

On Thu, 12 Oct 2023, 11:58 Kelum Perera,  wrote:

> Dear friends,
>
> I'm trying to get a fresh start with Spark. I tried to read few CSV files
> in a folder, but the task got stuck and not completed as shown in the
> copied content from the terminal.
>
> Can someone help to understand what is going wrong?
>
> Versions;
> java version "11.0.16" 2022-07-19 LTS
> Java(TM) SE Runtime Environment 18.9 (build 11.0.16+11-LTS-199)
> Java HotSpot(TM) 64-Bit Server VM 18.9 (build 11.0.16+11-LTS-199, mixed
> mode)
>
> Python 3.9.13
> Windows 10
>
> Copied from the terminal;
>     __
>  / __/__  ___ _/ /__
> _\ \/ _ \/ _ `/ __/  '_/
>/__ / .__/\_,_/_/ /_/\_\   version 3.5.0
>   /_/
>
> Using Python version 3.9.13 (main, Aug 25 2022 23:51:50)
> Spark context Web UI available at http://LK510FIDSLW4.ey.net:4041
> Spark context available as 'sc' (master = local[*], app id =
> local-1697089858181).
> SparkSession available as 'spark'.
> >>> merged_spark_data =
> spark.read.csv(r"C:\Users\Kelum.Perera\Downloads\data-master\nyse_all\nyse_data\*",
> header=False )
> Exception in thread "globPath-ForkJoinPool-1-worker-115"
> java.lang.UnsatisfiedLinkError:
> org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z
> at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native
> Method)
> at
> org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:793)
> at org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:1249)
> at org.apache.hadoop.fs.FileUtil.list(FileUtil.java:1454)
> at
> org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:601)
> at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
> at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
> at
> org.apache.hadoop.fs.ChecksumFileSystem.listStatus(ChecksumFileSystem.java:761)
> at org.apache.hadoop.fs.Globber.listStatus(Globber.java:128)
> at org.apache.hadoop.fs.Globber.doGlob(Globber.java:291)
> at org.apache.hadoop.fs.Globber.glob(Globber.java:202)
> at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:2124)
> at
> org.apache.spark.deploy.SparkHadoopUtil.globPath(SparkHadoopUtil.scala:238)
> at
> org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$3(DataSource.scala:737)
> at
> org.apache.spark.util.ThreadUtils$.$anonfun$parmap$2(ThreadUtils.scala:380)
> at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659)
> at scala.util.Success.$anonfun$map$1(Try.scala:255)
> at scala.util.Success.map(Try.scala:213)
> at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
> at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
> at
> scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
> at
> java.base/java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1426)
> at
> java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
> at
> java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
> at
> java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
> at
> java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
> at
> java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
>
>
>
> Nothing happens afterwards. I would appreciate your kind input to solve this.
>
> Best Regards,
> Kelum Perera
>
>
>
>


[ANNOUNCE] Apache Celeborn(incubating) 0.3.1 available

2023-10-13 Thread Cheng Pan
Hi all,

Apache Celeborn(Incubating) community is glad to announce the
new release of Apache Celeborn(Incubating) 0.3.1.

Celeborn is dedicated to improving the efficiency and elasticity of
different map-reduce engines and provides an elastic, highly efficient
service for intermediate data including shuffle data, spilled data,
result data, etc.

Download Link: https://celeborn.apache.org/download/

GitHub Release Tag:
- https://github.com/apache/incubator-celeborn/releases/tag/v0.3.1-incubating

Release Notes:
- https://celeborn.apache.org/community/release_notes/release_note_0.3.1

Home Page: https://celeborn.apache.org/

Celeborn Resources:
- Issue Management: https://issues.apache.org/jira/projects/CELEBORN
- Mailing List: d...@celeborn.apache.org

Thanks,
Cheng Pan
On behalf of the Apache Celeborn(incubating) community






Fwd: Fw: Can not complete the read csv task

2023-10-13 Thread KP Youtuber
Dear group members,

I'm trying to get a fresh start with Spark, but came across the following
issue:

I tried to read a few CSV files from a folder, but the task got stuck and
didn't complete (content copied from the terminal below).

Can someone help me understand what is going wrong?

Versions;
java version "11.0.16" 2022-07-19 LTS
Java(TM) SE Runtime Environment 18.9 (build 11.0.16+11-LTS-199)
Java HotSpot(TM) 64-Bit Server VM 18.9 (build 11.0.16+11-LTS-199, mixed
mode)

Python 3.9.13
Windows 10

Copied from the terminal;
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.5.0
      /_/

Using Python version 3.9.13 (main, Aug 25 2022 23:51:50)
Spark context Web UI available at http://LK510FIDSLW4.ey.net:4041
Spark context available as 'sc' (master = local[*], app id =
local-1697089858181).
SparkSession available as 'spark'.
>>> merged_spark_data = spark.read.csv(
...     r"C:\Users\Kelum.Perera\Downloads\data-master\nyse_all\nyse_data\*",
...     header=False)
Exception in thread "globPath-ForkJoinPool-1-worker-115"
java.lang.UnsatisfiedLinkError:
org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z
at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native
Method)
at
org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:793)
at org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:1249)
at org.apache.hadoop.fs.FileUtil.list(FileUtil.java:1454)
at
org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:601)
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
at
org.apache.hadoop.fs.ChecksumFileSystem.listStatus(ChecksumFileSystem.java:761)
at org.apache.hadoop.fs.Globber.listStatus(Globber.java:128)
at org.apache.hadoop.fs.Globber.doGlob(Globber.java:291)
at org.apache.hadoop.fs.Globber.glob(Globber.java:202)
at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:2124)
at
org.apache.spark.deploy.SparkHadoopUtil.globPath(SparkHadoopUtil.scala:238)
at
org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$3(DataSource.scala:737)
at
org.apache.spark.util.ThreadUtils$.$anonfun$parmap$2(ThreadUtils.scala:380)
at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659)
at scala.util.Success.$anonfun$map$1(Try.scala:255)
at scala.util.Success.map(Try.scala:213)
at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
at
scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
at
java.base/java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1426)
at
java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
at
java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
at
java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
at
java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
at
java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)



Nothing happens afterwards. I would appreciate your kind input to solve this.

Best Regards,
Kelum Perera


Fw: Can not complete the read csv task

2023-10-13 Thread Kelum Perera


From: Kelum Perera 
Sent: Thursday, October 12, 2023 11:40 AM
To: user@spark.apache.org ; Kelum Perera 
; Kelum Gmail 
Subject: Can not complete the read csv task

Dear friends,

I'm trying to get a fresh start with Spark. I tried to read a few CSV files in a
folder, but the task got stuck and did not complete, as shown in the content
copied from the terminal.

Can someone help me understand what is going wrong?

Versions;
java version "11.0.16" 2022-07-19 LTS
Java(TM) SE Runtime Environment 18.9 (build 11.0.16+11-LTS-199)
Java HotSpot(TM) 64-Bit Server VM 18.9 (build 11.0.16+11-LTS-199, mixed mode)

Python 3.9.13
Windows 10

Copied from the terminal;
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.5.0
      /_/

Using Python version 3.9.13 (main, Aug 25 2022 23:51:50)
Spark context Web UI available at http://LK510FIDSLW4.ey.net:4041
Spark context available as 'sc' (master = local[*], app id = 
local-1697089858181).
SparkSession available as 'spark'.
>>> merged_spark_data = spark.read.csv(
...     r"C:\Users\Kelum.Perera\Downloads\data-master\nyse_all\nyse_data\*",
...     header=False)
Exception in thread "globPath-ForkJoinPool-1-worker-115" 
java.lang.UnsatisfiedLinkError: 
org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z
at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method)
at 
org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:793)
at org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:1249)
at org.apache.hadoop.fs.FileUtil.list(FileUtil.java:1454)
at 
org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:601)
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
at 
org.apache.hadoop.fs.ChecksumFileSystem.listStatus(ChecksumFileSystem.java:761)
at org.apache.hadoop.fs.Globber.listStatus(Globber.java:128)
at org.apache.hadoop.fs.Globber.doGlob(Globber.java:291)
at org.apache.hadoop.fs.Globber.glob(Globber.java:202)
at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:2124)
at 
org.apache.spark.deploy.SparkHadoopUtil.globPath(SparkHadoopUtil.scala:238)
at 
org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$3(DataSource.scala:737)
at 
org.apache.spark.util.ThreadUtils$.$anonfun$parmap$2(ThreadUtils.scala:380)
at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659)
at scala.util.Success.$anonfun$map$1(Try.scala:255)
at scala.util.Success.map(Try.scala:213)
at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
at 
java.base/java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1426)
at 
java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
at 
java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
at 
java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
at 
java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
at 
java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)



Nothing happens afterwards. I would appreciate your kind input to solve this.

Best Regards,
Kelum Perera





Re: [ SPARK SQL ]: UPPER in WHERE condition is not working in Apache Spark 3.5.0 for Mysql ENUM Column

2023-10-13 Thread Suyash Ajmera
This issue is related to the readSidePadding method of CharVarcharCodegenUtils.

It appends whitespace when reading ENUM data from MySQL,

which causes problems both when querying and when writing the same data to Cassandra.

On Thu, 12 Oct, 2023, 7:46 pm Suyash Ajmera, 
wrote:

> I have upgraded my Spark job from Spark 3.3.1 to Spark 3.5.0. I am
> querying a MySQL database and applying
>
> `*UPPER(col) = UPPER(value)*` in the subsequent SQL query. It works
> as expected in Spark 3.3.1, but not in 3.5.0.
>
> Where Condition ::  `*UPPER(vn) = 'ERICSSON' AND (upper(st) = 'OPEN' OR
> upper(st) = 'REOPEN' OR upper(st) = 'CLOSED')*`
>
> The *st* column is an ENUM in the database, and it is causing the issue.
>
> Below is the Physical Plan of *FILTER* phase :
>
> For 3.3.1 :
>
> +- Filter ((upper(vn#11) = ERICSSON) AND (((upper(st#42) = OPEN) OR
> (upper(st#42) = REOPEN)) OR (upper(st#42) = CLOSED)))
>
> For 3.5.0 :
>
> +- Filter ((upper(vn#11) = ERICSSON) AND (((upper(staticinvoke(class
> org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils, StringType,
> readSidePadding, st#42, 13, true, false, true)) = OPEN) OR
> (upper(staticinvoke(class
> org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils, StringType,
> readSidePadding, st#42, 13, true, false, true)) = REOPEN)) OR
> (upper(staticinvoke(class
> org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils, StringType,
> readSidePadding, st#42, 13, true, false, true)) = CLOSED)))
>
> -
>
> I debugged it and found that Spark added a property in version 3.4.0,
> i.e. **spark.sql.readSideCharPadding**, which has the default value **true**.
>
> Link to the JIRA: https://issues.apache.org/jira/browse/SPARK-40697
>
> It added a new method to the class **CharVarcharCodegenUtils**:
>
>   public static UTF8String readSidePadding(UTF8String inputStr, int limit) {
>     int numChars = inputStr.numChars();
>     if (numChars == limit) {
>       return inputStr;
>     } else if (numChars < limit) {
>       return inputStr.rpad(limit, SPACE);
>     } else {
>       return inputStr;
>     }
>   }
>
>
> **This method appends whitespace padding to the ENUM values
> while reading, which causes the issue.**
>
> ---
>
> When I remove the UPPER function from the WHERE condition, the
> **FILTER** phase looks like this:
>
>  +- Filter (((staticinvoke(class
> org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils,
>  StringType, readSidePadding, st#42, 13, true, false, true) = OPEN
> ) OR (staticinvoke(class
> org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils, StringType,
> readSidePadding, st#42, 13, true, false, true) = REOPEN   )) OR
> (staticinvoke(class
> org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils, StringType,
> readSidePadding, st#42, 13, true, false, true) = CLOSED   ))
>
>
> **You can see it has added some whitespace after the value, and the query
> runs fine, giving the correct result.**
>
> But with the UPPER function I do not get any data.
>
> --
>
> I have also tried disabling this property (*spark.sql.readSideCharPadding
> = false*), with the following cases:
>
> 1. With the UPPER function in the WHERE clause:
>    It does not push the filters to the database and the *query works fine*.
>
>   +- Filter (((upper(st#42) = OPEN) OR (upper(st#42) = REOPEN)) OR
> (upper(st#42) = CLOSED))
>
> 2. But when I remove the UPPER function:
>
>  *It pushes the filter to MySQL with the whitespace and I do not
> get any data. (THIS IS CAUSING A VERY BIG ISSUE)*
>
>   PushedFilters: [*IsNotNull(vn), *EqualTo(vn,ERICSSON),
> *Or(Or(EqualTo(st,OPEN ),EqualTo(st,REOPEN
> )),EqualTo(st,CLOSED   ))]
>
> I cannot move this filter to the JDBC read query, and I can't remove the
> UPPER function from the WHERE clause.
>
>
> I also found the same data being written to Cassandra with *PADDING*.
>


[ SPARK SQL ]: UPPER in WHERE condition is not working in Apache Spark 3.5.0 for Mysql ENUM Column

2023-10-12 Thread Suyash Ajmera
I have upgraded my Spark job from Spark 3.3.1 to Spark 3.5.0. I am querying
a MySQL database and applying

`*UPPER(col) = UPPER(value)*` in the subsequent SQL query. It works as
expected in Spark 3.3.1, but not in 3.5.0.

Where Condition ::  `*UPPER(vn) = 'ERICSSON' AND (upper(st) = 'OPEN' OR
upper(st) = 'REOPEN' OR upper(st) = 'CLOSED')*`

The *st* column is an ENUM in the database, and it is causing the issue.

Below is the Physical Plan of *FILTER* phase :

For 3.3.1 :

+- Filter ((upper(vn#11) = ERICSSON) AND (((upper(st#42) = OPEN) OR
(upper(st#42) = REOPEN)) OR (upper(st#42) = CLOSED)))

For 3.5.0 :

+- Filter ((upper(vn#11) = ERICSSON) AND (((upper(staticinvoke(class
org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils, StringType,
readSidePadding, st#42, 13, true, false, true)) = OPEN) OR
(upper(staticinvoke(class
org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils, StringType,
readSidePadding, st#42, 13, true, false, true)) = REOPEN)) OR
(upper(staticinvoke(class
org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils, StringType,
readSidePadding, st#42, 13, true, false, true)) = CLOSED)))

-

I debugged it and found that Spark added a property in version 3.4.0,
i.e. **spark.sql.readSideCharPadding**, which has the default value **true**.

Link to the JIRA: https://issues.apache.org/jira/browse/SPARK-40697

It added a new method to the class **CharVarcharCodegenUtils**:

  public static UTF8String readSidePadding(UTF8String inputStr, int limit) {
    int numChars = inputStr.numChars();
    if (numChars == limit) {
      return inputStr;
    } else if (numChars < limit) {
      return inputStr.rpad(limit, SPACE);
    } else {
      return inputStr;
    }
  }


**This method appends whitespace padding to the ENUM values while
reading, which causes the issue.**

---

When I remove the UPPER function from the WHERE condition, the
**FILTER** phase looks like this:

 +- Filter (((staticinvoke(class
org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils,
 StringType, readSidePadding, st#42, 13, true, false, true) = OPEN
) OR (staticinvoke(class
org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils, StringType,
readSidePadding, st#42, 13, true, false, true) = REOPEN   )) OR
(staticinvoke(class
org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils, StringType,
readSidePadding, st#42, 13, true, false, true) = CLOSED   ))


**You can see it has added some whitespace after the value, and the query
runs fine, giving the correct result.**

But with the UPPER function I do not get any data.

--

I have also tried disabling this property (*spark.sql.readSideCharPadding =
false*), with the following cases:

1. With the UPPER function in the WHERE clause:
   It does not push the filters to the database and the *query works fine*.

  +- Filter (((upper(st#42) = OPEN) OR (upper(st#42) = REOPEN)) OR
(upper(st#42) = CLOSED))

2. But when I remove the UPPER function:

 *It pushes the filter to MySQL with the whitespace and I do not
get any data. (THIS IS CAUSING A VERY BIG ISSUE)*

  PushedFilters: [*IsNotNull(vn), *EqualTo(vn,ERICSSON),
*Or(Or(EqualTo(st,OPEN ),EqualTo(st,REOPEN
)),EqualTo(st,CLOSED   ))]

I cannot move this filter to the JDBC read query, and I can't remove the
UPPER function from the WHERE clause.



I also found the same data being written to Cassandra with *PADDING*.
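
To make the reported behaviour easier to follow, here is a small plain-Python imitation of the readSidePadding logic quoted in this message. It is only a sketch: read_side_padding is a hypothetical stand-in for the Java method, the width 13 is taken from the physical plan shown in the message, and the trimming step (comparable to wrapping the column in RTRIM before UPPER) is a suggestion that has not been verified against Spark 3.5.0.

```python
def read_side_padding(value: str, limit: int) -> str:
    """Mimic the quoted Java method: right-pad with spaces up to `limit`."""
    if len(value) < limit:
        return value.ljust(limit)
    return value


ENUM_WIDTH = 13  # width that appears in the plan: readSidePadding, st#42, 13

padded = read_side_padding("Open", ENUM_WIDTH)

# upper(padded st) is compared with an unpadded literal, so it cannot match.
matches_plain = padded.upper() == "OPEN"

# Trimming trailing spaces first restores the match.
matches_trimmed = padded.rstrip(" ").upper() == "OPEN"

print(repr(padded), matches_plain, matches_trimmed)
```

This matches the two plans in the message: without UPPER the literal on the right-hand side is padded as well, so both sides agree, whereas with UPPER only the column side carries the padding.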


Can not complete the read csv task

2023-10-12 Thread Kelum Perera
Dear friends,

I'm trying to get a fresh start with Spark. I tried to read a few CSV files in a
folder, but the task got stuck and did not complete, as shown in the content
copied from the terminal.

Can someone help me understand what is going wrong?

Versions;
java version "11.0.16" 2022-07-19 LTS
Java(TM) SE Runtime Environment 18.9 (build 11.0.16+11-LTS-199)
Java HotSpot(TM) 64-Bit Server VM 18.9 (build 11.0.16+11-LTS-199, mixed mode)

Python 3.9.13
Windows 10

Copied from the terminal;
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.5.0
      /_/

Using Python version 3.9.13 (main, Aug 25 2022 23:51:50)
Spark context Web UI available at http://LK510FIDSLW4.ey.net:4041
Spark context available as 'sc' (master = local[*], app id = 
local-1697089858181).
SparkSession available as 'spark'.
>>> merged_spark_data = spark.read.csv(
...     r"C:\Users\Kelum.Perera\Downloads\data-master\nyse_all\nyse_data\*",
...     header=False)
Exception in thread "globPath-ForkJoinPool-1-worker-115" 
java.lang.UnsatisfiedLinkError: 
org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z
at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method)
at 
org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:793)
at org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:1249)
at org.apache.hadoop.fs.FileUtil.list(FileUtil.java:1454)
at 
org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:601)
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
at 
org.apache.hadoop.fs.ChecksumFileSystem.listStatus(ChecksumFileSystem.java:761)
at org.apache.hadoop.fs.Globber.listStatus(Globber.java:128)
at org.apache.hadoop.fs.Globber.doGlob(Globber.java:291)
at org.apache.hadoop.fs.Globber.glob(Globber.java:202)
at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:2124)
at 
org.apache.spark.deploy.SparkHadoopUtil.globPath(SparkHadoopUtil.scala:238)
at 
org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$3(DataSource.scala:737)
at 
org.apache.spark.util.ThreadUtils$.$anonfun$parmap$2(ThreadUtils.scala:380)
at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659)
at scala.util.Success.$anonfun$map$1(Try.scala:255)
at scala.util.Success.map(Try.scala:213)
at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
at 
java.base/java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1426)
at 
java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
at 
java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
at 
java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
at 
java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
at 
java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)



Nothing happens afterwards. I would appreciate your kind input to solve this.

Best Regards,
Kelum Perera





Re: Autoscaling in Spark

2023-10-10 Thread Mich Talebzadeh
This has been brought up a few times. I will focus on Spark Structured
Streaming.

Autoscaling does not support Spark Structured Streaming (SSS). Why? Because
streaming jobs are typically long-running jobs that need to maintain state
across micro-batches. Autoscaling is designed to scale Spark clusters up and
down in response to workload changes. However, this would cause problems
for Spark Structured Streaming jobs because the jobs could lose
their state. These jobs continuously process incoming data and update their
state incrementally (see the checkpoint directory). Autoscaling, which can
dynamically add or remove worker nodes, would disrupt this stateful
processing. Although Spark itself supports dynamic allocation (which
can add or remove executors based on demand), it is not the same as
autoscaling in cloud environments such as GCP, Kubernetes or managed clusters.
For now you need to plan your SSS workload accordingly.

My general advice: the usual thing to watch in the Spark GUI is

Processing Time (Process Rate) + Reserved Capacity < Batch Interval (Batch
Duration)

If your sink has an issue absorbing data in a timely manner as per the above
formula, you will see it reflected in the Processing Rate.

The Batch Interval is the rate at which the upstream source sends messages
through Kafka or another source. We can start by assuming that an
increase in the number of messages processed (and hence in processing time)
will require *additional reserved capacity*. We can anticipate a heuristic 70%
(~1SD) increase in the processing time, so in theory you should be able to
handle all this work within the batch interval.
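
The rule of thumb above can be written down as a small check. This is only a sketch under one reading of the text: the reserved capacity is taken to be 70% of the observed processing time, and the function name fits_batch_interval and its parameters are made up for this illustration.

```python
def fits_batch_interval(processing_time_s, batch_interval_s, reserve_fraction=0.70):
    """Return True if processing time plus reserved capacity fits in the interval.

    Assumption: reserved capacity = reserve_fraction * processing time,
    following the heuristic 70% headroom mentioned in the message.
    """
    reserved_s = processing_time_s * reserve_fraction
    return processing_time_s + reserved_s < batch_interval_s


# 5 s of processing plus 3.5 s of headroom fits in a 10 s interval.
print(fits_batch_interval(5.0, 10.0))
# 6 s of processing plus about 4.2 s of headroom does not.
print(fits_batch_interval(6.0, 10.0))
```

The processing time would come from the streaming query's progress metrics or the Spark GUI; how it is collected is left out here.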

HTH

Mich Talebzadeh,
Distinguished Technologist, Solutions Architect & Engineer
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Tue, 10 Oct 2023 at 16:11, Kiran Biswal  wrote:

> Hello Experts
>
> Is there any true auto scaling option for spark? The dynamic auto scaling
> works only for batch. Any guidelines on spark streaming  autoscaling and
> how that will be tied to any cluster level autoscaling solutions?
>
> Thanks
>


Autoscaling in Spark

2023-10-10 Thread Kiran Biswal
Hello Experts

Is there any true auto scaling option for spark? The dynamic auto scaling
works only for batch. Any guidelines on spark streaming  autoscaling and
how that will be tied to any cluster level autoscaling solutions?

Thanks


Re: Updating delta file column data

2023-10-10 Thread Mich Talebzadeh
Hi,

Since you mentioned that there could be duplicate records with the same
unique key in the Delta table, you will need a way to handle these
duplicates. One approach I can suggest is to use a timestamp to
determine the latest or most relevant record among duplicates: add a
so-called op_time column at ingestion time,
df = df.withColumn("op_time", current_timestamp()),
so you can determine the most relevant record.

This is the pseudo-code suggestion:

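from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import col, conv, get_json_object, regexp_replace, row_number
from delta.tables import DeltaTable

appName = "DeltaHexToIntConversion"
spark = SparkSession.builder.appName(appName).getOrCreate()
delta_table_path = "path_to_your_delta_table"
df = spark.read.format("delta").load(delta_table_path)

# "data" is the JSON string column and "field1" the hexadecimal field
# (placeholders); strip the 0x prefix and convert base 16 to base 10
hex_digits = regexp_replace(get_json_object(col("data"), "$.field1"), "^0[xX]", "")
df = df.withColumn("field1_int", conv(hex_digits, 16, 10).cast("long"))

# Handling duplicates: keep only the latest record per unique_key,
# using the op_time timestamp mentioned above
w = Window.partitionBy("unique_key").orderBy(col("op_time").desc())
df = df.withColumn("rn", row_number().over(w)).filter(col("rn") == 1).drop("rn")

# Merge the DataFrame back into the Delta table on the unique key;
# schema auto-merge lets the new field1_int column be added
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")
(
    DeltaTable.forPath(spark, delta_table_path).alias("t")
    .merge(df.alias("s"), "t.unique_key = s.unique_key")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)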
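
For the original question (rewriting one hexadecimal field inside a JSON string and keeping it in the same string column), the per-record logic can be sketched in plain Python. The function name hex_field_to_int is illustrative, and the field name version comes from the example earlier in the thread; applying this at scale would mean wrapping it in a UDF or using built-in Spark functions, which is not shown here.

```python
import json


def hex_field_to_int(json_text, field="version"):
    """Return the JSON string with `field` converted from a hex string to an int.

    Records where the field is missing or is not a "0x..." string are
    returned with their content unchanged.
    """
    record = json.loads(json_text)
    value = record.get(field)
    if isinstance(value, str) and value.lower().startswith("0x"):
        record[field] = int(value, 16)
    return json.dumps(record)


print(hex_field_to_int('{"version": "0xabcd1234"}'))
# prints {"version": 2882343476}
```

Note that 0xabcd1234 does not fit in a signed 32-bit integer, so on the Spark side a long (or string) type is the safer choice for the converted value.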


HTH

Mich Talebzadeh,
Distinguished Technologist, Solutions Architect & Engineer
London
United Kingdom



On Mon, 9 Oct 2023 at 17:12, Mich Talebzadeh 
wrote:

> In a nutshell, is this what you are trying to do?
>
>
>    1. Read the Delta table into a Spark DataFrame.
>    2. Explode the string column into a struct column.
>    3. Convert the hexadecimal field to an integer.
>    4. Write the DataFrame back to the Delta table in merge mode with a
>    unique key.
>
> Is this a fair assessment?
>
> HTH
>
> Mich Talebzadeh,
> Distinguished Technologist, Solutions Architect & Engineer
> London
> United Kingdom
>
> On Mon, 9 Oct 2023 at 14:46, Karthick Nk  wrote:
>
>> Hi All,
>>
>> I have described the sample data below and the operation I need to
>> perform on it.
>>
>> I have Delta tables in which some columns hold struct data stored as the
>> string data type.
>>
>> I need to update one key's value in the struct data held in the string
>> column of the Delta table.
>>
>> Note: I am able to explode the string column into the struct field and
>> into the individual fields by using the following operation in Spark:
>>
>> df_new = spark.read.json(df.rdd.map(lambda x: '{"data": ' + x.data + '}'))
>>
>> Could you suggest a possible way to perform the required action in an
>> optimal way?
>>
>> Note: Please feel free to ask if you need further information.
>>
>> Thanks & regards,
>> Karthick
>>
>> On Mon, Oct 2, 2023 at 10:53 PM Karthick Nk 
>> wrote:
>>
>>> Hi community members,
>>>
>>> In Databricks ADLS Gen2 Delta tables, I need to perform the operation below;
>>> could you help me with your thoughts?
>>>
>>> I have Delta tables with one column of data type string, which
>>> contains JSON data. I need to do the following:
>>> 1. Update one particular field value in the JSON and write it
>>> back to the same column.
>>>
>>> Example:
>>>
>>> In the string column, inside the JSON, I have one field with a hexadecimal
>>> value, like { "version" : "0xabcd1234" }
>>>
>>> I have to convert this field into the corresponding integer value and write
>>> it back into the same string column's JSON value.
>>> Note: I have to perform this operation within this column. This column
>>> has data type string in the Delta table.
>>>
>>> Could you suggest a sample example?
>>>
>>> Thanks in advance.
>>>
>>


Re: Log file location in Spark on K8s

2023-10-09 Thread Prashant Sharma
Hi Sanket,

Driver and executor logs are written to stdout by default; this can be
configured using the SPARK_HOME/conf/log4j.properties file. That file, along
with the entire SPARK_HOME/conf directory, is automatically propagated to all
driver and executor containers and mounted as a volume.

Thanks

On Mon, 9 Oct, 2023, 5:37 pm Agrawal, Sanket,
 wrote:

> Hi All,
>
>
>
> We are trying to send the spark logs using fluent-bit. We validated that
> fluent-bit is able to move logs of all other pods except the
> driver/executor pods.
>
>
>
> It would be great if someone could tell us where to look for Spark
> logs in Spark on Kubernetes with client/cluster mode deployment.
>
>
>
> Thanks,
> Sanket A.
>


Re: Clarification with Spark Structured Streaming

2023-10-09 Thread Danilo Sousa
Unsubscribe

> On 9 Oct 2023, at 07:03, Mich Talebzadeh wrote:
> 
> Hi,
> 
> Please see my responses below:
> 
> 1) In Spark Structured Streaming does commit mean streaming data has been 
> delivered to the sink like Snowflake?
> 
> No, a commit does not refer to data being delivered to a sink like Snowflake
> or BigQuery. The term commit refers to Spark Structured Streaming (SSS)
> internals. Specifically, it means that a micro-batch of data has been
> processed by SSS. In the checkpoint directory there is a subdirectory called
> commits that marks the micro-batch process as completed.
> 
> 2) if sinks like Snowflake  cannot absorb or digest streaming data in a 
> timely manner, will there be an impact on spark streaming itself?
> 
> Yes, it can potentially impact SSS. If the sink cannot absorb data in a 
> timely manner, the batches will start to back up in SSS. This can cause Spark 
> to run out of memory and the streaming job to fail. As I understand, Spark 
> will use a combination of memory and disk storage (checkpointing). This can 
> also happen if the network interface between Spark and the sink is disrupted. 
> On the other hand Spark may slow down, as it tries to process the backed-up 
> batches of data. You want to avoid these scenarios.
> 
> HTH
> 
> Mich Talebzadeh,
> Distinguished Technologist, Solutions Architect & Engineer
> London
> United Kingdom
> 
> On Sun, 8 Oct 2023 at 19:50, ashok34...@yahoo.com.INVALID 
>  wrote:
>> Hello team
>> 
>> 1) In Spark Structured Streaming does commit mean streaming data has been 
>> delivered to the sink like Snowflake?
>> 
>> 2) if sinks like Snowflake  cannot absorb or digest streaming data in a 
>> timely manner, will there be an impact on spark streaming itself?
>> 
>> Thanks
>> 
>> AK



Re: Clarification with Spark Structured Streaming

2023-10-09 Thread Mich Talebzadeh
Your mileage may vary. Often there is already some flavour of Cloud Data
Warehouse (CDW) in place, such as BigQuery, Redshift, Snowflake and so forth.
They can all do a good job to varying degrees.

   - Use efficient data types. Choose data types that are efficient for
   Spark to process. For example, use integer data types for columns that
   store integer values.
   - Avoid using complex data types. Complex data types, such as nested
   structs and arrays, can be less efficient for Spark to process.
   - Opt for columnar storage format like Parquet for your sink table.
   Columnar storage is highly efficient for analytical workloads as it allows
   for column-level compression and predicate pushdown.
   - These CDWs come with partitioning options. Date or time columns are
   popular choices for partitioning. This will reduce the amount of
   data scanned during queries.
   - Some of these CDWs come with native streaming capabilities, like
   BigQuery Streaming; I believe Snowflake has the Snowpipe Streaming API as
   well (I don't know much about it). These options enable real-time data
   ingestion and processing, with no need for manual batch processing etc.
   - You can batch incoming data for efficient processing, which can
   improve performance and simplify data handling. Start by configuring your
   Spark Streaming context with an appropriate batch interval. The batch
   interval defines how often Spark will process a batch of data. Choose a
   batch interval that balances latency and throughput based on the
   application's needs. Spark can process batches of data more efficiently
   than it can process individual records.
   - Snowflake says it is serverless, and so is BigQuery. They are designed
   to provide uniform performance regardless of workload. Serverless CDWs
   can efficiently handle both batch and streaming workloads without the need
   for manual resource provisioning.
   - Use materialized views to pre-compute query results, which can improve
   the performance of frequently executed queries. This technique has been
   around since classic RDBMSs.
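
As a rough illustration of the batching point in the list above, the sketch
below starts a Structured Streaming query with an explicit trigger interval
and hands each micro-batch to a writer function. The names
start_batched_sink and write_batch and the 30-second default are
hypothetical; how a batch is actually written depends on the warehouse
connector, which is not shown here.

```python
def start_batched_sink(stream_df, write_batch, checkpoint_dir, interval="30 seconds"):
    """Start a streaming query that passes one micro-batch per trigger to `write_batch`.

    `stream_df` is assumed to be a streaming DataFrame, and
    `write_batch(batch_df, batch_id)` a user-supplied function that writes
    one micro-batch to the warehouse (hypothetical, connector-specific).
    """
    return (
        stream_df.writeStream
        .trigger(processingTime=interval)              # batch interval: latency vs throughput
        .option("checkpointLocation", checkpoint_dir)  # where offsets and commits are tracked
        .foreachBatch(write_batch)                     # one bulk write per micro-batch
        .start()
    )
```

A longer interval generally means fewer, larger writes to the sink at the
cost of higher latency.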

HTH

Mich Talebzadeh,
Distinguished Technologist, Solutions Architect & Engineer
London
United Kingdom


On Mon, 9 Oct 2023 at 17:50, ashok34...@yahoo.com 
wrote:

> Thank you for your feedback Mich.
>
> In general, how can one optimise the cloud data warehouses (the sink part)
> to handle streaming Spark data efficiently, avoiding the bottlenecks that
> were discussed?
>
>
> AK
> On Monday, 9 October 2023 at 11:04:41 BST, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>
> Hi,
>
> Please see my responses below:
>
> 1) In Spark Structured Streaming does commit mean streaming data has been
> delivered to the sink like Snowflake?
>
> No, a commit does not refer to data being delivered to a sink like
> Snowflake or BigQuery. The term commit refers to Spark Structured Streaming
> (SSS) internals. Specifically, it means that a micro-batch of data has been
> processed by SSS. In the checkpoint directory there is a subdirectory
> called commits that marks the micro-batch as completed.
>
> 2) if sinks like Snowflake  cannot absorb or digest streaming data in a
> timely manner, will there be an impact on spark streaming itself?
>
> Yes, it can potentially impact SSS. If the sink cannot absorb data in a
> timely manner, the batches will start to back up in SSS. This can cause
> Spark to run out of memory and the streaming job to fail. As I understand,
> Spark will use a combination of memory and disk storage (checkpointing).
> This can also happen if the network interface between Spark and the sink is
> disrupted. On the other hand Spark may slow down, as it tries to process
> the backed-up batches of data. You want to avoid these scenarios.
>
> HTH
>
> Mich Talebzadeh,
> Distinguished Technologist, Solutions Architect & Engineer
> London
> United Kingdom
>
>
> On Sun, 8 Oct 2023 at 19:50, ashok34...@yahoo.com.INVALID
>  wrote:
>
> Hello team
>
> 1) In Spark Structured Streaming does commit mean streaming data 

Re: Clarification with Spark Structured Streaming

2023-10-09 Thread ashok34...@yahoo.com.INVALID
Thank you for your feedback Mich.
In general, how can one optimise the cloud data warehouses (the sink part) to 
handle streaming Spark data efficiently, avoiding the bottlenecks that were 
discussed?

AK

On Monday, 9 October 2023 at 11:04:41 BST, Mich Talebzadeh 
 wrote:  
 
 Hi,
Please see my responses below:
1) In Spark Structured Streaming does commit mean streaming data has been 
delivered to the sink like Snowflake?

No, a commit does not refer to data being delivered to a sink like Snowflake or 
BigQuery. The term commit refers to Spark Structured Streaming (SSS) internals. 
Specifically, it means that a micro-batch of data has been processed by SSS. In 
the checkpoint directory there is a subdirectory called commits that marks the 
micro-batch as completed.
2) if sinks like Snowflake  cannot absorb or digest streaming data in a timely 
manner, will there be an impact on spark streaming itself?

Yes, it can potentially impact SSS. If the sink cannot absorb data in a timely 
manner, the batches will start to back up in SSS. This can cause Spark to run 
out of memory and the streaming job to fail. As I understand, Spark will use a 
combination of memory and disk storage (checkpointing). This can also happen if 
the network interface between Spark and the sink is disrupted. On the other 
hand Spark may slow down, as it tries to process the backed-up batches of data. 
You want to avoid these scenarios.
HTH
Mich Talebzadeh,
Distinguished Technologist, Solutions Architect & Engineer
London
United Kingdom



On Sun, 8 Oct 2023 at 19:50, ashok34...@yahoo.com.INVALID 
 wrote:

Hello team
1) In Spark Structured Streaming does commit mean streaming data has been 
delivered to the sink like Snowflake?
2) if sinks like Snowflake  cannot absorb or digest streaming data in a timely 
manner, will there be an impact on spark streaming itself?
Thanks

AK
  

Re: Updating delta file column data

2023-10-09 Thread Mich Talebzadeh
In a nutshell, is this what you are trying to do?


   1. Read the Delta table into a Spark DataFrame.
   2. Explode the string column into a struct column.
   3. Convert the hexadecimal field to an integer.
   4. Write the DataFrame back to the Delta table in merge mode with a
   unique key.
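
For step 3, a minimal sketch of the conversion on a single JSON string,
assuming the field is called "version" as in the example quoted below and
holds a "0x"-prefixed string. The function name hex_field_to_int is
hypothetical. In Spark this could be wrapped in a UDF and applied to the
string column before the merge, though that wiring is not shown.

```python
import json


def hex_field_to_int(json_text, field="version"):
    """Return `json_text` with the hexadecimal string in `field` replaced by its integer value."""
    record = json.loads(json_text)
    value = record.get(field)
    # Only touch values that look like hexadecimal strings; leave anything else as-is.
    if isinstance(value, str) and value.lower().startswith("0x"):
        record[field] = int(value, 16)  # int() accepts the "0x" prefix when the base is 16
    return json.dumps(record)
```

Rows whose field is missing or already numeric pass through unchanged, so
the function is safe to re-run on partially converted data.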

Is this a fair assessment?

HTH

Mich Talebzadeh,
Distinguished Technologist, Solutions Architect & Engineer
London
United Kingdom


On Mon, 9 Oct 2023 at 14:46, Karthick Nk  wrote:

> Hi All,
>
> I have mentioned the sample data below and the operation I need to
> perform on it.
>
> I have Delta tables in which one column holds struct data stored as a
> string.
>
> So, I need to update one key's value in the struct data held in the string
> column of the Delta table.
>
> Note: I am able to explode the string column into a struct field and
> into the individual fields by using the following operation in Spark:
>
> [image: image.png]
>
> df_new = spark.read.json(df.rdd.map(lambda x: '{"data": ' + x.data + '}'))
>
> Could you suggest a possible way to perform the required action in an
> optimal way?
>
> Note: Please feel free to ask, if you need further information.
>
> Thanks & regards,
> Karthick
>
> On Mon, Oct 2, 2023 at 10:53 PM Karthick Nk  wrote:
>
>> Hi community members,
>>
>> In databricks adls2 delta tables, I need to perform the below operation,
>> could you help me with your thoughts
>>
>> I have Delta tables with one column of data type string, which
>> contains JSON data as a string. I need to do the following:
>> 1. I have to update one particular field value in the JSON and write it
>> back to the same column.
>>
>> Example:
>>
>> In the string column, inside the JSON, I have one field with a
>> hexadecimal value, like { "version": "0xabcd1234" }
>>
>> I have to convert this field into the corresponding integer value and
>> write it back into the same string column's JSON value.
>> Note: I have to perform this operation within this column. This column is
>> basically with data type string in delta table.
>>
>> Could you suggest some sample example.
>>
>> Thanks in advance.
>>
>


Log file location in Spark on K8s

2023-10-09 Thread Agrawal, Sanket
Hi All,

We are trying to send the Spark logs using fluent-bit. We validated that 
fluent-bit is able to move the logs of all other pods except the driver/executor 
pods.

It would be great if someone could guide us on where to look for Spark logs in 
Spark on Kubernetes with client/cluster mode deployment.

Thanks,
Sanket A.

This message (including any attachments) contains confidential information 
intended for a specific individual and purpose, and is protected by law. If you 
are not the intended recipient, you should delete this message and any 
disclosure, copying, or distribution of this message, or the taking of any 
action based on it, by you is strictly prohibited.

Deloitte refers to a Deloitte member firm, one of its related entities, or 
Deloitte Touche Tohmatsu Limited ("DTTL"). Each Deloitte member firm is a 
separate legal entity and a member of DTTL. DTTL does not provide services to 
clients. Please see www.deloitte.com/about to learn more.

v.E.1


Re: Clarification with Spark Structured Streaming

2023-10-09 Thread Mich Talebzadeh
Hi,

Please see my responses below:

1) In Spark Structured Streaming does commit mean streaming data has been
delivered to the sink like Snowflake?

No, a commit does not refer to data being delivered to a sink like
Snowflake or BigQuery. The term commit refers to Spark Structured Streaming
(SSS) internals. Specifically, it means that a micro-batch of data has been
processed by SSS. In the checkpoint directory there is a subdirectory
called commits that marks the micro-batch as completed.
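
To make the commits subdirectory a little more concrete, here is a small
sketch that lists micro-batch ids which have an entry under offsets but none
yet under commits, i.e. batches that were planned but are not yet marked as
completed. It assumes the usual checkpoint layout in which both
subdirectories hold one file per batch, named by the batch id; the function
names and the path in the usage note are hypothetical.

```python
import os


def batch_ids(directory):
    """Return the set of integer batch ids found as file names in `directory`."""
    if not os.path.isdir(directory):
        return set()
    # Keep purely numeric names only; this skips hidden or temporary files such as ".3.crc".
    return {int(name) for name in os.listdir(directory) if name.isdigit()}


def uncommitted_batches(checkpoint_dir):
    """Batch ids present in offsets/ but absent from commits/, in ascending order."""
    planned = batch_ids(os.path.join(checkpoint_dir, "offsets"))
    committed = batch_ids(os.path.join(checkpoint_dir, "commits"))
    return sorted(planned - committed)
```

For a local checkpoint this could be called as
uncommitted_batches("/path/to/checkpoint"); for object storage the listing
calls would need to be replaced with the storage client's equivalents.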

2) if sinks like Snowflake  cannot absorb or digest streaming data in a
timely manner, will there be an impact on spark streaming itself?

Yes, it can potentially impact SSS. If the sink cannot absorb data in a
timely manner, the batches will start to back up in SSS. This can cause
Spark to run out of memory and the streaming job to fail. As I understand,
Spark will use a combination of memory and disk storage (checkpointing).
This can also happen if the network interface between Spark and the sink is
disrupted. On the other hand Spark may slow down, as it tries to process
the backed-up batches of data. You want to avoid these scenarios.

HTH

Mich Talebzadeh,
Distinguished Technologist, Solutions Architect & Engineer
London
United Kingdom


On Sun, 8 Oct 2023 at 19:50, ashok34...@yahoo.com.INVALID
 wrote:

> Hello team
>
> 1) In Spark Structured Streaming does commit mean streaming data has been
> delivered to the sink like Snowflake?
>
> 2) if sinks like Snowflake  cannot absorb or digest streaming data in a
> timely manner, will there be an impact on spark streaming itself?
>
> Thanks
>
> AK
>


Re: Updating delta file column data

2023-10-09 Thread Karthick Nk
Hi All,

I have mentioned the sample data below and the operation I need to perform
on it.

I have Delta tables in which one column holds struct data stored as a
string.

So, I need to update one key's value in the struct data held in the string
column of the Delta table.

Note: I am able to explode the string column into a struct field and
into the individual fields by using the following operation in Spark:

[image: image.png]

df_new = spark.read.json(df.rdd.map(lambda x: '{"data": ' + x.data + '}'))

Could you suggest a possible way to perform the required action in an
optimal way?

Note: Please feel free to ask, if you need further information.

Thanks & regards,
Karthick

On Mon, Oct 2, 2023 at 10:53 PM Karthick Nk  wrote:

> Hi community members,
>
> In databricks adls2 delta tables, I need to perform the below operation,
> could you help me with your thoughts
>
> I have Delta tables with one column of data type string, which
> contains JSON data as a string. I need to do the following:
> 1. I have to update one particular field value in the JSON and write it
> back to the same column.
>
> Example:
>
> In the string column, inside the JSON, I have one field with a
> hexadecimal value, like { "version": "0xabcd1234" }
>
> I have to convert this field into the corresponding integer value and
> write it back into the same string column's JSON value.
> Note: I have to perform this operation within this column. This column is
> basically with data type string in delta table.
>
> Could you suggest some sample example.
>
> Thanks in advance.
>


Clarification with Spark Structured Streaming

2023-10-08 Thread ashok34...@yahoo.com.INVALID
Hello team
1) In Spark Structured Streaming does commit mean streaming data has been 
delivered to the sink like Snowflake?
2) If sinks like Snowflake cannot absorb or digest streaming data in a timely 
manner, will there be an impact on Spark streaming itself?
Thanks

AK

Re: Connection pool shut down in Spark Iceberg Streaming Connector

2023-10-05 Thread Igor Calabria
You might be affected by this issue:
https://github.com/apache/iceberg/issues/8601

It was already patched but it isn't released yet.

On Thu, Oct 5, 2023 at 7:47 PM Prashant Sharma  wrote:

> Hi Sanket, more details might help here.
>
> What does your Spark configuration look like?
>
> What exactly was done when this happened?
>
> On Thu, 5 Oct, 2023, 2:29 pm Agrawal, Sanket,
>  wrote:
>
>> Hello Everyone,
>>
>>
>>
>> We are trying to stream the changes in our Iceberg tables stored in AWS
>> S3. We are achieving this through Spark-Iceberg Connector and using JAR
>> files for Spark-AWS. Suddenly we have started receiving error “Connection
>> pool shut down”.
>>
>>
>>
>> Spark Version: 3.4.1
>>
>> Iceberg: 1.3.1
>>
>>
>>
>> Any help or guidance would be of great help.
>>
>>
>>
>> Thank You,
>>
>> Sanket A.
>>
>>
>>
>
>


Re: Spark Compatibility with Spring Boot 3.x

2023-10-05 Thread Angshuman Bhattacharya
Thanks Ahmed. I am trying to bring this up with Spark DE community

On Thu, Oct 5, 2023 at 12:32 PM Ahmed Albalawi <
ahmed.albal...@capitalone.com> wrote:

> Hello team,
>
> We are in the process of upgrading one of our apps to Spring Boot 3.x
> while using Spark, and we have encountered an issue with Spark
> compatibility, specifically with Jakarta Servlet. Spring Boot 3.x uses
> Jakarta Servlet while Spark uses Javax Servlet. Can we get some guidance on
> how to upgrade to Spring Boot 3.x while continuing to use Spark.
>
> The specific error is listed below:
>
> java.lang.NoClassDefFoundError: javax/servlet/Servlet
> at org.apache.spark.ui.SparkUI$.create(SparkUI.scala:239)
> at org.apache.spark.SparkContext.<init>(SparkContext.scala:503)
> at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2888)
> at org.apache.spark.SparkContext.getOrCreate(SparkContext.scala)
>
> The error comes up when we try to run a mvn clean install, and the issue is 
> in our test cases. This issue happens specifically when we build our spark 
> session. The line of code it traces down to is as follows:
>
> *session = 
> SparkSession.builder().sparkContext(SparkContext.getOrCreate(sparkConf)).getOrCreate();*
>
> What we have tried:
>
> - We noticed, according to this post, that there are no compatible
> versions of Spark using version 5 of the Jakarta Servlet API.
>
> - We've tried using the maven shade plugin to use jakarta instead of
> javax, but are running into some other issues with this.
>
> - We've also looked at the following to use jakarta 4.x with jersey 2.x
> and still have an issue with the servlet.
>
>
> Please let us know if there are any solutions to this issue. Thanks!
>
>
> --
> *Ahmed Albalawi*
>
> Senior Associate Software Engineer • EP2 Tech - CuRE
>
> 571-668-3911 •  1680 Capital One Dr.
>

__



The information contained in this e-mail may be confidential and/or proprietary 
to Capital One and/or its affiliates and may only be used solely in performance 
of work or services for Capital One. The information transmitted herewith is 
intended only for use by the individual or entity to which it is addressed. If 
the reader of this message is not the intended recipient, you are hereby 
notified that any review, retransmission, dissemination, distribution, copying 
or other use of, or taking of any action in reliance upon this information is 
strictly prohibited. If you have received this communication in error, please 
contact the sender and delete the material from your computer.





Re: Spark Compatibility with Spring Boot 3.x

2023-10-05 Thread Sean Owen
I think we already updated this in Spark 4. However for now you would have
to also include a JAR with the jakarta.* classes instead.
You are welcome to try Spark 4 now by building from master, but it's far
from release.

On Thu, Oct 5, 2023 at 11:53 AM Ahmed Albalawi
 wrote:

> Hello team,
>
> We are in the process of upgrading one of our apps to Spring Boot 3.x
> while using Spark, and we have encountered an issue with Spark
> compatibility, specifically with Jakarta Servlet. Spring Boot 3.x uses
> Jakarta Servlet while Spark uses Javax Servlet. Can we get some guidance on
> how to upgrade to Spring Boot 3.x while continuing to use Spark.
>
> The specific error is listed below:
>
> java.lang.NoClassDefFoundError: javax/servlet/Servlet
> at org.apache.spark.ui.SparkUI$.create(SparkUI.scala:239)
> at org.apache.spark.SparkContext.<init>(SparkContext.scala:503)
> at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2888)
> at org.apache.spark.SparkContext.getOrCreate(SparkContext.scala)
>
> The error comes up when we try to run a mvn clean install, and the issue is 
> in our test cases. This issue happens specifically when we build our spark 
> session. The line of code it traces down to is as follows:
>
> *session = 
> SparkSession.builder().sparkContext(SparkContext.getOrCreate(sparkConf)).getOrCreate();*
>
> What we have tried:
>
> - We noticed, according to this post, that there are no compatible
> versions of Spark using version 5 of the Jakarta Servlet API.
>
> - We've tried using the maven shade plugin to use jakarta instead of
> javax, but are running into some other issues with this.
>
> - We've also looked at the following to use jakarta 4.x with jersey 2.x
> and still have an issue with the servlet.
>
>
> Please let us know if there are any solutions to this issue. Thanks!
>
>
> --
> *Ahmed Albalawi*
>
> Senior Associate Software Engineer • EP2 Tech - CuRE
>
> 571-668-3911 •  1680 Capital One Dr.


Spark Compatibility with Spring Boot 3.x

2023-10-05 Thread Ahmed Albalawi
Hello team,

We are in the process of upgrading one of our apps to Spring Boot 3.x while
using Spark, and we have encountered an issue with Spark compatibility,
specifically with Jakarta Servlet. Spring Boot 3.x uses Jakarta Servlet
while Spark uses Javax Servlet. Can we get some guidance on how to upgrade
to Spring Boot 3.x while continuing to use Spark.

The specific error is listed below:

java.lang.NoClassDefFoundError: javax/servlet/Servlet
at org.apache.spark.ui.SparkUI$.create(SparkUI.scala:239)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:503)
at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2888)
at org.apache.spark.SparkContext.getOrCreate(SparkContext.scala)

The error comes up when we try to run a mvn clean install, and the
issue is in our test cases. This issue happens specifically when we
build our spark session. The line of code it traces down to is as
follows:

*session = 
SparkSession.builder().sparkContext(SparkContext.getOrCreate(sparkConf)).getOrCreate();*

What we have tried:

- We noticed, according to this post, that there are no compatible
versions of Spark using version 5 of the Jakarta Servlet API.

- We've tried using the maven shade plugin to use jakarta instead of
javax, but are running into some other issues with this.

- We've also looked at the following to use jakarta 4.x with jersey 2.x
and still have an issue with the servlet.


Please let us know if there are any solutions to this issue. Thanks!


-- 
*Ahmed Albalawi*

Senior Associate Software Engineer • EP2 Tech - CuRE

571-668-3911 •  1680 Capital One Dr.






Re: [PySpark Structured Streaming] How to tune .repartition(N) ?

2023-10-05 Thread Mich Talebzadeh
The fact that you have 60 partitions or brokers in Kafka is not, by itself,
directly correlated to the number of Spark Structured Streaming (SSS)
executors. See below.

Spark starts with 200 shuffle partitions (the default for
spark.sql.shuffle.partitions). However, by default, Spark/PySpark
creates partitions that are equal to the number of CPU cores in the node,
the so-called vcores. So it depends on the number of nodes you are using in
your Spark cluster.

Until you have done a PoC, you need not worry about repartition(10) in
your writeStream. I suggest that for now you remove that parameter and
observe the Spark processing through the Spark GUI (default port 4040), in
particular the "Structured Streaming" page. Your sink is Delta Lake, which
in this respect is no different from any other data warehouse such as
Google BigQuery.

My general advice: the usual thing to watch in the Spark GUI is

Processing Time (Process Rate)  + Reserved Capacity < Batch Interval (Batch
Duration)

If your sink (Delta Lake) has an issue absorbing data in a timely manner
as per the above formula, you will see the effect on the Processing Rate.

The Batch Interval reflects the rate at which the upstream source sends
messages through Kafka. We can start by assuming that the rate of increase
in the number of messages processed (processing time) will require
additional reserved capacity. We can anticipate a heuristic 70% (~1SD)
increase in the processing time, so in theory you should be able to handle
all this work below the batch interval.
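
The rule of thumb above can be written as a quick check. This is only a
sketch of one reading of it, in which the reserved capacity is taken to be
70% of the observed processing time; the function name and the default are
assumptions, not something Spark provides.

```python
def fits_batch_interval(processing_time_s, batch_interval_s, reserve_fraction=0.7):
    """Check: processing time + reserved capacity < batch interval (all in seconds)."""
    # Reserved capacity is modelled as a fraction of the observed processing time.
    reserved_s = processing_time_s * reserve_fraction
    return processing_time_s + reserved_s < batch_interval_s
```

For example, a 5-second processing time fits a 10-second batch interval
under this reading (5 + 3.5 = 8.5), while a 6-second processing time does
not (6 + 4.2 = 10.2).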

The parameter which I think many deploy is
spark.streaming.backpressure.enabled
(spark.conf.set("spark.streaming.backpressure.enabled", "true")). The
central idea is that if a component is struggling to keep up, it should
communicate to upstream components and get them to reduce the load. In the
context of Spark Streaming, the receiver is the upstream component which
gets notified if the executors cannot keep up. There are a number of
occasions when this will happen (not necessarily just a spike in the
incoming messages). For example:

   - Streaming Source: Unexpected short burst of incoming messages in
   source system
   - YARN: Lost Spark executors due to node(s) failure
   - External Sink System: High load on external systems such as Delta
   Lake, BigQuery etc

Without backpressure, microbatches queue up over time and the scheduling
delay increases (check Operation Duration from GUI).

The next parameter I think of is spark.streaming.backpressure.pid.minRate.
It is the total records per second. It relies on
spark.streaming.kafka.maxRatePerPartition (not set), which is the maximum
rate (number of records per second) at which messages will be read from
each Kafka partition.

So spark.streaming.backpressure.pid.minRate starts with

n (total number of kafka partitions)
* spark.streaming.kafka.maxRatePerPartition * Batch Interval

spark.streaming.kafka.maxRatePerPartition is used to control the maximum
rate of data ingestion from Kafka per partition. Kafka topics can have
multiple partitions, and Spark Streaming processes data in parallel by
reading from these partitions.
If you set spark.streaming.kafka.maxRatePerPartition to 1000, Spark
Streaming will consume data from each Kafka partition at a rate of up to
1000 messages per second.

So in your case, if you set it to 1000, it goes something like

60 * 1000 * Batch Interval (in seconds)
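
As a worked version of that arithmetic, the sketch below computes the upper
bound on records read per batch. The function name and the 10-second
interval in the usage note are illustrative assumptions. Note also that, as
far as I know, the spark.streaming.* settings belong to the older DStream
API; for the Structured Streaming Kafka source the comparable cap is the
maxOffsetsPerTrigger option, which limits the total offsets per trigger.

```python
def max_records_per_batch(kafka_partitions, max_rate_per_partition, batch_interval_s):
    """Upper bound on records read in one batch.

    kafka_partitions:        number of partitions on the Kafka topic
    max_rate_per_partition:  records per second allowed per partition
    batch_interval_s:        batch interval in seconds
    """
    return kafka_partitions * max_rate_per_partition * batch_interval_s
```

With 60 partitions, a per-partition rate of 1000 and a 10-second interval,
max_records_per_batch(60, 1000, 10) gives 600000 records per batch at most.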

Of course I stand corrected.

HTH

Mich Talebzadeh,
Distinguished Technologist, Solutions Architect & Engineer
London
United Kingdom


On Thu, 5 Oct 2023 at 05:54, Shao Yang Hong
 wrote:

> Hi all on user@spark:
>
> We are looking for advice and suggestions on how to tune the
> .repartition() parameter.
>
> We are using Spark Streaming on our data pipeline to consume messages
> and persist them to a Delta Lake
> (https://delta.io/learn/getting-started/).
>
> We read messages from a Kafka topic, then add a generated date column
> as a daily partitioning, and save these records to Delta Lake. We have
> 60 Kafka partitions on the Kafka topic, 15 Spark executor instances
> (so 4 Kafka partitions per executor).
>
> How then, should we use .repartition()? Should we omit this parameter?
> Or set it to 15? or 4?
>
> Our code looks roughly like the below:
>
> ```
> df = (
> spark.readStream.format("kafka")
> .option("kafka.bootstrap.servers", os.environ["KAFKA_BROKERS"])
> .option("subscribe", os.environ["KAFKA_TOPIC"])
> .load()
> )
>
> table = (
> df.select(
> from_protobuf(
> "value", "table", "/opt/protobuf-desc/table.desc"
>   

Re: [PySpark Structured Streaming] How to tune .repartition(N) ?

2023-10-05 Thread Perez
You can try the 'optimize' command of Delta Lake. That should help, since it
merges small files. It also depends on the file format. If you are working
with Parquet, then small files should still not cause any issues.

P.

On Thu, Oct 5, 2023 at 10:55 AM Shao Yang Hong
 wrote:

> Hi Raghavendra,
>
> Yes, we are trying to reduce the number of files in delta as well (the
> small file problem [0][1]).
>
> We already have a scheduled app to compact files, but the number of
> files is still large, at 14K files per day.
>
> [0]: https://docs.delta.io/latest/optimizations-oss.html#language-python
> [1]:
> https://delta.io/blog/2023-01-25-delta-lake-small-file-compaction-optimize/
>
> On Thu, Oct 5, 2023 at 12:53 PM Raghavendra Ganesh
>  wrote:
> >
> > Hi,
> > What is the purpose for which you want to use repartition() .. to reduce
> the number of files in delta?
> > Also note that there is an alternative option of using coalesce()
> instead of repartition().
> > --
> > Raghavendra
> >
> >
> > On Thu, Oct 5, 2023 at 10:15 AM Shao Yang Hong <
> shaoyang.h...@ninjavan.co.invalid> wrote:
> >>
> >> Hi all on user@spark:
> >>
> >> We are looking for advice and suggestions on how to tune the
> >> .repartition() parameter.
> >>
> >> We are using Spark Streaming on our data pipeline to consume messages
> >> and persist them to a Delta Lake
> >> (https://delta.io/learn/getting-started/).
> >>
> >> We read messages from a Kafka topic, then add a generated date column
> >> as a daily partitioning, and save these records to Delta Lake. We have
> >> 60 Kafka partitions on the Kafka topic, 15 Spark executor instances
> >> (so 4 Kafka partitions per executor).
> >>
> >> How then, should we use .repartition()? Should we omit this parameter?
> >> Or set it to 15? or 4?
> >>
> >> Our code looks roughly like the below:
> >>
> >> ```
> >> df = (
> >>     spark.readStream.format("kafka")
> >>     .option("kafka.bootstrap.servers", os.environ["KAFKA_BROKERS"])
> >>     .option("subscribe", os.environ["KAFKA_TOPIC"])
> >>     .load()
> >> )
> >>
> >> table = (
> >>     df.select(
> >>         from_protobuf(
> >>             "value", "table", "/opt/protobuf-desc/table.desc"
> >>         ).alias("msg")
> >>     )
> >>     .withColumn("uuid", col("msg.uuid"))
> >>     # etc other columns...
> >>
> >>     # generated column for daily partitioning in Delta Lake
> >>     .withColumn(CREATED_DATE,
> >>         date_format(from_unixtime("msg.logged_at"), "yyyy-MM-dd"))
> >>     .drop("msg")
> >> )
> >>
> >> query = (
> >>     table
> >>     .repartition(10).writeStream
> >>     .queryName(APP_NAME)
> >>     .outputMode("append")
> >>     .format("delta")
> >>     .partitionBy(CREATED_DATE)
> >>     .option("checkpointLocation", os.environ["CHECKPOINT"])
> >>     .start(os.environ["DELTA_PATH"])
> >> )
> >>
> >> query.awaitTermination()
> >> spark.stop()
> >> ```
> >>
> >> Any advice would be appreciated.
> >>
> >> --
> >> Best Regards,
> >> Shao Yang HONG
> >>
> >> -
> >> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >>
>
>
> --
> Best Regards,
> Shao Yang HONG
> Software Engineer, Pricing, Tech
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Connection pool shut down in Spark Iceberg Streaming Connector

2023-10-05 Thread Prashant Sharma
Hi Sanket, more details might help here.

What does your Spark configuration look like?

What exactly was done when this happened?

On Thu, 5 Oct, 2023, 2:29 pm Agrawal, Sanket,
 wrote:

> Hello Everyone,
>
>
>
> We are trying to stream the changes in our Iceberg tables stored in AWS
> S3. We are achieving this through Spark-Iceberg Connector and using JAR
> files for Spark-AWS. Suddenly we have started receiving error “Connection
> pool shut down”.
>
>
>
> Spark Version: 3.4.1
>
> Iceberg: 1.3.1
>
>
>
> Any help or guidance would be of great help.
>
>
>
> Thank You,
>
> Sanket A.
>
>
>
> This message (including any attachments) contains confidential information
> intended for a specific individual and purpose, and is protected by law. If
> you are not the intended recipient, you should delete this message and any
> disclosure, copying, or distribution of this message, or the taking of any
> action based on it, by you is strictly prohibited.
>
> Deloitte refers to a Deloitte member firm, one of its related entities, or
> Deloitte Touche Tohmatsu Limited ("DTTL"). Each Deloitte member firm is a
> separate legal entity and a member of DTTL. DTTL does not provide services
> to clients. Please see www.deloitte.com/about to learn more.
>
> v.E.1
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org


Connection pool shut down in Spark Iceberg Streaming Connector

2023-10-05 Thread Agrawal, Sanket
Hello Everyone,

We are trying to stream the changes in our Iceberg tables stored in AWS S3. We 
are achieving this through Spark-Iceberg Connector and using JAR files for 
Spark-AWS. Suddenly we have started receiving error "Connection pool shut down".

Spark Version: 3.4.1
Iceberg: 1.3.1

Any help or guidance would be of great help.

Thank You,
Sanket A.



spark_iceberg_streaming.logs
Description: spark_iceberg_streaming.logs


Re: [PySpark Structured Streaming] How to tune .repartition(N) ?

2023-10-04 Thread Shao Yang Hong
Hi Raghavendra,

Yes, we are trying to reduce the number of files in delta as well (the
small file problem [0][1]).

We already have a scheduled app to compact files, but the number of
files is still large, at 14K files per day.

[0]: https://docs.delta.io/latest/optimizations-oss.html#language-python
[1]: https://delta.io/blog/2023-01-25-delta-lake-small-file-compaction-optimize/
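
For a rough sense of where a figure like 14K files per day can come from, here is a back-of-the-envelope sketch. It assumes (this is an assumption, not something measured on this pipeline) that every micro-batch writes about one file per Spark partition per distinct date value, so the daily file count scales with the repartition() argument and with the number of micro-batches per day. The function names are hypothetical.

```python
SECONDS_PER_DAY = 86_400


def files_per_day(num_partitions, trigger_seconds, date_values_per_batch=1):
    """Estimate files written per day under the one-file-per-partition assumption."""
    batches_per_day = SECONDS_PER_DAY / trigger_seconds
    return num_partitions * date_values_per_batch * batches_per_day


def implied_trigger_seconds(observed_files_per_day, num_partitions,
                            date_values_per_batch=1):
    """Invert the estimate: average micro-batch interval implied by a file count."""
    return (SECONDS_PER_DAY * num_partitions * date_values_per_batch
            / observed_files_per_day)


# With repartition(10) and 14K files per day, the implied average interval
# between micro-batches is roughly one minute.
implied = implied_trigger_seconds(14_000, 10)

# Halving the partition count or doubling the interval halves the estimate.
estimate = files_per_day(num_partitions=5, trigger_seconds=60)
```

Under this model the two levers are a smaller repartition() argument and a longer trigger interval; how far either can go depends on the latency and per-task memory the pipeline can tolerate.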

On Thu, Oct 5, 2023 at 12:53 PM Raghavendra Ganesh
 wrote:
>
> Hi,
> What is the purpose for which you want to use repartition() .. to reduce the 
> number of files in delta?
> Also note that there is an alternative option of using coalesce() instead of 
> repartition().
> --
> Raghavendra
>
>
> On Thu, Oct 5, 2023 at 10:15 AM Shao Yang Hong 
>  wrote:
>>
>> Hi all on user@spark:
>>
>> We are looking for advice and suggestions on how to tune the
>> .repartition() parameter.
>>
>> We are using Spark Streaming on our data pipeline to consume messages
>> and persist them to a Delta Lake
>> (https://delta.io/learn/getting-started/).
>>
>> We read messages from a Kafka topic, then add a generated date column
>> as a daily partitioning, and save these records to Delta Lake. We have
>> 60 Kafka partitions on the Kafka topic, 15 Spark executor instances
>> (so 4 Kafka partitions per executor).
>>
>> How then, should we use .repartition()? Should we omit this parameter?
>> Or set it to 15? or 4?
>>
>> Our code looks roughly like the below:
>>
>> ```
>> df = (
>> spark.readStream.format("kafka")
>> .option("kafka.bootstrap.servers", os.environ["KAFKA_BROKERS"])
>> .option("subscribe", os.environ["KAFKA_TOPIC"])
>> .load()
>> )
>>
>> table = (
>> df.select(
>> from_protobuf(
>> "value", "table", "/opt/protobuf-desc/table.desc"
>> ).alias("msg")
>> )
>> .withColumn("uuid", col("msg.uuid"))
>> # etc other columns...
>>
>> # generated column for daily partitioning in Delta Lake
>> .withColumn(CREATED_DATE,
>> date_format(from_unixtime("msg.logged_at"), "yyyy-MM-dd"))
>> .drop("msg")
>> )
>>
>> query = (
>> table
>> .repartition(10).writeStream
>> .queryName(APP_NAME)
>> .outputMode("append")
>> .format("delta")
>> .partitionBy(CREATED_DATE)
>> .option("checkpointLocation", os.environ["CHECKPOINT"])
>> .start(os.environ["DELTA_PATH"])
>> )
>>
>> query.awaitTermination()
>> spark.stop()
>> ```
>>
>> Any advice would be appreciated.
>>
>> --
>> Best Regards,
>> Shao Yang HONG
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>


-- 
Best Regards,
Shao Yang HONG
Software Engineer, Pricing, Tech




[PySpark Structured Streaming] How to tune .repartition(N) ?

2023-10-04 Thread Shao Yang Hong
Hi all on user@spark:

We are looking for advice and suggestions on how to tune the
.repartition() parameter.

We are using Spark Streaming on our data pipeline to consume messages
and persist them to a Delta Lake
(https://delta.io/learn/getting-started/).

We read messages from a Kafka topic, then add a generated date column
as a daily partitioning, and save these records to Delta Lake. We have
60 Kafka partitions on the Kafka topic, 15 Spark executor instances
(so 4 Kafka partitions per executor).

How then, should we use .repartition()? Should we omit this parameter?
Or set it to 15? or 4?

Our code looks roughly like the below:

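```
import os

from pyspark.sql.functions import col, date_format, from_unixtime
from pyspark.sql.protobuf.functions import from_protobuf

# `spark`, APP_NAME and CREATED_DATE are defined elsewhere in the application.

# Read the raw messages from the Kafka topic.
df = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", os.environ["KAFKA_BROKERS"])
    .option("subscribe", os.environ["KAFKA_TOPIC"])
    .load()
)

# Decode the protobuf payload and pull out the columns we need.
table = (
    df.select(
        from_protobuf(
            "value", "table", "/opt/protobuf-desc/table.desc"
        ).alias("msg")
    )
    .withColumn("uuid", col("msg.uuid"))
    # etc other columns...

    # generated column for daily partitioning in Delta Lake
    .withColumn(
        CREATED_DATE,
        date_format(from_unixtime("msg.logged_at"), "yyyy-MM-dd"),
    )
    .drop("msg")
)

# Append each micro-batch to Delta Lake, partitioned by day.
query = (
    table
    .repartition(10)
    .writeStream
    .queryName(APP_NAME)
    .outputMode("append")
    .format("delta")
    .partitionBy(CREATED_DATE)
    .option("checkpointLocation", os.environ["CHECKPOINT"])
    .start(os.environ["DELTA_PATH"])
)

query.awaitTermination()
spark.stop()
```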

Any advice would be appreciated.

-- 
Best Regards,
Shao Yang HONG




Re: [PySpark Structured Streaming] How to tune .repartition(N) ?

2023-10-04 Thread Raghavendra Ganesh
Hi,
What is the purpose for which you want to use repartition() .. to reduce
the number of files in delta?
Also note that there is an alternative option of using coalesce() instead
of repartition().
--
Raghavendra


On Thu, Oct 5, 2023 at 10:15 AM Shao Yang Hong
 wrote:

> Hi all on user@spark:
>
> We are looking for advice and suggestions on how to tune the
> .repartition() parameter.
>
> We are using Spark Streaming on our data pipeline to consume messages
> and persist them to a Delta Lake
> (https://delta.io/learn/getting-started/).
>
> We read messages from a Kafka topic, then add a generated date column
> as a daily partitioning, and save these records to Delta Lake. We have
> 60 Kafka partitions on the Kafka topic, 15 Spark executor instances
> (so 4 Kafka partitions per executor).
>
> How then, should we use .repartition()? Should we omit this parameter?
> Or set it to 15? or 4?
>
> Our code looks roughly like the below:
>
> ```
> df = (
> spark.readStream.format("kafka")
> .option("kafka.bootstrap.servers", os.environ["KAFKA_BROKERS"])
> .option("subscribe", os.environ["KAFKA_TOPIC"])
> .load()
> )
>
> table = (
> df.select(
> from_protobuf(
> "value", "table", "/opt/protobuf-desc/table.desc"
> ).alias("msg")
> )
> .withColumn("uuid", col("msg.uuid"))
> # etc other columns...
>
> # generated column for daily partitioning in Delta Lake
> .withColumn(CREATED_DATE,
> date_format(from_unixtime("msg.logged_at"), "yyyy-MM-dd"))
> .drop("msg")
> )
>
> query = (
> table
> .repartition(10).writeStream
> .queryName(APP_NAME)
> .outputMode("append")
> .format("delta")
> .partitionBy(CREATED_DATE)
> .option("checkpointLocation", os.environ["CHECKPOINT"])
> .start(os.environ["DELTA_PATH"])
> )
>
> query.awaitTermination()
> spark.stop()
> ```
>
> Any advice would be appreciated.
>
> --
> Best Regards,
> Shao Yang HONG
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>



[Spark Core]: Recomputation cost of a job due to executor failures

2023-10-04 Thread Faiz Halde
Hello,

Due to the way Spark implements shuffle, the loss of an executor sometimes
results in the recomputation of the partitions that were lost.

We define a *partition* as the tuple (RDD-ids, partition id), where
RDD-ids is a sequence of RDD ids.

In our system, we define the unit of work performed for a job X as

work = count of tasks executed to complete the job X

We want to be able to separate the *goodput* from this metric.

Goodput is defined as the number of tasks Spark would have had to compute
to complete this job had there been zero failures in the cluster.

Using the event listener, would the following work?

1. Build a hashmap of type [(RDD-ids, partition), int] with default value =
0
2. For each task T, hashmap[(T.RDD-ids, T.partition-id)] += 1

The assumption here is that spark will never recompute a *partition* twice
( when there are no failures ). Is this assumption true?

So for any entry, a value of greater than 1 means that the particular
partition identified by the tuple ( RDD-ids, partition id ) was recomputed
because spark thought the partition was "lost"

Given the above data structure, the recomputation cost would be
1 - (hashmap.size() / sum(hashmap.values))
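
To make the bookkeeping concrete, here is a minimal sketch of the counting scheme in plain Python. It only models the hashmap and the final ratio. How the (RDD-ids, partition id) key is obtained from listener events is not shown, and the names below are hypothetical. It also takes the assumption above for granted, namely that without failures each key is computed exactly once.

```python
from collections import Counter
from typing import Iterable, Tuple

# Key for one partition: (sequence of RDD ids, partition id).
PartitionKey = Tuple[Tuple[int, ...], int]


def recomputation_cost(task_keys: Iterable[PartitionKey]) -> float:
    """Return 1 - goodput / work for the observed tasks.

    work    = total number of tasks executed
    goodput = number of distinct partition keys (tasks needed with no failures)
    """
    counts = Counter(task_keys)        # hashmap[(rdd_ids, partition_id)] += 1
    work = sum(counts.values())
    if work == 0:
        return 0.0                     # no tasks observed, nothing recomputed
    goodput = len(counts)
    return 1 - goodput / work


# Four tasks ran, but partition 0 of RDDs (1, 2) was computed twice.
tasks = [((1, 2), 0), ((1, 2), 1), ((1, 2), 0), ((3,), 0)]
cost = recomputation_cost(tasks)       # 3 distinct keys over 4 tasks
```

In this example one of the four executed tasks was a recomputation, so the cost is 0.25.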

Thanks
Faiz


Updating delta file column data

2023-10-02 Thread Karthick Nk
Hi community members,

In Databricks ADLS2 Delta tables, I need to perform the operation below.
Could you help me with your thoughts?

I have Delta tables with one column of data type string that contains
JSON data. I need to update one particular field value inside the JSON and
write the result back to the same column.

Example:

Inside the JSON in the string column, one field holds a hexadecimal value,
like { "version": "0xabcd1234" }.

I have to convert this field into the corresponding integer value and write
it back into the JSON value of the same string column.
Note: I have to perform this operation within this column. The column has
data type string in the Delta table.

Could you suggest a sample example?
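
As a starting point, the per-value transformation can be sketched in plain Python. The field name "version" comes from the example above, and the function name is hypothetical. Applying it to the Delta table (for instance by wrapping it in a PySpark UDF and overwriting the column) is left out here and would need to be adapted to the actual table.

```python
import json


def hex_field_to_int(json_text: str, field: str = "version") -> str:
    """Rewrite one hexadecimal string field of a JSON document as an integer.

    Documents where the field is missing or is not a "0x..." string are
    returned with their content unchanged.
    """
    doc = json.loads(json_text)
    value = doc.get(field)
    if isinstance(value, str) and value.lower().startswith("0x"):
        doc[field] = int(value, 16)    # int() accepts the "0x" prefix in base 16
    return json.dumps(doc)


updated = hex_field_to_int('{"version": "0xabcd1234", "name": "a"}')
untouched = hex_field_to_int('{"name": "b"}')
```

The output is still a JSON string, so the column keeps its string data type; only the value of the one field changes from a hex string to a number.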

Thanks in advance.


Re: Seeking Guidance on Spark on Kubernetes Secrets Configuration

2023-10-01 Thread Jon Rodríguez Aranguren
Dear Jörn Franke, Jayabindu Singh and Spark Community members,

Thank you profoundly for your initial insights. I feel it's necessary to
provide more precision on our setup to facilitate a deeper understanding.

We're interfacing with S3 Compatible storages, but our operational context
is somewhat distinct. Our infrastructure doesn't lean on conventional cloud
providers like AWS. Instead, we've architected our environment on
On-Premise Kubernetes distributions, specifically k0s and Openshift.

Our objective extends beyond just handling S3 keys. We're orchestrating a
solution that integrates Azure SPNs, API Credentials, and other sensitive
credentials, intending to make Kubernetes' native secrets our central
management hub. The aspiration is to have a universally deployable JAR, one
that can function unmodified across different ecosystems like EMR,
Databricks (on both AWS and Azure), etc. Platforms like Databricks have
already made strides in this direction, allowing secrets to be woven
directly into the Spark Conf through mechanisms like
{{secret_scope/secret_name}}, which are resolved dynamically.

The spark-on-k8s-operator's user guide suggests the feasibility of mounting
secrets. However, a gap exists in our understanding of how to subsequently
access these mounted secret values within the Spark application's context.

Here lies my inquiry: is the spark-on-k8s-operator currently equipped to
support this level of integration? If it does, any elucidation on the
method or best practices would be pivotal for our project. Alternatively,
if you could point me to resources or community experts who have tackled
similar challenges, it would be of immense assistance.
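
For what it is worth, Spark's own "Running on Kubernetes" documentation describes two settings for this: spark.kubernetes.driver.secrets.[SecretName] (and the executor equivalent) mounts a secret as files under a given path, and spark.kubernetes.driver.secretKeyRef.[EnvName] exposes a secret key as an environment variable. Whether the operator passes these through unchanged is not confirmed here. Once a secret is mounted either way, reading it inside the application is ordinary file or environment access. A minimal sketch, in which the mount path, key name and environment variable name are all hypothetical:

```python
import os
from pathlib import Path
from typing import Optional


def read_secret(key: str, mount_dir: str = "/etc/secrets",
                env_var: Optional[str] = None) -> str:
    """Return a secret value from a mounted file, falling back to an env var.

    A Kubernetes secret mounted as a volume appears as one file per key
    under the mount directory; the file content is the secret value.
    """
    path = Path(mount_dir) / key
    if path.is_file():
        return path.read_text().strip()
    if env_var is not None and env_var in os.environ:
        return os.environ[env_var]
    raise KeyError(f"secret {key!r} not found in {mount_dir} or environment")


# Example: prefer the mounted file, fall back to an injected variable.
os.environ.setdefault("S3_ACCESS_KEY", "example-value")
access_key = read_secret("access-key", mount_dir="/etc/secrets/s3",
                         env_var="S3_ACCESS_KEY")
```

The value can then be set on the Spark or Hadoop configuration at session start. Keeping the lookup behind one function leaves room to swap in another backend (such as a Databricks secret scope) without touching the rest of the JAR or script.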

Thank you for bearing with the intricacies of our query, and I appreciate
your continued guidance in this endeavor.

Warm regards,

Jon Rodríguez Aranguren.

On Sat, 30 Sept 2023 at 23:19, Jayabindu Singh wrote:

> Hi Jon,
>
> Using IAM as suggested by Jorn is the best approach.
> We recently moved our spark workload from HDP to Spark on K8 and utilizing
> IAM.
> It will save you from secret management headaches and also allows a lot
> more flexibility on access control and option to allow access to multiple
> S3 buckets in the same pod.
> We have implemented this across Azure, Google and AWS. Azure does require
> some extra work to make it work.
>
> On Sat, Sep 30, 2023 at 12:05 PM Jörn Franke  wrote:
>
>> Don’t use static iam (s3) credentials. It is an outdated insecure method
>> - even AWS recommend against using this for anything (cf eg
>> https://docs.aws.amazon.com/cli/latest/userguide/cli-authentication-user.html
>> ).
>> It is almost a guarantee to get your data stolen and your account
>> manipulated.
>>
>> If you need to use kubernetes (which has its own very problematic
>> security issues) then assign AWS IAM roles with minimal permissions to the
>> pods (for EKS it means using OIDC, cf
>> https://docs.aws.amazon.com/eks/latest/userguide/service_IAM_role.html).
>>
>> On 30.09.2023 at 03:41, Jon Rodríguez Aranguren
>> <jon.r.arangu...@gmail.com> wrote:
>>
>> 
>> Dear Spark Community Members,
>>
>> I trust this message finds you all in good health and spirits.
>>
>> I'm reaching out to the collective expertise of this esteemed community
>> with a query regarding Spark on Kubernetes. As a newcomer, I have always
>> admired the depth and breadth of knowledge shared within this forum, and it
>> is my hope that some of you might have insights on a specific challenge I'm
>> facing.
>>
>> I am currently trying to configure multiple Kubernetes secrets, notably
>> multiple S3 keys, at the SparkConf level for a Spark application. My
>> objective is to understand the best approach or methods to ensure that
>> these secrets can be smoothly accessed by the Spark application.
>>
>> If any of you have previously encountered this scenario or possess
>> relevant insights on the matter, your guidance would be highly beneficial.
>>
>> Thank you for your time and consideration. I'm eager to learn from the
>> experiences and knowledge present within this community.
>>
>> Warm regards,
>> Jon
>>
>>


Re: Seeking Guidance on Spark on Kubernetes Secrets Configuration

2023-10-01 Thread Jörn Franke
There is nowadays more of a trend to move away from static
credentials/certificates that are stored in a secret vault. The issue is
that rotating them is complex, once they are leaked they can be abused,
making minimal permissions feasible is cumbersome, etc. That is why keyless
approaches are used for A2A access (workload identity federation was
mentioned). E.g. in AWS EKS you would build this on OIDC
(https://docs.aws.amazon.com/eks/latest/userguide/enable-iam-roles-for-service-accounts.html)
and configure this instead of using secrets. Similar approaches exist in
other clouds and even on-premise (e.g. SPIFFE, https://spiffe.io/).

Whether this will become the standard is difficult to say, but these
approaches certainly seem easier to manage.

Since you seem to have a Kubernetes setup, which means a lot of extra work,
infrastructure cost and security issues per cloud/data centre, workload
identity federation may ease this compared to a secret store.

On 01.10.2023 at 08:27, Jon Rodríguez Aranguren wrote:

> Dear Jörn Franke, Jayabindu Singh and Spark Community members,
>
> Thank you profoundly for your initial insights. I feel it's necessary to
> provide more precision on our setup to facilitate a deeper understanding.
>
> We're interfacing with S3 Compatible storages, but our operational context
> is somewhat distinct. Our infrastructure doesn't lean on conventional cloud
> providers like AWS. Instead, we've architected our environment on
> On-Premise Kubernetes distributions, specifically k0s and Openshift.
>
> Our objective extends beyond just handling S3 keys. We're orchestrating a
> solution that integrates Azure SPNs, API Credentials, and other sensitive
> credentials, intending to make Kubernetes' native secrets our central
> management hub. The aspiration is to have a universally deployable JAR, one
> that can function unmodified across different ecosystems like EMR,
> Databricks (on both AWS and Azure), etc. Platforms like Databricks have
> already made strides in this direction, allowing secrets to be woven
> directly into the Spark Conf through mechanisms like
> {{secret_scope/secret_name}}, which are resolved dynamically.
>
> The spark-on-k8s-operator's user guide suggests the feasibility of mounting
> secrets. However, a gap exists in our understanding of how to subsequently
> access these mounted secret values within the Spark application's context.
>
> Here lies my inquiry: is the spark-on-k8s-operator currently equipped to
> support this level of integration? If it does, any elucidation on the
> method or best practices would be pivotal for our project. Alternatively,
> if you could point me to resources or community experts who have tackled
> similar challenges, it would be of immense assistance.
>
> Thank you for bearing with the intricacies of our query, and I appreciate
> your continued guidance in this endeavor.
>
> Warm regards,
>
> Jon Rodríguez Aranguren.
>
> On Sat, 30 Sept 2023 at 23:19, Jayabindu Singh wrote:
>
>> Hi Jon,
>>
>> Using IAM as suggested by Jorn is the best approach.
>> We recently moved our spark workload from HDP to Spark on K8 and utilizing
>> IAM.
>> It will save you from secret management headaches and also allows a lot
>> more flexibility on access control and option to allow access to multiple
>> S3 buckets in the same pod.
>> We have implemented this across Azure, Google and AWS. Azure does require
>> some extra work to make it work.
>>
>> On Sat, Sep 30, 2023 at 12:05 PM Jörn Franke wrote:
>>
>>> Don’t use static iam (s3) credentials. It is an outdated insecure method
>>> - even AWS recommend against using this for anything (cf eg
>>> https://docs.aws.amazon.com/cli/latest/userguide/cli-authentication-user.html
>>> ).
>>> It is almost a guarantee to get your data stolen and your account
>>> manipulated.
>>>
>>> If you need to use kubernetes (which has its own very problematic
>>> security issues) then assign AWS IAM roles with minimal permissions to the
>>> pods (for EKS it means using OIDC, cf
>>> https://docs.aws.amazon.com/eks/latest/userguide/service_IAM_role.html).
>>>
>>> On 30.09.2023 at 03:41, Jon Rodríguez Aranguren wrote:
>>>
>>> Dear Spark Community Members,
>>>
>>> I trust this message finds you all in good health and spirits.
>>>
>>> I'm reaching out to the collective expertise of this esteemed community
>>> with a query regarding Spark on Kubernetes. As a newcomer, I have always
>>> admired the depth and breadth of knowledge shared within this forum, and it
>>> is my hope that some of you might have insights on a specific challenge I'm
>>> facing.
>>>
>>> I am currently trying to configure multiple Kubernetes secrets, notably
>>> multiple S3 keys, at the SparkConf level for a Spark application. My
>>> objective is to understand the best approach or methods to ensure that
>>> these secrets can be smoothly accessed by the Spark application.
>>>
>>> If any of you have previously encountered this scenario or possess
>>> relevant insights on the matter, your guidance would be highly beneficial.
>>>
>>> Thank you for your time and consideration. I'm eager to learn from the
>>> experiences and knowledge present within this community.
>>>
>>> Warm regards,
>>> Jon




Re: Seeking Guidance on Spark on Kubernetes Secrets Configuration

2023-10-01 Thread Jörn Franke
Something comparable is possible with OIDC:
https://docs.aws.amazon.com/eks/latest/userguide/enable-iam-roles-for-service-accounts.html

On 01.10.2023 at 11:13, Mich Talebzadeh wrote:

> It seems that workload identity is not available on AWS. Workload Identity
> replaces the need to use Metadata concealment on exposed storage such as
> s3 and gcs. The sensitive metadata protected by metadata concealment is
> also protected by Workload Identity.
>
> Both Google Cloud Kubernetes (GKE) and Azure Kubernetes Service support
> Workload Identity. Taking notes from Google Cloud: "Workload Identity is
> the recommended way for your workloads running on Google Kubernetes Engine
> (GKE) to access Google Cloud services in a secure and manageable way."
>
> HTH
>
> Mich Talebzadeh,
> Distinguished Technologist, Solutions Architect & Engineer
> London
> United Kingdom
>
> https://en.everybodywiki.com/Mich_Talebzadeh
>
> Disclaimer: Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
> On Sun, 1 Oct 2023 at 06:36, Jayabindu Singh wrote:
>
>> Hi Jon,
>>
>> Using IAM as suggested by Jorn is the best approach.
>> We recently moved our spark workload from HDP to Spark on K8 and utilizing
>> IAM.
>> It will save you from secret management headaches and also allows a lot
>> more flexibility on access control and option to allow access to multiple
>> S3 buckets in the same pod.
>> We have implemented this across Azure, Google and AWS. Azure does require
>> some extra work to make it work.
>>
>> On Sat, Sep 30, 2023 at 12:05 PM Jörn Franke wrote:
>>
>>> Don’t use static iam (s3) credentials. It is an outdated insecure method
>>> - even AWS recommend against using this for anything (cf eg
>>> https://docs.aws.amazon.com/cli/latest/userguide/cli-authentication-user.html
>>> ).
>>> It is almost a guarantee to get your data stolen and your account
>>> manipulated.
>>>
>>> If you need to use kubernetes (which has its own very problematic
>>> security issues) then assign AWS IAM roles with minimal permissions to the
>>> pods (for EKS it means using OIDC, cf
>>> https://docs.aws.amazon.com/eks/latest/userguide/service_IAM_role.html).
>>>
>>> On 30.09.2023 at 03:41, Jon Rodríguez Aranguren wrote:
>>>
>>> Dear Spark Community Members,
>>>
>>> I trust this message finds you all in good health and spirits.
>>>
>>> I'm reaching out to the collective expertise of this esteemed community
>>> with a query regarding Spark on Kubernetes. As a newcomer, I have always
>>> admired the depth and breadth of knowledge shared within this forum, and it
>>> is my hope that some of you might have insights on a specific challenge I'm
>>> facing.
>>>
>>> I am currently trying to configure multiple Kubernetes secrets, notably
>>> multiple S3 keys, at the SparkConf level for a Spark application. My
>>> objective is to understand the best approach or methods to ensure that
>>> these secrets can be smoothly accessed by the Spark application.
>>>
>>> If any of you have previously encountered this scenario or possess
>>> relevant insights on the matter, your guidance would be highly beneficial.
>>>
>>> Thank you for your time and consideration. I'm eager to learn from the
>>> experiences and knowledge present within this community.
>>>
>>> Warm regards,
>>> Jon




Re: Seeking Guidance on Spark on Kubernetes Secrets Configuration

2023-10-01 Thread Mich Talebzadeh
It seems that workload identity is not available on AWS. Workload Identity
replaces the need to use Metadata concealment on exposed storage such as
s3 and gcs. The sensitive metadata protected by metadata concealment is
also protected by Workload Identity.

Both Google Cloud Kubernetes (GKE) and Azure Kubernetes Service support
Workload Identity. Taking notes from Google Cloud: "Workload Identity is
the recommended way for your workloads running on Google Kubernetes Engine
(GKE) to access Google Cloud services in a secure and manageable way."


HTH


Mich Talebzadeh,
Distinguished Technologist, Solutions Architect & Engineer
London
United Kingdom


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Sun, 1 Oct 2023 at 06:36, Jayabindu Singh  wrote:

> Hi Jon,
>
> Using IAM as suggested by Jorn is the best approach.
> We recently moved our spark workload from HDP to Spark on K8 and utilizing
> IAM.
> It will save you from secret management headaches and also allows a lot
> more flexibility on access control and option to allow access to multiple
> S3 buckets in the same pod.
> We have implemented this across Azure, Google and AWS. Azure does require
> some extra work to make it work.
>
> On Sat, Sep 30, 2023 at 12:05 PM Jörn Franke  wrote:
>
>> Don’t use static iam (s3) credentials. It is an outdated insecure method
>> - even AWS recommend against using this for anything (cf eg
>> https://docs.aws.amazon.com/cli/latest/userguide/cli-authentication-user.html
>> ).
>> It is almost a guarantee to get your data stolen and your account
>> manipulated.
>>
>> If you need to use kubernetes (which has its own very problematic
>> security issues) then assign AWS IAM roles with minimal permissions to the
>> pods (for EKS it means using OIDC, cf
>> https://docs.aws.amazon.com/eks/latest/userguide/service_IAM_role.html).
>>
>> On 30.09.2023 at 03:41, Jon Rodríguez Aranguren
>> <jon.r.arangu...@gmail.com> wrote:
>>
>> 
>> Dear Spark Community Members,
>>
>> I trust this message finds you all in good health and spirits.
>>
>> I'm reaching out to the collective expertise of this esteemed community
>> with a query regarding Spark on Kubernetes. As a newcomer, I have always
>> admired the depth and breadth of knowledge shared within this forum, and it
>> is my hope that some of you might have insights on a specific challenge I'm
>> facing.
>>
>> I am currently trying to configure multiple Kubernetes secrets, notably
>> multiple S3 keys, at the SparkConf level for a Spark application. My
>> objective is to understand the best approach or methods to ensure that
>> these secrets can be smoothly accessed by the Spark application.
>>
>> If any of you have previously encountered this scenario or possess
>> relevant insights on the matter, your guidance would be highly beneficial.
>>
>> Thank you for your time and consideration. I'm eager to learn from the
>> experiences and knowledge present within this community.
>>
>> Warm regards,
>> Jon
>>
>>


Re: Seeking Guidance on Spark on Kubernetes Secrets Configuration

2023-09-30 Thread Jayabindu Singh
Hi Jon,

Using IAM as suggested by Jorn is the best approach.
We recently moved our Spark workload from HDP to Spark on K8s and are
utilizing IAM.
It will save you from secret management headaches and also allows a lot
more flexibility on access control and option to allow access to multiple
S3 buckets in the same pod.
We have implemented this across Azure, Google and AWS. Azure does require
some extra work to make it work.

On Sat, Sep 30, 2023 at 12:05 PM Jörn Franke  wrote:

> Don’t use static iam (s3) credentials. It is an outdated insecure method -
> even AWS recommend against using this for anything (cf eg
> https://docs.aws.amazon.com/cli/latest/userguide/cli-authentication-user.html
> ).
> It is almost a guarantee to get your data stolen and your account
> manipulated.
>
> If you need to use kubernetes (which has its own very problematic security
> issues) then assign AWS IAM roles with minimal permissions to the pods (for
> EKS it means using OIDC, cf
> https://docs.aws.amazon.com/eks/latest/userguide/service_IAM_role.html).
>
> On 30.09.2023 at 03:41, Jon Rodríguez Aranguren
> <jon.r.arangu...@gmail.com> wrote:
>
> 
> Dear Spark Community Members,
>
> I trust this message finds you all in good health and spirits.
>
> I'm reaching out to the collective expertise of this esteemed community
> with a query regarding Spark on Kubernetes. As a newcomer, I have always
> admired the depth and breadth of knowledge shared within this forum, and it
> is my hope that some of you might have insights on a specific challenge I'm
> facing.
>
> I am currently trying to configure multiple Kubernetes secrets, notably
> multiple S3 keys, at the SparkConf level for a Spark application. My
> objective is to understand the best approach or methods to ensure that
> these secrets can be smoothly accessed by the Spark application.
>
> If any of you have previously encountered this scenario or possess
> relevant insights on the matter, your guidance would be highly beneficial.
>
> Thank you for your time and consideration. I'm eager to learn from the
> experiences and knowledge present within this community.
>
> Warm regards,
> Jon
>
>


Re: Seeking Guidance on Spark on Kubernetes Secrets Configuration

2023-09-30 Thread Jörn Franke
Don’t use static IAM (S3) credentials. It is an outdated, insecure method; even
AWS recommends against using it for anything (cf. e.g.
https://docs.aws.amazon.com/cli/latest/userguide/cli-authentication-user.html).
It is almost a guarantee that your data will be stolen and your account
manipulated.

If you need to use Kubernetes (which has its own very problematic security
issues), then assign AWS IAM roles with minimal permissions to the pods (for EKS
this means using OIDC, cf.
https://docs.aws.amazon.com/eks/latest/userguide/service_IAM_role.html).
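
A minimal sketch of the IAM-role approach expressed as Spark configuration,
built here as a plain Python dict so it can be handed to spark-submit or a
SparkConf. The service account name "spark-s3-reader" is hypothetical, and the
credentials provider class assumes a hadoop-aws build based on the AWS SDK v1;
other builds may need a different provider class.

```python
from typing import Dict, List


def iam_role_spark_conf(service_account: str) -> Dict[str, str]:
    """Spark settings that run driver and executors under one Kubernetes
    service account and let S3A pick up the pod's web identity token
    instead of static access keys."""
    return {
        "spark.kubernetes.authenticate.driver.serviceAccountName": service_account,
        "spark.kubernetes.authenticate.executor.serviceAccountName": service_account,
        # Assumption: AWS SDK v1 provider class shipped with hadoop-aws.
        "spark.hadoop.fs.s3a.aws.credentials.provider":
            "com.amazonaws.auth.WebIdentityTokenCredentialsProvider",
    }


def to_submit_args(conf: Dict[str, str]) -> List[str]:
    """Flatten a config dict into repeated `--conf key=value` arguments."""
    args: List[str] = []
    for key in sorted(conf):
        args += ["--conf", f"{key}={conf[key]}"]
    return args


if __name__ == "__main__":
    # "spark-s3-reader" is a placeholder service account name.
    print(" ".join(to_submit_args(iam_role_spark_conf("spark-s3-reader"))))
```

The service account still has to be associated with the IAM role on the EKS
side; that step happens outside Spark.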

> On 30.09.2023 at 03:41, Jon Rodríguez Aranguren wrote:
> 
> 
> Dear Spark Community Members,
> 
> I trust this message finds you all in good health and spirits.
> 
> I'm reaching out to the collective expertise of this esteemed community with 
> a query regarding Spark on Kubernetes. As a newcomer, I have always admired 
> the depth and breadth of knowledge shared within this forum, and it is my 
> hope that some of you might have insights on a specific challenge I'm facing.
> 
> I am currently trying to configure multiple Kubernetes secrets, notably 
> multiple S3 keys, at the SparkConf level for a Spark application. My 
> objective is to understand the best approach or methods to ensure that these 
> secrets can be smoothly accessed by the Spark application.
> 
> If any of you have previously encountered this scenario or possess relevant 
> insights on the matter, your guidance would be highly beneficial.
> 
> Thank you for your time and consideration. I'm eager to learn from the 
> experiences and knowledge present within this community.
> 
> Warm regards,
> Jon


using facebook Prophet + pyspark for forecasting - Dataframe has less than 2 non-NaN rows

2023-09-29 Thread karan alang
Hello, has anyone used Prophet with PySpark for forecasting?
I'm trying to backfill forecasts and am running into an error:
"Dataframe has less than 2 non-NaN rows".

I'm removing all records with NaN values, yet I still get this error.

Details are in this Stack Overflow question:
https://stackoverflow.com/questions/77205021/facebook-prophet-dataframe-has-less-than-2-non-nan-rows

Any ideas on how to fix or debug this?

TIA!
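
One way to narrow this down, sketched under the assumption that the forecasts
are fit once per group (for example per series key in a grouped pandas UDF):
a single group with fewer than two usable rows would be enough to raise this
error even after NaN rows are dropped globally. The column names "series_id"
and "y" below are hypothetical; the helper only uses the standard library and
works on rows collected as dicts.

```python
import math
from collections import Counter
from typing import Any, Dict, Iterable, List


def groups_too_small(rows: Iterable[Dict[str, Any]],
                     key: str = "series_id",
                     value: str = "y",
                     minimum: int = 2) -> List[Any]:
    """Return the group keys that have fewer than `minimum` rows whose
    `value` is neither None nor NaN."""
    counts: Counter = Counter()
    for row in rows:
        counts[row[key]] += 0  # register the group even if no row is usable
        v = row.get(value)
        if v is not None and not (isinstance(v, float) and math.isnan(v)):
            counts[row[key]] += 1
    return sorted(k for k, n in counts.items() if n < minimum)


if __name__ == "__main__":
    sample = [
        {"series_id": "a", "y": 1.0},
        {"series_id": "a", "y": 2.0},
        {"series_id": "b", "y": 3.0},
        {"series_id": "b", "y": float("nan")},
    ]
    print(groups_too_small(sample))
```

Groups reported by such a check could be skipped or logged before the model is
fit, which at least shows whether the error comes from sparse groups rather
than from leftover NaN values.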


Seeking Guidance on Spark on Kubernetes Secrets Configuration

2023-09-29 Thread Jon Rodríguez Aranguren
Dear Spark Community Members,

I trust this message finds you all in good health and spirits.

I'm reaching out to the collective expertise of this esteemed community
with a query regarding Spark on Kubernetes. As a newcomer, I have always
admired the depth and breadth of knowledge shared within this forum, and it
is my hope that some of you might have insights on a specific challenge I'm
facing.

I am currently trying to configure multiple Kubernetes secrets, notably
multiple S3 keys, at the SparkConf level for a Spark application. My
objective is to understand the best approach or methods to ensure that
these secrets can be smoothly accessed by the Spark application.

If any of you have previously encountered this scenario or possess relevant
insights on the matter, your guidance would be highly beneficial.

Thank you for your time and consideration. I'm eager to learn from the
experiences and knowledge present within this community.

Warm regards,
Jon
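
A minimal sketch of what several secrets at the SparkConf level can look like,
using Spark's spark.kubernetes.{driver,executor}.secrets.[SecretName] (mount
path) and spark.kubernetes.{driver,executor}.secretKeyRef.[EnvName]
(name:key) properties. The secret names, mount paths, and environment variable
name are hypothetical, and the secrets are assumed to already exist in the
namespace the job runs in.

```python
from typing import Dict, Tuple


def secrets_spark_conf(mounts: Dict[str, str],
                       env_refs: Dict[str, Tuple[str, str]]) -> Dict[str, str]:
    """Build Spark-on-Kubernetes settings for several secrets at once.

    mounts:   secret name -> directory where the secret is mounted
    env_refs: environment variable name -> (secret name, key inside the secret)
    The same settings are applied to the driver and the executors.
    """
    conf: Dict[str, str] = {}
    for role in ("driver", "executor"):
        for secret_name, path in mounts.items():
            conf[f"spark.kubernetes.{role}.secrets.{secret_name}"] = path
        for env_name, (secret_name, key) in env_refs.items():
            conf[f"spark.kubernetes.{role}.secretKeyRef.{env_name}"] = (
                f"{secret_name}:{key}"
            )
    return conf


if __name__ == "__main__":
    # All names below are placeholders for illustration only.
    conf = secrets_spark_conf(
        mounts={"bucket-a-secret": "/etc/secrets/bucket-a",
                "bucket-b-secret": "/etc/secrets/bucket-b"},
        env_refs={"BUCKET_A_TOKEN": ("bucket-a-secret", "token")},
    )
    for key in sorted(conf):
        print(f"{key}={conf[key]}")
```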


Re: Inquiry about Processing Speed

2023-09-28 Thread Jack Goodson
Hi Haseeb,

I think the user mailing list is what you're looking for; people are
usually pretty active here if you ask a direct question about Apache
Spark. I've linked the community guidelines below, which explain which
mailing list is for what.

https://spark.apache.org/community.html

There are a few resources below on cluster management and code performance
tweaks, but if you write declaratively in Spark, the planning engine does a
pretty good job of optimising jobs. It's hard to answer without a specific
problem, but I hope the docs get you started.

https://spark.apache.org/docs/latest/cluster-overview.html

https://spark.apache.org/docs/latest/tuning.html

https://spark.apache.org/docs/latest/sql-performance-tuning.html

On Thu, Sep 28, 2023 at 3:22 PM Haseeb Khalid wrote:

> Dear Support Team,
>
> I hope this message finds you well. My name is Haseeb Khalid, and I am
> reaching out to discuss a scenario related to processing speed in Apache
> Spark.
>
> I have been utilizing these technologies in our projects, and we have
> encountered a specific use case where we are seeking to optimize processing
> speed. Given the critical nature of this requirement, I would greatly
> appreciate the opportunity to discuss this with a knowledgeable
> representative from your team.
>
> I am particularly interested in understanding any best practices,
> configuration tweaks, or architectural considerations that can be employed
> to enhance processing speed in our specific scenario.
>
> Would it be possible to schedule a call or exchange emails to delve deeper
> into this matter? I am available at your convenience and can accommodate
> any preferred mode of communication.
>
> I genuinely value the expertise of the Apache Spark communities and
> believe that your insights will be instrumental in achieving our objectives.
>
> Thank you very much for your time and consideration. I look forward to
> hearing from you soon.
>
> --
>
> Thanks & Best Regards,
>
> *Haseeb Khalid*
>
> *Senior Data Analyst*
>
> +92 306 4436 790
>
>
>


Thread dump only shows 10 shuffle clients

2023-09-28 Thread Nebi Aydin
Hi all,
I set spark.shuffle.io.serverThreads and spark.shuffle.io.clientThreads
to *800*.
However, when I open the thread dump for the executor in the Spark UI, I only
see 10 shuffle client threads.
Is that normal, or am I missing something?
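
To compare the configured value with what is actually running, a small sketch
that counts distinct shuffle threads in the text of a thread dump. It assumes
thread names of the form "shuffle-client-<pool>-<n>" and
"shuffle-server-<pool>-<n>"; that naming is an assumption about the dump being
inspected. One possible explanation for a low count, not confirmed in this
thread, is that the pool size is an upper bound and threads are only started
once connections are assigned to them.

```python
import re
from collections import Counter

# Assumed naming scheme: shuffle-client-<pool>-<n> / shuffle-server-<pool>-<n>
THREAD_NAME = re.compile(r"\b(?:shuffle-(?:client|server))-\d+-\d+\b")


def count_shuffle_threads(dump_text: str) -> Counter:
    """Count distinct shuffle client/server thread names in a thread dump."""
    names = {m.group(0) for m in THREAD_NAME.finditer(dump_text)}
    # Strip the trailing "-<pool>-<n>" to group by thread kind.
    return Counter(name.rsplit("-", 2)[0] for name in names)


if __name__ == "__main__":
    sample_dump = (
        '"shuffle-client-6-1" daemon RUNNABLE\n'
        '"shuffle-client-6-2" daemon RUNNABLE\n'
        '"shuffle-server-5-1" daemon RUNNABLE\n'
    )
    print(dict(count_shuffle_threads(sample_dump)))
```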

