Re: Spark File Output Committer algorithm for GCS

2023-07-17 Thread Jay
You can try increasing fs.gs.batch.threads and fs.gs.max.requests.per.batch.

The definitions for these flags are available here -
https://github.com/GoogleCloudDataproc/hadoop-connectors/blob/master/gcs/CONFIGURATION.md
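
A minimal sketch of how these connector options are typically passed to a Spark job (the flag names come from the CONFIGURATION.md linked above; the numeric values below are only illustrative, not recommendations):

```
import org.apache.spark.sql.SparkSession

// GCS connector (Hadoop FileSystem) options are forwarded via the "spark.hadoop." prefix.
// The values here are illustrative placeholders.
val spark = SparkSession.builder()
  .appName("gcs-connector-tuning")
  .config("spark.hadoop.fs.gs.batch.threads", "32")
  .config("spark.hadoop.fs.gs.max.requests.per.batch", "30")
  .getOrCreate()
```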

On Mon, 17 Jul 2023 at 14:59, Dipayan Dev  wrote:

> No, I am using Spark 2.4 to update the GCS partitions . I have a managed
> Hive table on top of this.
> [image: image.png]
> When I do a dynamic partition update of Spark, it creates the new file in
> a Staging area as shown here.
> But the GCS blob renaming takes a lot of time. I have a partition based on
> dates and I need to update around 3 years of data. It usually takes 3 hours
> to finish the process. Any way to speed this up?
> With Best Regards,
>
> Dipayan Dev
>
> On Mon, Jul 17, 2023 at 1:53 PM Mich Talebzadeh 
> wrote:
>
>> So you are using GCP and your Hive is installed on Dataproc which happens
>> to run your Spark as well. Is that correct?
>>
>> What version of Hive are you using?
>>
>> HTH
>>
>>
>> Mich Talebzadeh,
>> Solutions Architect/Engineering Lead
>> Palantir Technologies Limited
>> London
>> United Kingdom
>>
>>
>>view my Linkedin profile
>> 
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Mon, 17 Jul 2023 at 09:16, Dipayan Dev 
>> wrote:
>>
>>> Hi All,
>>>
>>> Of late, I have encountered the issue where I have to overwrite a lot of
>>> partitions of the Hive table through Spark. It looks like writing to
>>> hive_staging_directory takes 25% of the total time, whereas 75% or more
>>> time goes in moving the ORC files from staging directory to the final
>>> partitioned directory structure.
>>>
>>> I got some reference where it's mentioned to use this config during the
>>> Spark write.
>>> *mapreduce.fileoutputcommitter.algorithm.version = 2*
>>>
>>> However, it's also mentioned it's not safe as partial job failure might
>>> cause data loss.
>>>
>>> Is there any suggestion on the pros and cons of using this version? Or
>>> any ongoing Spark feature development to address this issue?
>>>
>>>
>>>
>>> With Best Regards,
>>>
>>> Dipayan Dev
>>>
>>


Unsubscribe

2023-07-17 Thread Bode, Meikel
Unsubscribe


Spark Scala SBT Local build fails

2023-07-17 Thread Varun Shah
Resending this message with a proper Subject line

Hi Spark Community,

I am trying to set up my forked apache/spark project locally for my 1st
Open Source Contribution, by building and creating a package as mentioned here
under Running Individual Tests.
Here are the steps I have followed:
>> ./build/sbt  # this opens an sbt console
>> test # to execute all tests

I am getting the following error and the tests are failing. Even compile /
package sbt commands fail with the same errors.

>
> [info] compiling 19 Java sources to
> forked/spark/common/network-shuffle/target/scala-2.12/test-classes ...
> [info] compiling 330 Scala sources and 29 Java sources to
> forked/spark/core/target/scala-2.12/test-classes ...
> [error]
> forked/spark/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala:21:0:
> There should at least one a single empty line separating groups 3rdParty
> and spark.
> [error]
> forked/spark/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala:32:0:
> org.json4s.JsonAST.JValue should be in group 3rdParty, not spark.
> [error]
> forked/spark/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala:33:0:
> org.json4s.JsonDSL._ should be in group 3rdParty, not spark.
> [error]
> forked/spark/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala:34:0:
> org.json4s._ should be in group 3rdParty, not spark.
> [error]
> forked/spark/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala:35:0:
> org.json4s.jackson.JsonMethods._ should be in group 3rdParty, not spark.
> [error]
> forked/spark/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala:37:0:
> java.util.Locale should be in group java, not spark.
> [error]
> forked/spark/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala:38:0:
> scala.util.control.NonFatal should be in group scala, not spark.
> [error]
> forked/spark/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala:226:
> File line length exceeds 100 characters
> [error] stack trace is suppressed; run last catalyst / scalaStyleOnCompile
> for the full output
> [error] stack trace is suppressed; run last scalaStyleOnTest for the full
> output
> [error] (catalyst / scalaStyleOnCompile) Failing because of negative
> scalastyle result
> [error] (scalaStyleOnTest) Failing because of negative scalastyle result
>

Can you please guide me if I am doing something wrong?

Regards,
Varun Shah
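
For reference, a sketch of the import grouping that the scalastyle errors above ask for (the java group first, then scala, then 3rdParty, then spark, with a blank line between groups; the final spark import is only an illustrative member of its group):

```
import java.util.Locale

import scala.util.control.NonFatal

import org.json4s._
import org.json4s.JsonAST.JValue
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._

import org.apache.spark.annotation.Stable  // illustrative member of the "spark" group
```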


Re: Unsubscribe

2023-07-17 Thread srini subramanian
Unsubscribe
On Monday, July 17, 2023 at 11:19:41 AM GMT+5:30, Bode, Meikel
 wrote:

Unsubscribe

Re: Spark File Output Committer algorithm for GCS

2023-07-17 Thread Dipayan Dev
No, I am using Spark 2.4 to update the GCS partitions . I have a managed
Hive table on top of this.
[image: image.png]
When I do a dynamic partition update of Spark, it creates the new file in a
Staging area as shown here.
But the GCS blob renaming takes a lot of time. I have a partition based on
dates and I need to update around 3 years of data. It usually takes 3 hours
to finish the process. Any way to speed this up?
With Best Regards,

Dipayan Dev

On Mon, Jul 17, 2023 at 1:53 PM Mich Talebzadeh 
wrote:

> So you are using GCP and your Hive is installed on Dataproc which happens
> to run your Spark as well. Is that correct?
>
> What version of Hive are you using?
>
> HTH
>
>
> Mich Talebzadeh,
> Solutions Architect/Engineering Lead
> Palantir Technologies Limited
> London
> United Kingdom
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Mon, 17 Jul 2023 at 09:16, Dipayan Dev  wrote:
>
>> Hi All,
>>
>> Of late, I have encountered the issue where I have to overwrite a lot of
>> partitions of the Hive table through Spark. It looks like writing to
>> hive_staging_directory takes 25% of the total time, whereas 75% or more
>> time goes in moving the ORC files from staging directory to the final
>> partitioned directory structure.
>>
>> I got some reference where it's mentioned to use this config during the
>> Spark write.
>> *mapreduce.fileoutputcommitter.algorithm.version = 2*
>>
>> However, it's also mentioned it's not safe as partial job failure might
>> cause data loss.
>>
>> Is there any suggestion on the pros and cons of using this version? Or
>> any ongoing Spark feature development to address this issue?
>>
>>
>>
>> With Best Regards,
>>
>> Dipayan Dev
>>
>


Re: Spark File Output Committer algorithm for GCS

2023-07-17 Thread Mich Talebzadeh
So you are using GCP and your Hive is installed on Dataproc which happens
to run your Spark as well. Is that correct?

What version of Hive are you using?

HTH


Mich Talebzadeh,
Solutions Architect/Engineering Lead
Palantir Technologies Limited
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Mon, 17 Jul 2023 at 09:16, Dipayan Dev  wrote:

> Hi All,
>
> Of late, I have encountered the issue where I have to overwrite a lot of
> partitions of the Hive table through Spark. It looks like writing to
> hive_staging_directory takes 25% of the total time, whereas 75% or more
> time goes in moving the ORC files from staging directory to the final
> partitioned directory structure.
>
> I got some reference where it's mentioned to use this config during the
> Spark write.
> *mapreduce.fileoutputcommitter.algorithm.version = 2*
>
> However, it's also mentioned it's not safe as partial job failure might
> cause data loss.
>
> Is there any suggestion on the pros and cons of using this version? Or any
> ongoing Spark feature development to address this issue?
>
>
>
> With Best Regards,
>
> Dipayan Dev
>


Spark File Output Committer algorithm for GCS

2023-07-17 Thread Dipayan Dev
Hi All,

Of late, I have encountered the issue where I have to overwrite a lot of
partitions of the Hive table through Spark. It looks like writing to
hive_staging_directory takes 25% of the total time, whereas 75% or more
time goes in moving the ORC files from staging directory to the final
partitioned directory structure.

I got some reference where it's mentioned to use this config during the
Spark write.
*mapreduce.fileoutputcommitter.algorithm.version = 2*

However, it's also mentioned it's not safe as partial job failure might
cause data loss.

Is there any suggestion on the pros and cons of using this version? Or any
ongoing Spark feature development to address this issue?



With Best Regards,

Dipayan Dev
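
For context, a minimal sketch of how the committer setting discussed above is usually passed (the key is forwarded to Hadoop via the "spark.hadoop." prefix; version 2 commits task output directly to the final location, which is what makes partial job failures risky):

```
import org.apache.spark.sql.SparkSession

// Sketch only: switches the Hadoop FileOutputCommitter to algorithm version 2.
val spark = SparkSession.builder()
  .appName("committer-v2-example")
  .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
  .enableHiveSupport()
  .getOrCreate()
```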


Unsubscribe

2023-07-16 Thread Bode, Meikel
Unsubscribe


Re: Contributing to Spark MLLib

2023-07-16 Thread Brian Huynh
Good morning Dipayan,

Happy to see another contributor!

Please go through this document for contributors. Please note the
MLlib-specific contribution guidelines section in particular.

https://spark.apache.org/contributing.html

Since you are looking for something to start with, take a look at this Jira
query for starter issues.

https://issues.apache.org/jira/browse/SPARK-38719?jql=project%20%3D%20SPARK%20AND%20labels%20%3D%20%22starter%22%20AND%20status%20%3D%20Open

Cheers,
Brian

On Sun, Jul 16, 2023 at 8:49 AM Dipayan Dev  wrote:

> Hi Spark Community,
>
> A very good morning to you.
>
> I have been using Spark for the last few years now, and I am new to the
> community.
>
> I am very much interested in becoming a contributor.
>
> I am looking to contribute to Spark MLlib. Can anyone please suggest how to
> start contributing to a new MLlib feature? Are there any new features in the
> pipeline, and what is the best way to explore them?
> Looking forward to a little guidance to get started.
>
>
> Thanks
> Dipayan
> --
>
>
>
> With Best Regards,
>
> Dipayan Dev
> Author of *Deep Learning with Hadoop
> *
> M.Tech (AI), IISc, Bangalore
>


-- 
From Brian H.


Contributing to Spark MLLib

2023-07-16 Thread Dipayan Dev
Hi Spark Community,

A very good morning to you.

I have been using Spark for the last few years now, and I am new to the
community.

I am very much interested in becoming a contributor.

I am looking to contribute to Spark MLlib. Can anyone please suggest how to
start contributing to a new MLlib feature? Are there any new features in the
pipeline, and what is the best way to explore them?
Looking forward to a little guidance to get started.


Thanks
Dipayan
-- 



With Best Regards,

Dipayan Dev
Author of *Deep Learning with Hadoop
*
M.Tech (AI), IISc, Bangalore


[no subject]

2023-07-16 Thread Varun Shah
Hi Spark Community,

I am trying to set up my forked apache/spark project locally by building and
creating a package as mentioned here under Running Individual Tests.
Here are the steps I have followed:
>> ./build/sbt  # this opens an sbt console
>> test # to execute all tests

I am getting the following error and the tests are failing. Even compile /
package sbt commands fail with the same errors.

>
> [info] compiling 19 Java sources to
> forked/spark/common/network-shuffle/target/scala-2.12/test-classes ...
> [info] compiling 330 Scala sources and 29 Java sources to
> forked/spark/core/target/scala-2.12/test-classes ...
> [error]
> forked/spark/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala:21:0:
> There should at least one a single empty line separating groups 3rdParty
> and spark.
> [error]
> forked/spark/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala:32:0:
> org.json4s.JsonAST.JValue should be in group 3rdParty, not spark.
> [error]
> forked/spark/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala:33:0:
> org.json4s.JsonDSL._ should be in group 3rdParty, not spark.
> [error]
> forked/spark/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala:34:0:
> org.json4s._ should be in group 3rdParty, not spark.
> [error]
> forked/spark/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala:35:0:
> org.json4s.jackson.JsonMethods._ should be in group 3rdParty, not spark.
> [error]
> forked/spark/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala:37:0:
> java.util.Locale should be in group java, not spark.
> [error]
> forked/spark/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala:38:0:
> scala.util.control.NonFatal should be in group scala, not spark.
> [error]
> forked/spark/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala:226:
> File line length exceeds 100 characters
> [error] stack trace is suppressed; run last catalyst / scalaStyleOnCompile
> for the full output
> [error] stack trace is suppressed; run last scalaStyleOnTest for the full
> output
> [error] (catalyst / scalaStyleOnCompile) Failing because of negative
> scalastyle result
> [error] (scalaStyleOnTest) Failing because of negative scalastyle result
>

Can you please guide me if I am doing something wrong? Looking forward to
getting a response soon :)

Regards,
Varun Shah


[Spark RPC]: Yarn - Application Master / executors to Driver communication issue

2023-07-14 Thread Sunayan Saikia
Hey Spark Community,

Our Jupyterhub/Jupyterlab (with spark client) runs behind two layers of
HAProxy and the Yarn cluster runs remotely. We want to use deploy mode
'client' so that we can capture the output of any spark sql query in
jupyterlab. I'm aware of other technologies like Livy and Spark Connect,
however, we want to do things without using any of these at the moment.

With Spark 3, we have seen that during Spark session creation itself, the
*ApplicationMaster* attempts fail to talk back to the Driver with an
Exception of *'awaitResults - Too Large Frame: 5211803372140375592 -
Connection closed'.* This doesn't look like the real root cause, because it
fails just while creating the Spark session, without any query load.

We are using the configs *spark.driver.host*, *spark.driver.port,*
*spark.driver.blockManager.port* and *spark.driver.bindAddress.* We have
tested that, from outside, the host and the ports used in the above
configs are already accessible.
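
A minimal sketch of how these four settings are typically pinned for client mode (the host name and port numbers below are placeholders, not a verified fix for the framing error):

```
import org.apache.spark.sql.SparkSession

// Placeholder values: the host must be reachable from the YARN nodes,
// and the ports must be open through the proxy layers.
val spark = SparkSession.builder()
  .appName("yarn-client-mode-example")
  .config("spark.driver.host", "gateway.example.com")
  .config("spark.driver.port", "40000")
  .config("spark.driver.blockManager.port", "40001")
  .config("spark.driver.bindAddress", "0.0.0.0")
  .getOrCreate()
```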

I'm wondering whether Spark supports this type of communication. Any
suggestions for debugging this further?

-- 
Thanks,
Sunayan Saikia


Re: Unable to populate spark metrics using custom metrics API

2023-07-13 Thread Surya Soma
Gentle reminder on this.

On Sat, Jul 8, 2023 at 7:59 PM Surya Soma  wrote:

> Hello,
>
> I am trying to publish custom metrics using Spark CustomMetric API as
> supported since spark 3.2 https://github.com/apache/spark/pull/31476,
>
>
> https://spark.apache.org/docs/3.2.0/api/java/org/apache/spark/sql/connector/metric/CustomMetric.html
>
> I have created a custom metric implementing `CustomMetric` with a default
> constructor overriding name and description.
> Created a new instance of the created custom metric in the
> `supportedCustomMetrics` method of  `spark.sql.connector.read.Scan`.
>
> Created a custom task metric implementing `CustomTaskMetric` with the same
> name as that of CustomMetric class and initialized this in
> `currentMetricsValues` of PartitionReader.
>
> I have static values as of now but when I run the application, I see in
> the spark history page the corresponding value to the metric as N/A.
> I have added logs in the `aggregateTaskMetrics` and my flow is going into
> it. The spark SQLAppStatusListener.aggregateMetrics is loading my class and
> calling the `aggregateTaskMetrics` yet I still see N/A in the spark ui page.
>
> Also, I do see the metrics in the spark events log.
>
>
> Driver log:
>
> ```
>
> 23/06/23 19:23:53 INFO Spark32CustomMetric: Spark32CustomMetric in 
> aggregateTaskMetrics start
> 23/06/23 19:23:53 INFO Spark32CustomMetric: Spark32CustomMetric in 
> aggregateTaskMetrics sum:1234 end
> +-+--+---+---+
> | word|word_count| corpus|corpus_date|
> +-+--+---+---+
> | LVII| 1|sonnets|  0|
> |   augurs| 1|sonnets|  0|
> |   dimm'd| 1|sonnets|  0|```
>
>
> Attaching the Spark UI page screenshot.
>
> Am I missing something? Any help is really appreciated.
>
> Thanks.
>


Re: Spark Not Connecting

2023-07-12 Thread Artemis User
Well, in that case, you may want to make sure your Spark server is 
running properly and that you can access the Spark UI using your browser.  If 
you don't own the Spark cluster, contact your Spark admin.


On 7/12/23 1:56 PM, timi ayoade wrote:

I can't even connect to the spark UI

On Wed, Jul 12, 2023, 6:00 PM Artemis User  wrote:

The error screenshot doesn't tell much.  Maybe your job wasn't submitted
properly.  Make sure your IP/port numbers were defined correctly.  Take a
look at the Spark server UI to see what errors occur.

On 7/12/23 6:11 AM, timi ayoade wrote:
> Hi Apache Spark community, I am a Data Engineer and I have been using
> Apache Spark for some time now. I recently tried to use it but I have
> been getting some errors. I have tried debugging the error but to no
> avail. The screenshot is attached below. I will be glad if
> responded to. Thanks
>
>
-
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: [EXTERNAL] Spark Not Connecting

2023-07-12 Thread Daniel Tavares de Santana
unsubscribe

From: timi ayoade 
Sent: Wednesday, July 12, 2023 6:11 AM
To: user@spark.apache.org 
Subject: [EXTERNAL] Spark Not Connecting

Hi Apache Spark community, I am a Data Engineer and I have been using Apache Spark 
for some time now. I recently tried to use it but I have been getting some 
errors. I have tried debugging the error but to no avail. The screenshot is 
attached below. I will be glad if responded to. Thanks


Spark Not Connecting

2023-07-12 Thread timi ayoade
Hi Apache Spark community, I am a Data Engineer and I have been using Apache
Spark for some time now. I recently tried to use it but I have been getting
some errors. I have tried debugging the error but to no avail. The
screenshot is attached below. I will be glad if responded to. Thanks

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org

Re: Loading in custom Hive jars for spark

2023-07-11 Thread Mich Talebzadeh
Are you using Spark 3.4?
Under directory $SPARK_HOME get a list of jar files for hive and hadoop.
This one is for version 3.4.0

 /opt/spark/jars> ltr *hive* *hadoop*
-rw-r--r--. 1 hduser hadoop   717820 Apr  7 03:43 spark-hive_2.12-3.4.0.jar
-rw-r--r--. 1 hduser hadoop   563632 Apr  7 03:43
spark-hive-thriftserver_2.12-3.4.0.jar
-rw-r--r--. 1 hduser hadoop   990925 Apr  7 03:43 parquet-hadoop-1.12.3.jar
-rw-r--r--. 1 hduser hadoop   258346 Apr  7 03:43 hive-storage-api-2.8.1.jar
-rw-r--r--. 1 hduser hadoop    12923 Apr  7 03:43
hive-shims-scheduler-2.3.9.jar
-rw-r--r--. 1 hduser hadoop   120293 Apr  7 03:43
hive-shims-common-2.3.9.jar
-rw-r--r--. 1 hduser hadoop 8786 Apr  7 03:43 hive-shims-2.3.9.jar
-rw-r--r--. 1 hduser hadoop    53902 Apr  7 03:43 hive-shims-0.23-2.3.9.jar
-rw-r--r--. 1 hduser hadoop  1679366 Apr  7 03:43 hive-service-rpc-3.1.3.jar
-rw-r--r--. 1 hduser hadoop   916630 Apr  7 03:43 hive-serde-2.3.9.jar
-rw-r--r--. 1 hduser hadoop  8195966 Apr  7 03:43 hive-metastore-2.3.9.jar
-rw-r--r--. 1 hduser hadoop   326585 Apr  7 03:43 hive-llap-common-2.3.9.jar
-rw-r--r--. 1 hduser hadoop   116364 Apr  7 03:43 hive-jdbc-2.3.9.jar
-rw-r--r--. 1 hduser hadoop 10840949 Apr  7 03:43 hive-exec-2.3.9-core.jar
-rw-r--r--. 1 hduser hadoop   436169 Apr  7 03:43 hive-common-2.3.9.jar
-rw-r--r--. 1 hduser hadoop    44704 Apr  7 03:43 hive-cli-2.3.9.jar
-rw-r--r--. 1 hduser hadoop   183633 Apr  7 03:43 hive-beeline-2.3.9.jar
-rw-r--r--. 1 hduser hadoop    56812 Apr  7 03:43
hadoop-yarn-server-web-proxy-3.3.4.jar
-rw-r--r--. 1 hduser hadoop  3362359 Apr  7 03:43
hadoop-shaded-guava-1.1.1.jar
-rw-r--r--. 1 hduser hadoop 30085504 Apr  7 03:43
hadoop-client-runtime-3.3.4.jar
-rw-r--r--. 1 hduser hadoop 19458635 Apr  7 03:43
hadoop-client-api-3.3.4.jar
-rw-r--r--. 1 hduser hadoop    15935 Apr  7 03:43
dropwizard-metrics-hadoop-metrics2-reporter-0.1.2.jar
-rwxr--r--. 1 hduser hadoop 17663298 Apr 20 09:37
gcs-connector-hadoop3-2.2.0-shaded.jar

HTH

Mich Talebzadeh,
Solutions Architect/Engineering Lead
Palantir Technologies Limited
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Tue, 11 Jul 2023 at 18:29, Yeachan Park  wrote:

> Hi all,
>
> We made some changes to hive which require changes to the hive jars that
> Spark is bundled with. Since Spark 3.3.1 comes bundled with Hive 2.3.9
> jars, we built our changes in Hive 2.3.9 and put the necessary jars under
> $SPARK_HOME/jars (replacing the original jars that were there), everything
> works fine.
>
> However since I wanted to make use of spark.jars.packages to download jars
> at runtime, I thought what would also work is if I deleted the original
> hive jars from $SPARK_HOME/jars and download the same jars at runtime.
> Apparently spark.jars.packages should add these jars to the classpath.
> Instead I get a NoClassDefFoundError downloading the same Jars:
>
> ```
> Caused by: java.lang.reflect.InvocationTargetException:
> java.lang.NoClassDefFoundError:
> org/apache/hadoop/hive/ql/metadata/HiveException
>   at
> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native
> Method)
>   at
> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(Unknown
> Source)
>   at
> java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown
> Source)
>   at java.base/java.lang.reflect.Constructor.newInstance(Unknown Source)
>   at
> org.apache.spark.sql.internal.SharedState$.org$apache$spark$sql$internal$SharedState$$reflect(SharedState.scala:227)
>   ... 87 more
> Caused by: java.lang.NoClassDefFoundError:
> org/apache/hadoop/hive/ql/metadata/HiveException
>   at
> org.apache.spark.sql.hive.HiveExternalCatalog.<init>(HiveExternalCatalog.scala:75)
>   ... 92 more
> Caused by: java.lang.ClassNotFoundException:
> org.apache.hadoop.hive.ql.metadata.HiveException
>   at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(Unknown
> Source)
>   at
> java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(Unknown
> Source)
>   at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
> ```
>
> The class HiveException should already be available in the jars that have
> been supplied by spark.jars.packages... Any idea what could be wrong?
>
> Thanks,
> Yeachan
>
>
>


Loading in custom Hive jars for spark

2023-07-11 Thread Yeachan Park
Hi all,

We made some changes to hive which require changes to the hive jars that
Spark is bundled with. Since Spark 3.3.1 comes bundled with Hive 2.3.9
jars, we built our changes in Hive 2.3.9 and put the necessary jars under
$SPARK_HOME/jars (replacing the original jars that were there), everything
works fine.

However since I wanted to make use of spark.jars.packages to download jars
at runtime, I thought what would also work is if I deleted the original
hive jars from $SPARK_HOME/jars and download the same jars at runtime.
Apparently spark.jars.packages should add these jars to the classpath.
Instead I get a NoClassDefFoundError downloading the same Jars:

```
Caused by: java.lang.reflect.InvocationTargetException:
java.lang.NoClassDefFoundError:
org/apache/hadoop/hive/ql/metadata/HiveException
  at
java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native
Method)
  at
java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(Unknown
Source)
  at
java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown
Source)
  at java.base/java.lang.reflect.Constructor.newInstance(Unknown Source)
  at
org.apache.spark.sql.internal.SharedState$.org$apache$spark$sql$internal$SharedState$$reflect(SharedState.scala:227)
  ... 87 more
Caused by: java.lang.NoClassDefFoundError:
org/apache/hadoop/hive/ql/metadata/HiveException
  at
org.apache.spark.sql.hive.HiveExternalCatalog.<init>(HiveExternalCatalog.scala:75)
  ... 92 more
Caused by: java.lang.ClassNotFoundException:
org.apache.hadoop.hive.ql.metadata.HiveException
  at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(Unknown
Source)
  at
java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(Unknown
Source)
  at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
```

The class HiveException should already be available in the jars that have
been supplied by spark.jars.packages... Any idea what could be wrong?
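
For reference, a minimal sketch of the spark.jars.packages approach described above (the coordinates and repository URL are purely hypothetical; this sketch does not by itself explain the NoClassDefFoundError):

```
import org.apache.spark.sql.SparkSession

// Hypothetical coordinates for the rebuilt Hive 2.3.9 artifacts.
val spark = SparkSession.builder()
  .config("spark.jars.packages", "org.example:patched-hive-exec:2.3.9")
  .config("spark.jars.repositories", "https://repo.example.com/maven")
  .enableHiveSupport()
  .getOrCreate()
```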

Thanks,
Yeachan


Jobs that have join & have .rdd calls get executed 2x when AQE is enabled.

2023-07-11 Thread Priyanka Raju
We have a few Spark Scala jobs that are currently running in production.
Most jobs typically use Datasets and DataFrames. There is a small piece of
code in our custom library that makes RDD calls, for example to check if the
dataframe is empty: df.rdd.getNumPartitions == 0

When I enable AQE for these jobs, this .rdd call is converted into a separate
job of its own and the entire DAG is executed 2x, taking 2x more time. This
does not happen when AQE is disabled. Why does this happen and what is the
best way to fix the issue?
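
A sketch of an emptiness check that stays on the Dataset API instead of converting to an RDD; whether it avoids the duplicated DAG under AQE in this case is an assumption worth verifying:

```
import org.apache.spark.sql.DataFrame

// Dataset.isEmpty (available since Spark 2.4) runs a small job on the Dataset plan,
// avoiding the df.rdd conversion used in the library code above.
def dataFrameIsEmpty(df: DataFrame): Boolean = df.isEmpty
```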


Sample code to reproduce the issue (code also in a gist):



import org.apache.spark.sql._

case class Record(
  id: Int,
  name: String
)

val partCount = 4
val input1 = (0 until 100).map(part => Record(part, "a"))
val input2 = (100 until 110).map(part => Record(part, "c"))

implicit val enc: Encoder[Record] = Encoders.product[Record]

val ds1 = spark.createDataset(
  spark.sparkContext.parallelize(input1, partCount)
)
val ds2 = spark.createDataset(
  spark.sparkContext.parallelize(input2, partCount)
)

val ds3 = ds1.join(ds2, Seq("id"))
val l = ds3.count()
val incomingPartitions = ds3.rdd.getNumPartitions
log.info(s"Num partitions ${incomingPartitions}")


Spark UI job view with AQE enabled: [screenshot]

Spark UI job view without AQE: [screenshot]

We use spark 3.1 in production, but I can see the same behavior in spark
3.2 from the spark-shell as well


This is causing unexpected regression in our jobs when we try to enable AQE
for our jobs in production.

-- 
Regards,
Priyanka


Re: PySpark error java.lang.IllegalArgumentException

2023-07-10 Thread elango vaidyanathan
Finally I was able to solve this issue by setting this conf:
"spark.driver.extraJavaOptions=-Dorg.xerial.snappy.tempdir=/my_user/temp_folder"
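
For reference, the same conf expressed when building the session; note that driver JVM options normally have to be supplied before the driver starts (spark-defaults.conf or spark-submit --conf), so treat this as illustrative only:

```
import org.apache.spark.sql.SparkSession

// Illustrative: the temp directory path is the one quoted above.
val spark = SparkSession.builder()
  .appName("snappy-tempdir-example")
  .config("spark.driver.extraJavaOptions",
    "-Dorg.xerial.snappy.tempdir=/my_user/temp_folder")
  .getOrCreate()
```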

Thanks all!



On Sat, 8 Jul 2023 at 3:45 AM, Brian Huynh  wrote:

> Hi Khalid,
>
> Elango mentioned the file is working fine in our another environment with
> the same driver and executor memory
>
> Brian
>
> On Jul 7, 2023, at 10:18 AM, Khalid Mammadov 
> wrote:
>
> 
>
> Perhaps that parquet file is corrupted or got that is in that folder?
> To check, try to read that file with pandas or other tools to see if you
> can read without Spark.
>
> On Wed, 5 Jul 2023, 07:25 elango vaidyanathan, 
> wrote:
>
>>
>> Hi team,
>>
>> Any updates on this below issue
>>
>> On Mon, 3 Jul 2023 at 6:18 PM, elango vaidyanathan 
>> wrote:
>>
>>>
>>>
>>> Hi all,
>>>
>>> I am reading a parquet file like this and it gives 
>>> java.lang.IllegalArgumentException.
>>> However i can work with other parquet files (such as nyc taxi parquet
>>> files) without any issue. I have copied the full error log as well. Can you
>>> please check once and let me know how to fix this?
>>>
>>> import pyspark
>>>
>>> from pyspark.sql import SparkSession
>>>
>>> spark=SparkSession.builder.appName("testPyspark").config("spark.executor.memory",
>>> "20g").config("spark.driver.memory", "50g").getOrCreate()
>>>
>>> df=spark.read.parquet("/data/202301/account_cycle")
>>>
>>> df.printSchema() # worksfine
>>>
>>> df.count() #worksfine
>>>
>>> df.show()# getting below error
>>>
>>> >>> df.show()
>>>
>>> 23/07/03 18:07:20 INFO FileSourceStrategy: Pushed Filters:
>>>
>>> 23/07/03 18:07:20 INFO FileSourceStrategy: Post-Scan Filters:
>>>
>>> 23/07/03 18:07:20 INFO FileSourceStrategy: Output Data Schema:
>>> struct>> account_status: string, currency_code: string, opened_dt: date ... 30 more
>>> fields>
>>>
>>> 23/07/03 18:07:20 INFO MemoryStore: Block broadcast_19 stored as values
>>> in memory (estimated size 540.6 KiB, free 26.5 GiB)
>>>
>>> 23/07/03 18:07:20 INFO MemoryStore: Block broadcast_19_piece0 stored as
>>> bytes in memory (estimated size 46.0 KiB, free 26.5 GiB)
>>>
>>> 23/07/03 18:07:20 INFO BlockManagerInfo: Added broadcast_19_piece0 in
>>> memory on mynode:41055 (size: 46.0 KiB, free: 26.5 GiB)
>>>
>>> 23/07/03 18:07:20 INFO SparkContext: Created broadcast 19 from
>>> showString at NativeMethodAccessorImpl.java:0
>>>
>>> 23/07/03 18:07:20 INFO FileSourceScanExec: Planning scan with bin
>>> packing, max size: 134217728 bytes, open cost is considered as scanning
>>> 4194304 bytes.
>>>
>>> 23/07/03 18:07:20 INFO SparkContext: Starting job: showString at
>>> NativeMethodAccessorImpl.java:0
>>>
>>> 23/07/03 18:07:20 INFO DAGScheduler: Got job 13 (showString at
>>> NativeMethodAccessorImpl.java:0) with 1 output partitions
>>>
>>> 23/07/03 18:07:20 INFO DAGScheduler: Final stage: ResultStage 14
>>> (showString at NativeMethodAccessorImpl.java:0)
>>>
>>> 23/07/03 18:07:20 INFO DAGScheduler: Parents of final stage: List()
>>>
>>> 23/07/03 18:07:20 INFO DAGScheduler: Missing parents: List()
>>>
>>> 23/07/03 18:07:20 INFO DAGScheduler: Submitting ResultStage 14
>>> (MapPartitionsRDD[42] at showString at NativeMethodAccessorImpl.java:0),
>>> which has no missing parents
>>>
>>> 23/07/03 18:07:20 INFO MemoryStore: Block broadcast_20 stored as values
>>> in memory (estimated size 38.1 KiB, free 26.5 GiB)
>>>
>>> 23/07/03 18:07:20 INFO MemoryStore: Block broadcast_20_piece0 stored as
>>> bytes in memory (estimated size 10.5 KiB, free 26.5 GiB)
>>>
>>> 23/07/03 18:07:20 INFO BlockManagerInfo: Added broadcast_20_piece0 in
>>> memory on mynode:41055 (size: 10.5 KiB, free: 26.5 GiB)
>>>
>>> 23/07/03 18:07:20 INFO SparkContext: Created broadcast 20 from broadcast
>>> at DAGScheduler.scala:1478
>>>
>>> 23/07/03 18:07:20 INFO DAGScheduler: Submitting 1 missing tasks from
>>> ResultStage 14 (MapPartitionsRDD[42] at showString at
>>> NativeMethodAccessorImpl.java:0) (first 15 tasks are for partitions
>>> Vector(0))
>>>
>>> 23/07/03 18:07:20 INFO TaskSchedulerImpl: Adding task set 14.0 with 1
>>> tasks resource profile 0
>>>
>>> 23/07/03 18:07:20 INFO TaskSetManager: Starting task 0.0 in stage 14.0
>>> (TID 48) (mynode, executor driver, partition 0, PROCESS_LOCAL, 4890 bytes)
>>> taskResourceAssignments Map()
>>>
>>> 23/07/03 18:07:20 INFO Executor: Running task 0.0 in stage 14.0 (TID 48)
>>>
>>> 23/07/03 18:07:20 INFO FileScanRDD: Reading File path:
>>> file:///data/202301/account_cycle/account_cycle-202301-53.parquet, range:
>>> 0-134217728, partition values: [empty row]
>>>
>>> 23/07/03 18:07:20 ERROR Executor: Exception in task 0.0 in stage 14.0
>>> (TID 48)
>>>
>>> java.lang.IllegalArgumentException
>>>
>>> at java.nio.Buffer.limit(Buffer.java:275)
>>>
>>> at org.xerial.snappy.Snappy.uncompress(Snappy.java:553)
>>>
>>> at
>>> org.apache.parquet.hadoop.codec.SnappyDecompressor.decompress(SnappyDecompressor.java:71)
>>>
>>> at
>>> 

Unsubscribe

2023-07-09 Thread chen...@birdiexx.com
Unsubscribe




Unable to populate spark metrics using custom metrics API

2023-07-08 Thread Surya Soma
Hello,

I am trying to publish custom metrics using Spark CustomMetric API as
supported since spark 3.2 https://github.com/apache/spark/pull/31476,

https://spark.apache.org/docs/3.2.0/api/java/org/apache/spark/sql/connector/metric/CustomMetric.html

I have created a custom metric implementing `CustomMetric` with a default
constructor overriding name and description.
Created a new instance of the created custom metric in the
`supportedCustomMetrics` method of  `spark.sql.connector.read.Scan`.

Created a custom task metric implementing `CustomTaskMetric` with the same
name as that of CustomMetric class and initialized this in
`currentMetricsValues` of PartitionReader.
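
A minimal sketch of the two classes described above, assuming the first is returned from Scan.supportedCustomMetrics() and the second from PartitionReader.currentMetricsValues(); the metric name "rowsScanned" is hypothetical, but it must match exactly on both sides:

```
import org.apache.spark.sql.connector.metric.{CustomMetric, CustomTaskMetric}

// Driver-side metric: aggregates the per-task values collected by the listener.
class RowsScannedMetric extends CustomMetric {
  override def name(): String = "rowsScanned"
  override def description(): String = "number of rows scanned"
  override def aggregateTaskMetrics(taskMetrics: Array[Long]): String =
    taskMetrics.sum.toString
}

// Executor-side value reported from PartitionReader.currentMetricsValues().
class RowsScannedTaskMetric(rows: Long) extends CustomTaskMetric {
  override def name(): String = "rowsScanned"
  override def value(): Long = rows
}
```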

I have static values as of now but when I run the application, I see in the
spark history page the corresponding value to the metric as N/A.
I have added logs in the `aggregateTaskMetrics` and my flow is going into
it. The spark SQLAppStatusListener.aggregateMetrics is loading my class and
calling the `aggregateTaskMetrics` yet I still see N/A in the spark ui page.

Also, I do see the metrics in the spark events log.


Driver log:

```

23/06/23 19:23:53 INFO Spark32CustomMetric: Spark32CustomMetric in
aggregateTaskMetrics start
23/06/23 19:23:53 INFO Spark32CustomMetric: Spark32CustomMetric in
aggregateTaskMetrics sum:1234 end
+-+--+---+---+
| word|word_count| corpus|corpus_date|
+-+--+---+---+
| LVII| 1|sonnets|  0|
|   augurs| 1|sonnets|  0|
|   dimm'd| 1|sonnets|  0|```


Attaching the Spark UI page screenshot.

Am I missing something? Any help is really appreciated.

Thanks.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org

Unsubscribe

2023-07-08 Thread yixu2...@163.com

Unsubscribe 


yixu2...@163.com


Re: PySpark error java.lang.IllegalArgumentException

2023-07-07 Thread Brian Huynh
Hi Khalid,

Elango mentioned the file is working fine in our another environment with the same driver and executor memory.

Brian

On Jul 7, 2023, at 10:18 AM, Khalid Mammadov  wrote:
Perhaps that parquet file is corrupted or got that is in that folder?
To check, try to read that file with pandas or other tools to see if you can read without Spark.

On Wed, 5 Jul 2023, 07:25 elango vaidyanathan,  wrote:
Hi team,
Any updates on this below issue

On Mon, 3 Jul 2023 at 6:18 PM, elango vaidyanathan  wrote:
Hi all,
I am reading a parquet file like this and it gives java.lang.IllegalArgumentException. However i can work with other parquet files (such as nyc taxi parquet files) without any issue. I have copied the full error log as well. Can you please check once and let me know how to fix this?
import pyspark
from pyspark.sql import SparkSession
spark=SparkSession.builder.appName("testPyspark").config("spark.executor.memory", "20g").config("spark.driver.memory", "50g").getOrCreate()
df=spark.read.parquet("/data/202301/account_cycle")
df.printSchema() # worksfine
df.count() #worksfine
df.show()# getting below error
>>> df.show()
23/07/03 18:07:20 INFO FileSourceStrategy: Pushed Filters:
23/07/03 18:07:20 INFO FileSourceStrategy: Post-Scan Filters:
23/07/03 18:07:20 INFO FileSourceStrategy: Output Data Schema: struct
23/07/03 18:07:20 INFO MemoryStore: Block broadcast_19 stored as values in memory (estimated size 540.6 KiB, free 26.5 GiB)
23/07/03 18:07:20 INFO MemoryStore: Block broadcast_19_piece0 stored as bytes in memory (estimated size 46.0 KiB, free 26.5 GiB)
23/07/03 18:07:20 INFO BlockManagerInfo: Added broadcast_19_piece0 in memory on mynode:41055 (size: 46.0 KiB, free: 26.5 GiB)
23/07/03 18:07:20 INFO SparkContext: Created broadcast 19 from showString at NativeMethodAccessorImpl.java:0
23/07/03 18:07:20 INFO FileSourceScanExec: Planning scan with bin packing, max size: 134217728 bytes, open cost is considered as scanning 4194304 bytes.
23/07/03 18:07:20 INFO SparkContext: Starting job: showString at NativeMethodAccessorImpl.java:0
23/07/03 18:07:20 INFO DAGScheduler: Got job 13 (showString at NativeMethodAccessorImpl.java:0) with 1 output partitions
23/07/03 18:07:20 INFO DAGScheduler: Final stage: ResultStage 14 (showString at NativeMethodAccessorImpl.java:0)
23/07/03 18:07:20 INFO DAGScheduler: Parents of final stage: List()
23/07/03 18:07:20 INFO DAGScheduler: Missing parents: List()
23/07/03 18:07:20 INFO DAGScheduler: Submitting ResultStage 14 (MapPartitionsRDD[42] at showString at NativeMethodAccessorImpl.java:0), which has no missing parents
23/07/03 18:07:20 INFO MemoryStore: Block broadcast_20 stored as values in memory (estimated size 38.1 KiB, free 26.5 GiB)
23/07/03 18:07:20 INFO MemoryStore: Block broadcast_20_piece0 stored as bytes in memory (estimated size 10.5 KiB, free 26.5 GiB)
23/07/03 18:07:20 INFO BlockManagerInfo: Added broadcast_20_piece0 in memory on mynode:41055 (size: 10.5 KiB, free: 26.5 GiB)
23/07/03 18:07:20 INFO SparkContext: Created broadcast 20 from broadcast at DAGScheduler.scala:1478
23/07/03 18:07:20 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 14 (MapPartitionsRDD[42] at showString at NativeMethodAccessorImpl.java:0) (first 15 tasks are for partitions Vector(0))
23/07/03 18:07:20 INFO TaskSchedulerImpl: Adding task set 14.0 with 1 tasks resource profile 0
23/07/03 18:07:20 INFO TaskSetManager: Starting task 0.0 in stage 14.0 (TID 48) (mynode, executor driver, partition 0, PROCESS_LOCAL, 4890 bytes) taskResourceAssignments Map()
23/07/03 18:07:20 INFO Executor: Running task 0.0 in stage 14.0 (TID 48)
23/07/03 18:07:20 INFO FileScanRDD: Reading File path: file:///data/202301/account_cycle/account_cycle-202301-53.parquet, range: 0-134217728, partition values: [empty row]
23/07/03 18:07:20 ERROR Executor: Exception in task 0.0 in stage 14.0 (TID 48)
java.lang.IllegalArgumentException
    at java.nio.Buffer.limit(Buffer.java:275)
    at org.xerial.snappy.Snappy.uncompress(Snappy.java:553)
    at org.apache.parquet.hadoop.codec.SnappyDecompressor.decompress(SnappyDecompressor.java:71)
    at org.apache.parquet.hadoop.codec.NonBlockedDecompressorStream.read(NonBlockedDecompressorStream.java:51)
    at java.io.DataInputStream.readFully(DataInputStream.java:195)
    at java.io.DataInputStream.readFully(DataInputStream.java:169)
    at org.apache.parquet.bytes.BytesInput$StreamBytesInput.toByteArray(BytesInput.java:286)
    at org.apache.parquet.bytes.BytesInput.toByteBuffer(BytesInput.java:237)
    at org.apache.parquet.bytes.BytesInput.toInputStream(BytesInput.java:246)
    at org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainLongDictionary.<init>(PlainValuesDictionary.java:154)
    at org.apache.parquet.column.Encoding$1.initDictionary(Encoding.java:96)
    at org.apache.parquet.column.Encoding$5.initDictionary(Encoding.java:163)
    at 

Re: PySpark error java.lang.IllegalArgumentException

2023-07-07 Thread Khalid Mammadov
Perhaps that parquet file is corrupted or got that is in that folder?
To check, try to read that file with pandas or other tools to see if you
can read without Spark.

On Wed, 5 Jul 2023, 07:25 elango vaidyanathan,  wrote:

>
> Hi team,
>
> Any updates on this below issue
>
> On Mon, 3 Jul 2023 at 6:18 PM, elango vaidyanathan 
> wrote:
>
>>
>>
>> Hi all,
>>
>> I am reading a parquet file like this and it gives 
>> java.lang.IllegalArgumentException.
>> However i can work with other parquet files (such as nyc taxi parquet
>> files) without any issue. I have copied the full error log as well. Can you
>> please check once and let me know how to fix this?
>>
>> import pyspark
>>
>> from pyspark.sql import SparkSession
>>
>> spark=SparkSession.builder.appName("testPyspark").config("spark.executor.memory",
>> "20g").config("spark.driver.memory", "50g").getOrCreate()
>>
>> df=spark.read.parquet("/data/202301/account_cycle")
>>
>> df.printSchema() # worksfine
>>
>> df.count() #worksfine
>>
>> df.show()# getting below error
>>
>> >>> df.show()
>>
>> 23/07/03 18:07:20 INFO FileSourceStrategy: Pushed Filters:
>>
>> 23/07/03 18:07:20 INFO FileSourceStrategy: Post-Scan Filters:
>>
>> 23/07/03 18:07:20 INFO FileSourceStrategy: Output Data Schema:
>> struct> account_status: string, currency_code: string, opened_dt: date ... 30 more
>> fields>
>>
>> 23/07/03 18:07:20 INFO MemoryStore: Block broadcast_19 stored as values
>> in memory (estimated size 540.6 KiB, free 26.5 GiB)
>>
>> 23/07/03 18:07:20 INFO MemoryStore: Block broadcast_19_piece0 stored as
>> bytes in memory (estimated size 46.0 KiB, free 26.5 GiB)
>>
>> 23/07/03 18:07:20 INFO BlockManagerInfo: Added broadcast_19_piece0 in
>> memory on mynode:41055 (size: 46.0 KiB, free: 26.5 GiB)
>>
>> 23/07/03 18:07:20 INFO SparkContext: Created broadcast 19 from showString
>> at NativeMethodAccessorImpl.java:0
>>
>> 23/07/03 18:07:20 INFO FileSourceScanExec: Planning scan with bin
>> packing, max size: 134217728 bytes, open cost is considered as scanning
>> 4194304 bytes.
>>
>> 23/07/03 18:07:20 INFO SparkContext: Starting job: showString at
>> NativeMethodAccessorImpl.java:0
>>
>> 23/07/03 18:07:20 INFO DAGScheduler: Got job 13 (showString at
>> NativeMethodAccessorImpl.java:0) with 1 output partitions
>>
>> 23/07/03 18:07:20 INFO DAGScheduler: Final stage: ResultStage 14
>> (showString at NativeMethodAccessorImpl.java:0)
>>
>> 23/07/03 18:07:20 INFO DAGScheduler: Parents of final stage: List()
>>
>> 23/07/03 18:07:20 INFO DAGScheduler: Missing parents: List()
>>
>> 23/07/03 18:07:20 INFO DAGScheduler: Submitting ResultStage 14
>> (MapPartitionsRDD[42] at showString at NativeMethodAccessorImpl.java:0),
>> which has no missing parents
>>
>> 23/07/03 18:07:20 INFO MemoryStore: Block broadcast_20 stored as values
>> in memory (estimated size 38.1 KiB, free 26.5 GiB)
>>
>> 23/07/03 18:07:20 INFO MemoryStore: Block broadcast_20_piece0 stored as
>> bytes in memory (estimated size 10.5 KiB, free 26.5 GiB)
>>
>> 23/07/03 18:07:20 INFO BlockManagerInfo: Added broadcast_20_piece0 in
>> memory on mynode:41055 (size: 10.5 KiB, free: 26.5 GiB)
>>
>> 23/07/03 18:07:20 INFO SparkContext: Created broadcast 20 from broadcast
>> at DAGScheduler.scala:1478
>>
>> 23/07/03 18:07:20 INFO DAGScheduler: Submitting 1 missing tasks from
>> ResultStage 14 (MapPartitionsRDD[42] at showString at
>> NativeMethodAccessorImpl.java:0) (first 15 tasks are for partitions
>> Vector(0))
>>
>> 23/07/03 18:07:20 INFO TaskSchedulerImpl: Adding task set 14.0 with 1
>> tasks resource profile 0
>>
>> 23/07/03 18:07:20 INFO TaskSetManager: Starting task 0.0 in stage 14.0
>> (TID 48) (mynode, executor driver, partition 0, PROCESS_LOCAL, 4890 bytes)
>> taskResourceAssignments Map()
>>
>> 23/07/03 18:07:20 INFO Executor: Running task 0.0 in stage 14.0 (TID 48)
>>
>> 23/07/03 18:07:20 INFO FileScanRDD: Reading File path:
>> file:///data/202301/account_cycle/account_cycle-202301-53.parquet, range:
>> 0-134217728, partition values: [empty row]
>>
>> 23/07/03 18:07:20 ERROR Executor: Exception in task 0.0 in stage 14.0
>> (TID 48)
>>
>> java.lang.IllegalArgumentException
>>
>> at java.nio.Buffer.limit(Buffer.java:275)
>>
>> at org.xerial.snappy.Snappy.uncompress(Snappy.java:553)
>>
>> at
>> org.apache.parquet.hadoop.codec.SnappyDecompressor.decompress(SnappyDecompressor.java:71)
>>
>> at
>> org.apache.parquet.hadoop.codec.NonBlockedDecompressorStream.read(NonBlockedDecompressorStream.java:51)
>>
>> at java.io.DataInputStream.readFully(DataInputStream.java:195)
>>
>> at java.io.DataInputStream.readFully(DataInputStream.java:169)
>>
>> at
>> org.apache.parquet.bytes.BytesInput$StreamBytesInput.toByteArray(BytesInput.java:286)
>>
>> at
>> org.apache.parquet.bytes.BytesInput.toByteBuffer(BytesInput.java:237)
>>
>> at
>> org.apache.parquet.bytes.BytesInput.toInputStream(BytesInput.java:246)
>>
>> at
>> 

Re: Unsubscribe

2023-07-07 Thread Atheeth SH
please send an empty email to:
user-unsubscr...@spark.apache.org
to unsubscribe yourself from the list.

Thanks


On Fri, 7 Jul 2023 at 12:05, Mihai Musat  wrote:

> Unsubscribe
>


Unsubscribe

2023-07-07 Thread Mihai Musat
Unsubscribe


Spark UI - Bug Executors tab when using proxy port

2023-07-06 Thread Bruno Pistone
Hello everyone,

I’m really sorry to use this mailing list, but it seems impossible to report a 
strange behaviour that is happening with the Spark UI otherwise. I’m also sending 
the link to the Stack Overflow question here: 
https://stackoverflow.com/questions/76632692/spark-ui-executors-tab-its-empty

I’m trying to run the Spark UI on a web server. I need to configure a specific 
port for running the UI and a redirect URL. I’m setting up the following OPTS:

```
export SPARK_HISTORY_OPTS="-Dspark.history.fs.logDirectory=${LOCAL_PATH_LOGS}
-Dspark.history.ui.port=18080 
-Dspark.eventLog.enabled=true 
-Dspark.ui.proxyRedirectUri=${SERVER_URL}"

./start-history-server.sh
```

What is happening: The UI is accessible through the url 
https://${SERVER_URL}/proxy/18080 

When I select an application and click on the “Executors” tab, it 
remains empty. Looking at the API calls made by the UI, I see that the 
"/allexecutors" call returns 404.

Instead of calling 
https://${SERVER_URL}/proxy/18080/api/v1/applications/${APP_ID}/allexecutors 

I see that the URL called is 
https://${SERVER_URL}/proxy/18080/api/v1/applications/18080/allexecutors 


It seems that the appId is not correctly identified. Can you please provide a 
solution for this, or an estimated date for fixing the error?

Thank you,

Re: PySpark error java.lang.IllegalArgumentException

2023-07-05 Thread elango vaidyanathan
Hi team,

Any updates on this below issue

On Mon, 3 Jul 2023 at 6:18 PM, elango vaidyanathan 
wrote:

>
>
> Hi all,
>
> I am reading a parquet file like this and it gives 
> java.lang.IllegalArgumentException.
> However i can work with other parquet files (such as nyc taxi parquet
> files) without any issue. I have copied the full error log as well. Can you
> please check once and let me know how to fix this?
>
> import pyspark
>
> from pyspark.sql import SparkSession
>
> spark=SparkSession.builder.appName("testPyspark").config("spark.executor.memory",
> "20g").config("spark.driver.memory", "50g").getOrCreate()
>
> df=spark.read.parquet("/data/202301/account_cycle")
>
> df.printSchema() # worksfine
>
> df.count() #worksfine
>
> df.show()# getting below error
>
> >>> df.show()
>
> 23/07/03 18:07:20 INFO FileSourceStrategy: Pushed Filters:
>
> 23/07/03 18:07:20 INFO FileSourceStrategy: Post-Scan Filters:
>
> 23/07/03 18:07:20 INFO FileSourceStrategy: Output Data Schema:
> struct account_status: string, currency_code: string, opened_dt: date ... 30 more
> fields>
>
> 23/07/03 18:07:20 INFO MemoryStore: Block broadcast_19 stored as values in
> memory (estimated size 540.6 KiB, free 26.5 GiB)
>
> 23/07/03 18:07:20 INFO MemoryStore: Block broadcast_19_piece0 stored as
> bytes in memory (estimated size 46.0 KiB, free 26.5 GiB)
>
> 23/07/03 18:07:20 INFO BlockManagerInfo: Added broadcast_19_piece0 in
> memory on mynode:41055 (size: 46.0 KiB, free: 26.5 GiB)
>
> 23/07/03 18:07:20 INFO SparkContext: Created broadcast 19 from showString
> at NativeMethodAccessorImpl.java:0
>
> 23/07/03 18:07:20 INFO FileSourceScanExec: Planning scan with bin packing,
> max size: 134217728 bytes, open cost is considered as scanning 4194304
> bytes.
>
> 23/07/03 18:07:20 INFO SparkContext: Starting job: showString at
> NativeMethodAccessorImpl.java:0
>
> 23/07/03 18:07:20 INFO DAGScheduler: Got job 13 (showString at
> NativeMethodAccessorImpl.java:0) with 1 output partitions
>
> 23/07/03 18:07:20 INFO DAGScheduler: Final stage: ResultStage 14
> (showString at NativeMethodAccessorImpl.java:0)
>
> 23/07/03 18:07:20 INFO DAGScheduler: Parents of final stage: List()
>
> 23/07/03 18:07:20 INFO DAGScheduler: Missing parents: List()
>
> 23/07/03 18:07:20 INFO DAGScheduler: Submitting ResultStage 14
> (MapPartitionsRDD[42] at showString at NativeMethodAccessorImpl.java:0),
> which has no missing parents
>
> 23/07/03 18:07:20 INFO MemoryStore: Block broadcast_20 stored as values in
> memory (estimated size 38.1 KiB, free 26.5 GiB)
>
> 23/07/03 18:07:20 INFO MemoryStore: Block broadcast_20_piece0 stored as
> bytes in memory (estimated size 10.5 KiB, free 26.5 GiB)
>
> 23/07/03 18:07:20 INFO BlockManagerInfo: Added broadcast_20_piece0 in
> memory on mynode:41055 (size: 10.5 KiB, free: 26.5 GiB)
>
> 23/07/03 18:07:20 INFO SparkContext: Created broadcast 20 from broadcast
> at DAGScheduler.scala:1478
>
> 23/07/03 18:07:20 INFO DAGScheduler: Submitting 1 missing tasks from
> ResultStage 14 (MapPartitionsRDD[42] at showString at
> NativeMethodAccessorImpl.java:0) (first 15 tasks are for partitions
> Vector(0))
>
> 23/07/03 18:07:20 INFO TaskSchedulerImpl: Adding task set 14.0 with 1
> tasks resource profile 0
>
> 23/07/03 18:07:20 INFO TaskSetManager: Starting task 0.0 in stage 14.0
> (TID 48) (mynode, executor driver, partition 0, PROCESS_LOCAL, 4890 bytes)
> taskResourceAssignments Map()
>
> 23/07/03 18:07:20 INFO Executor: Running task 0.0 in stage 14.0 (TID 48)
>
> 23/07/03 18:07:20 INFO FileScanRDD: Reading File path:
> file:///data/202301/account_cycle/account_cycle-202301-53.parquet, range:
> 0-134217728, partition values: [empty row]
>
> 23/07/03 18:07:20 ERROR Executor: Exception in task 0.0 in stage 14.0 (TID
> 48)
>
> java.lang.IllegalArgumentException
>
> at java.nio.Buffer.limit(Buffer.java:275)
>
> at org.xerial.snappy.Snappy.uncompress(Snappy.java:553)
>
> at
> org.apache.parquet.hadoop.codec.SnappyDecompressor.decompress(SnappyDecompressor.java:71)
>
> at
> org.apache.parquet.hadoop.codec.NonBlockedDecompressorStream.read(NonBlockedDecompressorStream.java:51)
>
> at java.io.DataInputStream.readFully(DataInputStream.java:195)
>
> at java.io.DataInputStream.readFully(DataInputStream.java:169)
>
> at
> org.apache.parquet.bytes.BytesInput$StreamBytesInput.toByteArray(BytesInput.java:286)
>
> at
> org.apache.parquet.bytes.BytesInput.toByteBuffer(BytesInput.java:237)
>
> at
> org.apache.parquet.bytes.BytesInput.toInputStream(BytesInput.java:246)
>
> at
> org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainLongDictionary.<init>(PlainValuesDictionary.java:154)
>
> at
> org.apache.parquet.column.Encoding$1.initDictionary(Encoding.java:96)
>
> at
> org.apache.parquet.column.Encoding$5.initDictionary(Encoding.java:163)
>
> at
> 

Performance Issue with Column Addition in Spark 3.4.x: Time Doubling with Increased Columns

2023-07-04 Thread KO Dukhyun
Dear spark users,

I'm experiencing an unusual issue with Spark 3.4.x.
When creating a new column as the sum of several existing columns, the time 
taken almost doubles as the number of columns increases. This operation doesn't 
require many resources, so I suspect there might be a problem with the parse 
engine.

This phenomenon did not occur in versions prior to 3.3.x. I've attached a 
simple example below.


//example code
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

val schema = StructType((1 to 100).map(x => StructField(s"c$x", IntegerType)))
val data = Row.fromSeq(Seq.fill(100)(1))
val df = spark.createDataFrame(spark.sparkContext.parallelize(Seq(data)), schema = schema)
val sumCols = (1 to 31).map(x => s"c$x").toList
spark.time{df.withColumn("sumofcols", sumCols.map(col).reduce(_+_)).count}
//=
Time taken: 288213 ms
res13: Long = 1L

With spark 3.3.2, last line takes about 150ms.

Is there any known problem like this?

regards,

Dukhyun Ko


Re: Filtering JSON records when there isn't an exact schema match in Spark

2023-07-04 Thread Shashank Rao
Z is just an example. It could be anything. Basically, anything that's not
in schema should be filtered out.

On Tue, 4 Jul 2023, 13:27 Hill Liu,  wrote:

> I think you can define schema with column z and filter out records with z
> is null.
>
> On Tue, Jul 4, 2023 at 3:24 PM Shashank Rao 
> wrote:
>
>> Yes, drop malformed does filter out record4. However, record 5 is not.
>>
>> On Tue, 4 Jul 2023 at 07:41, Vikas Kumar  wrote:
>>
>>> Have you tried dropmalformed option ?
>>>
>>> On Mon, Jul 3, 2023, 1:34 PM Shashank Rao 
>>> wrote:
>>>
 Update: Got it working by using the *_corrupt_record *field for the
 first case (record 4)

 schema = schema.add("_corrupt_record", DataTypes.StringType);
 Dataset ds = spark.read().schema(schema).option("mode",
 "PERMISSIVE").json("path").collect();
 ds = ds.filter(functions.col("_corrupt_record").isNull()).collect();

 However, I haven't figured out on how to ignore record 5.

 Any help is appreciated.

 On Mon, 3 Jul 2023 at 19:24, Shashank Rao 
 wrote:

> Hi all,
> I'm trying to read around 1,000,000 JSONL files present in S3 using
> Spark. Once read, I need to write them to BigQuery.
> I have a schema that may not be an exact match with all the records.
> How can I filter records where there isn't an exact schema match:
>
> Eg: if my records were:
> {"x": 1, "y": 1}
> {"x": 2, "y": 2}
> {"x": 3, "y": 3}
> {"x": 4, "y": "4"}
> {"x": 5, "y": 5, "z": 5}
>
> and if my schema were:
> root
>  |-- x: long (nullable = true)
>  |-- y: long (nullable = true)
>
> I need the records 4 and 5 to be filtered out.
> Record 4 should be filtered out since y is a string instead of long.
> Record 5 should be filtered out since z is not part of the schema.
>
> I tried applying my schema on read, but it does not work as needed:
>
> StructType schema = new StructType().add("a",
> DataTypes.LongType).add("b", DataTypes.LongType);
> Dataset ds = spark.read().schema(schema).json("path/to/file")
>
> This gives me a dataset that has record 4 with y=null and record 5
> with x and y.
>
> Any help is appreciated.
>
> --
> Thanks,
> Shashank Rao
>


 --
 Regards,
 Shashank Rao

>>>
>>
>> --
>> Regards,
>> Shashank Rao
>>
>


Re: Filtering JSON records when there isn't an exact schema match in Spark

2023-07-04 Thread Hill Liu
I think you can define the schema with column z and keep only the records where z
is null.
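
A minimal sketch of that suggestion, using the example fields x and y plus the known extra field z (the _corrupt_record column catches type mismatches such as record 4):

```
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types._

// z is included only so that records carrying the extra field can be detected.
val schema = new StructType()
  .add("x", LongType)
  .add("y", LongType)
  .add("z", LongType)
  .add("_corrupt_record", StringType)

val parsed = spark.read
  .schema(schema)
  .option("mode", "PERMISSIVE")
  .json("path/to/file")

// Keep rows that parsed cleanly and do not carry the extra field.
val clean = parsed
  .filter(col("_corrupt_record").isNull && col("z").isNull)
  .drop("z", "_corrupt_record")
```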

On Tue, Jul 4, 2023 at 3:24 PM Shashank Rao  wrote:

> Yes, drop malformed does filter out record4. However, record 5 is not.
>
> On Tue, 4 Jul 2023 at 07:41, Vikas Kumar  wrote:
>
>> Have you tried dropmalformed option ?
>>
>> On Mon, Jul 3, 2023, 1:34 PM Shashank Rao 
>> wrote:
>>
>>> Update: Got it working by using the *_corrupt_record *field for the
>>> first case (record 4)
>>>
>>> schema = schema.add("_corrupt_record", DataTypes.StringType);
>>> Dataset ds = spark.read().schema(schema).option("mode",
>>> "PERMISSIVE").json("path").collect();
>>> ds = ds.filter(functions.col("_corrupt_record").isNull()).collect();
>>>
>>> However, I haven't figured out on how to ignore record 5.
>>>
>>> Any help is appreciated.
>>>
>>> On Mon, 3 Jul 2023 at 19:24, Shashank Rao 
>>> wrote:
>>>
 Hi all,
 I'm trying to read around 1,000,000 JSONL files present in S3 using
 Spark. Once read, I need to write them to BigQuery.
 I have a schema that may not be an exact match with all the records.
 How can I filter records where there isn't an exact schema match:

 Eg: if my records were:
 {"x": 1, "y": 1}
 {"x": 2, "y": 2}
 {"x": 3, "y": 3}
 {"x": 4, "y": "4"}
 {"x": 5, "y": 5, "z": 5}

 and if my schema were:
 root
  |-- x: long (nullable = true)
  |-- y: long (nullable = true)

 I need the records 4 and 5 to be filtered out.
 Record 4 should be filtered out since y is a string instead of long.
 Record 5 should be filtered out since z is not part of the schema.

 I tried applying my schema on read, but it does not work as needed:

 StructType schema = new StructType().add("a",
 DataTypes.LongType).add("b", DataTypes.LongType);
 Dataset ds = spark.read().schema(schema).json("path/to/file")

 This gives me a dataset that has record 4 with y=null and record 5 with
 x and y.

 Any help is appreciated.

 --
 Thanks,
 Shashank Rao

>>>
>>>
>>> --
>>> Regards,
>>> Shashank Rao
>>>
>>
>
> --
> Regards,
> Shashank Rao
>


Re: Filtering JSON records when there isn't an exact schema match in Spark

2023-07-04 Thread Shashank Rao
Yes, drop malformed does filter out record4. However, record 5 is not.

On Tue, 4 Jul 2023 at 07:41, Vikas Kumar  wrote:

> Have you tried dropmalformed option ?
>
> On Mon, Jul 3, 2023, 1:34 PM Shashank Rao  wrote:
>
>> Update: Got it working by using the *_corrupt_record *field for the
>> first case (record 4)
>>
>> schema = schema.add("_corrupt_record", DataTypes.StringType);
>> Dataset ds = spark.read().schema(schema).option("mode",
>> "PERMISSIVE").json("path").collect();
>> ds = ds.filter(functions.col("_corrupt_record").isNull()).collect();
>>
>> However, I haven't figured out on how to ignore record 5.
>>
>> Any help is appreciated.
>>
>> On Mon, 3 Jul 2023 at 19:24, Shashank Rao 
>> wrote:
>>
>>> Hi all,
>>> I'm trying to read around 1,000,000 JSONL files present in S3 using
>>> Spark. Once read, I need to write them to BigQuery.
>>> I have a schema that may not be an exact match with all the records.
>>> How can I filter records where there isn't an exact schema match:
>>>
>>> Eg: if my records were:
>>> {"x": 1, "y": 1}
>>> {"x": 2, "y": 2}
>>> {"x": 3, "y": 3}
>>> {"x": 4, "y": "4"}
>>> {"x": 5, "y": 5, "z": 5}
>>>
>>> and if my schema were:
>>> root
>>>  |-- x: long (nullable = true)
>>>  |-- y: long (nullable = true)
>>>
>>> I need the records 4 and 5 to be filtered out.
>>> Record 4 should be filtered out since y is a string instead of long.
>>> Record 5 should be filtered out since z is not part of the schema.
>>>
>>> I tried applying my schema on read, but it does not work as needed:
>>>
>>> StructType schema = new StructType().add("a",
>>> DataTypes.LongType).add("b", DataTypes.LongType);
>>> Dataset ds = spark.read().schema(schema).json("path/to/file")
>>>
>>> This gives me a dataset that has record 4 with y=null and record 5 with
>>> x and y.
>>>
>>> Any help is appreciated.
>>>
>>> --
>>> Thanks,
>>> Shashank Rao
>>>
>>
>>
>> --
>> Regards,
>> Shashank Rao
>>
>

-- 
Regards,
Shashank Rao


Re: Filtering JSON records when there isn't an exact schema match in Spark

2023-07-03 Thread Vikas Kumar
Have you tried dropmalformed option ?
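
For reference, a small PySpark sketch of that option (the schema and path are
placeholders based on the example in the thread); as noted elsewhere in the
thread, this drops the type-mismatch case (record 4) but not the extra-field
case (record 5):

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, LongType

spark = SparkSession.builder.getOrCreate()
schema = StructType().add("x", LongType()).add("y", LongType())

# Rows that fail to parse against the supplied schema are silently dropped
df = (spark.read
          .schema(schema)
          .option("mode", "DROPMALFORMED")
          .json("path/to/files"))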

On Mon, Jul 3, 2023, 1:34 PM Shashank Rao  wrote:

> Update: Got it working by using the *_corrupt_record *field for the first
> case (record 4)
>
> schema = schema.add("_corrupt_record", DataTypes.StringType);
> Dataset ds = spark.read().schema(schema).option("mode",
> "PERMISSIVE").json("path").collect();
> ds = ds.filter(functions.col("_corrupt_record").isNull()).collect();
>
> However, I haven't figured out on how to ignore record 5.
>
> Any help is appreciated.
>
> On Mon, 3 Jul 2023 at 19:24, Shashank Rao  wrote:
>
>> Hi all,
>> I'm trying to read around 1,000,000 JSONL files present in S3 using
>> Spark. Once read, I need to write them to BigQuery.
>> I have a schema that may not be an exact match with all the records.
>> How can I filter records where there isn't an exact schema match:
>>
>> Eg: if my records were:
>> {"x": 1, "y": 1}
>> {"x": 2, "y": 2}
>> {"x": 3, "y": 3}
>> {"x": 4, "y": "4"}
>> {"x": 5, "y": 5, "z": 5}
>>
>> and if my schema were:
>> root
>>  |-- x: long (nullable = true)
>>  |-- y: long (nullable = true)
>>
>> I need the records 4 and 5 to be filtered out.
>> Record 4 should be filtered out since y is a string instead of long.
>> Record 5 should be filtered out since z is not part of the schema.
>>
>> I tried applying my schema on read, but it does not work as needed:
>>
>> StructType schema = new StructType().add("a",
>> DataTypes.LongType).add("b", DataTypes.LongType);
>> Dataset ds = spark.read().schema(schema).json("path/to/file")
>>
>> This gives me a dataset that has record 4 with y=null and record 5 with x
>> and y.
>>
>> Any help is appreciated.
>>
>> --
>> Thanks,
>> Shashank Rao
>>
>
>
> --
> Regards,
> Shashank Rao
>


Re: Introducing English SDK for Apache Spark - Seeking Your Feedback and Contributions

2023-07-03 Thread Gavin Ray
Wow, really neat -- thanks for sharing!

On Mon, Jul 3, 2023 at 8:12 PM Gengliang Wang  wrote:

> Dear Apache Spark community,
>
> We are delighted to announce the launch of a groundbreaking tool that aims
> to make Apache Spark more user-friendly and accessible - the English SDK
> . Powered by the
> application of Generative AI, the English SDK
>  allows you to execute
> complex tasks with simple English instructions. This exciting news was 
> announced
> recently at the Data+AI Summit
>  and also introduced
> through a detailed blog post
> 
> .
>
> Now, we need your invaluable feedback and contributions. The aim of the
> English SDK is not only to simplify and enrich your Apache Spark experience
> but also to grow with the community. We're calling upon Spark developers
> and users to explore this innovative tool, offer your insights, provide
> feedback, and contribute to its evolution.
>
> You can find more details about the SDK and usage examples on the GitHub
> repository https://github.com/databrickslabs/pyspark-ai/. If you have any
> feedback or suggestions, please feel free to open an issue directly on the
> repository. We are actively monitoring the issues and value your insights.
>
> We also welcome pull requests and are eager to see how you might extend or
> refine this tool. Let's come together to continue making Apache Spark more
> approachable and user-friendly.
>
> Thank you in advance for your attention and involvement. We look forward
> to hearing your thoughts and seeing your contributions!
>
> Best,
> Gengliang Wang
>


Re: Introducing English SDK for Apache Spark - Seeking Your Feedback and Contributions

2023-07-03 Thread Hyukjin Kwon
The demo was really amazing.

On Tue, 4 Jul 2023 at 09:17, Farshid Ashouri 
wrote:

> This is wonderful news!
>
> On Tue, 4 Jul 2023 at 01:14, Gengliang Wang  wrote:
>
>> Dear Apache Spark community,
>>
>> We are delighted to announce the launch of a groundbreaking tool that
>> aims to make Apache Spark more user-friendly and accessible - the
>> English SDK . Powered by
>> the application of Generative AI, the English SDK
>>  allows you to execute
>> complex tasks with simple English instructions. This exciting news was 
>> announced
>> recently at the Data+AI Summit
>>  and also introduced
>> through a detailed blog post
>> 
>> .
>>
>> Now, we need your invaluable feedback and contributions. The aim of the
>> English SDK is not only to simplify and enrich your Apache Spark experience
>> but also to grow with the community. We're calling upon Spark developers
>> and users to explore this innovative tool, offer your insights, provide
>> feedback, and contribute to its evolution.
>>
>> You can find more details about the SDK and usage examples on the GitHub
>> repository https://github.com/databrickslabs/pyspark-ai/. If you have
>> any feedback or suggestions, please feel free to open an issue directly on
>> the repository. We are actively monitoring the issues and value your
>> insights.
>>
>> We also welcome pull requests and are eager to see how you might extend
>> or refine this tool. Let's come together to continue making Apache Spark
>> more approachable and user-friendly.
>>
>> Thank you in advance for your attention and involvement. We look forward
>> to hearing your thoughts and seeing your contributions!
>>
>> Best,
>> Gengliang Wang
>>
> --
>
>
> *Farshid Ashouri*,
> Senior Vice President,
> J.P. Morgan & Chase Co.
> +44 7932 650 788
>
>


Re: Introducing English SDK for Apache Spark - Seeking Your Feedback and Contributions

2023-07-03 Thread Farshid Ashouri
This is wonderful news!

On Tue, 4 Jul 2023 at 01:14, Gengliang Wang  wrote:

> Dear Apache Spark community,
>
> We are delighted to announce the launch of a groundbreaking tool that aims
> to make Apache Spark more user-friendly and accessible - the English SDK
> . Powered by the
> application of Generative AI, the English SDK
>  allows you to execute
> complex tasks with simple English instructions. This exciting news was 
> announced
> recently at the Data+AI Summit
>  and also introduced
> through a detailed blog post
> 
> .
>
> Now, we need your invaluable feedback and contributions. The aim of the
> English SDK is not only to simplify and enrich your Apache Spark experience
> but also to grow with the community. We're calling upon Spark developers
> and users to explore this innovative tool, offer your insights, provide
> feedback, and contribute to its evolution.
>
> You can find more details about the SDK and usage examples on the GitHub
> repository https://github.com/databrickslabs/pyspark-ai/. If you have any
> feedback or suggestions, please feel free to open an issue directly on the
> repository. We are actively monitoring the issues and value your insights.
>
> We also welcome pull requests and are eager to see how you might extend or
> refine this tool. Let's come together to continue making Apache Spark more
> approachable and user-friendly.
>
> Thank you in advance for your attention and involvement. We look forward
> to hearing your thoughts and seeing your contributions!
>
> Best,
> Gengliang Wang
>
-- 


*Farshid Ashouri*,
Senior Vice President,
J.P. Morgan & Chase Co.
+44 7932 650 788


Introducing English SDK for Apache Spark - Seeking Your Feedback and Contributions

2023-07-03 Thread Gengliang Wang
Dear Apache Spark community,

We are delighted to announce the launch of a groundbreaking tool that aims
to make Apache Spark more user-friendly and accessible - the English SDK
. Powered by the application
of Generative AI, the English SDK
 allows you to execute
complex tasks with simple English instructions. This exciting news was
announced
recently at the Data+AI Summit
 and also introduced
through a detailed blog post

.

Now, we need your invaluable feedback and contributions. The aim of the
English SDK is not only to simplify and enrich your Apache Spark experience
but also to grow with the community. We're calling upon Spark developers
and users to explore this innovative tool, offer your insights, provide
feedback, and contribute to its evolution.

You can find more details about the SDK and usage examples on the GitHub
repository https://github.com/databrickslabs/pyspark-ai/. If you have any
feedback or suggestions, please feel free to open an issue directly on the
repository. We are actively monitoring the issues and value your insights.

We also welcome pull requests and are eager to see how you might extend or
refine this tool. Let's come together to continue making Apache Spark more
approachable and user-friendly.

Thank you in advance for your attention and involvement. We look forward to
hearing your thoughts and seeing your contributions!

Best,
Gengliang Wang


Re: Filtering JSON records when there isn't an exact schema match in Spark

2023-07-03 Thread Shashank Rao
Update: Got it working by using the *_corrupt_record *field for the first
case (record 4)

schema = schema.add("_corrupt_record", DataTypes.StringType);
Dataset ds = spark.read().schema(schema).option("mode",
"PERMISSIVE").json("path").collect();
ds = ds.filter(functions.col("_corrupt_record").isNull()).collect();

However, I haven't figured out on how to ignore record 5.

Any help is appreciated.

On Mon, 3 Jul 2023 at 19:24, Shashank Rao  wrote:

> Hi all,
> I'm trying to read around 1,000,000 JSONL files present in S3 using Spark.
> Once read, I need to write them to BigQuery.
> I have a schema that may not be an exact match with all the records.
> How can I filter records where there isn't an exact schema match:
>
> Eg: if my records were:
> {"x": 1, "y": 1}
> {"x": 2, "y": 2}
> {"x": 3, "y": 3}
> {"x": 4, "y": "4"}
> {"x": 5, "y": 5, "z": 5}
>
> and if my schema were:
> root
>  |-- x: long (nullable = true)
>  |-- y: long (nullable = true)
>
> I need the records 4 and 5 to be filtered out.
> Record 4 should be filtered out since y is a string instead of long.
> Record 5 should be filtered out since z is not part of the schema.
>
> I tried applying my schema on read, but it does not work as needed:
>
> StructType schema = new StructType().add("a", DataTypes.LongType).add("b",
> DataTypes.LongType);
> Dataset ds = spark.read().schema(schema).json("path/to/file")
>
> This gives me a dataset that has record 4 with y=null and record 5 with x
> and y.
>
> Any help is appreciated.
>
> --
> Thanks,
> Shashank Rao
>


-- 
Regards,
Shashank Rao


Re: [Spark SQL] Data objects from query history

2023-07-03 Thread Jack Wells
 Hi Ruben,

I’m not sure if this answers your question, but if you’re interested in
exploring the underlying tables, you could always try something like the
below in a Databricks notebook:

display(spark.read.table('samples.nyctaxi.trips'))

(For vanilla Spark users, it would be
spark.read.table('samples.nyctaxi.trips').show(100, False) )

Since you’re using Databricks, you can also find the data under the Data
menu, scroll down to the samples metastore then click through to trips to
find the file location, schema, and sample data.
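
For the table-extraction part of the question, one possible direction that
isn't covered above is to lean on a third-party SQL parser instead of the
Spark ANTLR grammar. A sketch, assuming the sqlglot Python package is
available (the package, dialect argument, and output shown are assumptions,
not something from this thread):

import sqlglot
from sqlglot import exp

sql = """
SELECT concat(pickup_zip, '-', dropoff_zip) AS route,
       AVG(fare_amount) AS average_fare
FROM `samples`.`nyctaxi`.`trips`
GROUP BY 1 ORDER BY 2 DESC LIMIT 1000
"""

# Collect every table reference in the parsed statement
tables = {
    ".".join(p for p in (t.catalog, t.db, t.name) if p)
    for t in sqlglot.parse_one(sql, read="spark").find_all(exp.Table)
}
print(tables)  # expected: {'samples.nyctaxi.trips'}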

On Jun 29, 2023 at 23:53:25, Ruben Mennes  wrote:

> Dear Apache Spark community,
>
> I hope this email finds you well. My name is Ruben, and I am an
> enthusiastic user of Apache Spark, specifically through the Databricks
> platform. I am reaching out to you today to seek your assistance and
> guidance regarding a specific use case.
>
> I have been exploring the capabilities of Spark SQL and Databricks, and I
> have encountered a challenge related to accessing the data objects used by
> queries from the query history. I am aware that Databricks provides a
> comprehensive query history that contains valuable information about
> executed queries.
>
> However, my objective is to extract the underlying data objects (tables)
> involved in each query. By doing so, I aim to analyze and understand the
> dependencies between queries and the data they operate on. This information
> will provide us new insights in how data is used across our data platform.
>
> I have attempted to leverage the Spark SQL Antlr grammar, available at
> https://github.com/apache/spark/blob/master/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4,
> to parse the queries retrieved from the query history. Unfortunately, I
> have encountered difficulties when parsing more complex queries.
>
> As an example, I have struggled to parse queries with intricate constructs
> such as the following:
>
> SELECT
>   concat(pickup_zip, '-', dropoff_zip) as route,
>   AVG(fare_amount) as average_fare
> FROM
>   `samples`.`nyctaxi`.`trips`
> GROUP BY
>   1
> ORDER BY
>   2 DESC
> LIMIT 1000
>
> I would greatly appreciate it if you could provide me with some guidance
> on how to overcome these challenges. Specifically, I am interested in
> understanding if there are alternative approaches or existing tools that
> can help me achieve my goal of extracting the data objects used by queries
> from the Databricks query history.
>
> Additionally, if there are any resources, documentation, or examples that
> provide further clarity on this topic, I would be more than grateful to
> receive them. Any insights you can provide would be of immense help in
> advancing my understanding and enabling me to make the most of the Spark
> SQL and Databricks ecosystem.
>
> Thank you very much for your time and support. I eagerly look forward to
> hearing from you and benefiting from your expertise.
>
> Best regards,
> Ruben Mennes
>


Filtering JSON records when there isn't an exact schema match in Spark

2023-07-03 Thread Shashank Rao
Hi all,
I'm trying to read around 1,000,000 JSONL files present in S3 using Spark.
Once read, I need to write them to BigQuery.
I have a schema that may not be an exact match with all the records.
How can I filter records where there isn't an exact schema match:

Eg: if my records were:
{"x": 1, "y": 1}
{"x": 2, "y": 2}
{"x": 3, "y": 3}
{"x": 4, "y": "4"}
{"x": 5, "y": 5, "z": 5}

and if my schema were:
root
 |-- x: long (nullable = true)
 |-- y: long (nullable = true)

I need the records 4 and 5 to be filtered out.
Record 4 should be filtered out since y is a string instead of long.
Record 5 should be filtered out since z is not part of the schema.

I tried applying my schema on read, but it does not work as needed:

StructType schema = new StructType().add("a", DataTypes.LongType).add("b",
DataTypes.LongType);
Dataset ds = spark.read().schema(schema).json("path/to/file")

This gives me a dataset that has record 4 with y=null and record 5 with x
and y.

Any help is appreciated.

-- 
Thanks,
Shashank Rao


CFP for the 2nd Performance Engineering track at Community over Code NA 2023

2023-07-03 Thread Brebner, Paul
Hi Apache Spark people - There are only 10 days left to submit a talk proposal 
(title and abstract only) for Community over Code NA 2023 - the 2nd Performance 
Engineering track is on this year, so any Apache project-related performance and 
scalability talks are welcome. Here's the CFP for more ideas and links, 
including the CFP submission page:
https://www.linkedin.com/pulse/call-papers-2nd-performance-engineering-track-over-code-brebner/
 - Paul Brebner and Roger Abelenda


PySpark error java.lang.IllegalArgumentException

2023-07-03 Thread elango vaidyanathan
Hi all,

I am reading a parquet file like this and it gives
java.lang.IllegalArgumentException.
However i can work with other parquet files (such as nyc taxi parquet
files) without any issue. I have copied the full error log as well. Can you
please check once and let me know how to fix this?

import pyspark

from pyspark.sql import SparkSession

spark=SparkSession.builder.appName("testPyspark").config("spark.executor.memory",
"20g").config("spark.driver.memory", "50g").getOrCreate()

df=spark.read.parquet("/data/202301/account_cycle")

df.printSchema() # worksfine

df.count() #worksfine

df.show()# getting below error

>>> df.show()

23/07/03 18:07:20 INFO FileSourceStrategy: Pushed Filters:

23/07/03 18:07:20 INFO FileSourceStrategy: Post-Scan Filters:

23/07/03 18:07:20 INFO FileSourceStrategy: Output Data Schema:
struct

23/07/03 18:07:20 INFO MemoryStore: Block broadcast_19 stored as values in
memory (estimated size 540.6 KiB, free 26.5 GiB)

23/07/03 18:07:20 INFO MemoryStore: Block broadcast_19_piece0 stored as
bytes in memory (estimated size 46.0 KiB, free 26.5 GiB)

23/07/03 18:07:20 INFO BlockManagerInfo: Added broadcast_19_piece0 in
memory on mynode:41055 (size: 46.0 KiB, free: 26.5 GiB)

23/07/03 18:07:20 INFO SparkContext: Created broadcast 19 from showString
at NativeMethodAccessorImpl.java:0

23/07/03 18:07:20 INFO FileSourceScanExec: Planning scan with bin packing,
max size: 134217728 bytes, open cost is considered as scanning 4194304
bytes.

23/07/03 18:07:20 INFO SparkContext: Starting job: showString at
NativeMethodAccessorImpl.java:0

23/07/03 18:07:20 INFO DAGScheduler: Got job 13 (showString at
NativeMethodAccessorImpl.java:0) with 1 output partitions

23/07/03 18:07:20 INFO DAGScheduler: Final stage: ResultStage 14
(showString at NativeMethodAccessorImpl.java:0)

23/07/03 18:07:20 INFO DAGScheduler: Parents of final stage: List()

23/07/03 18:07:20 INFO DAGScheduler: Missing parents: List()

23/07/03 18:07:20 INFO DAGScheduler: Submitting ResultStage 14
(MapPartitionsRDD[42] at showString at NativeMethodAccessorImpl.java:0),
which has no missing parents

23/07/03 18:07:20 INFO MemoryStore: Block broadcast_20 stored as values in
memory (estimated size 38.1 KiB, free 26.5 GiB)

23/07/03 18:07:20 INFO MemoryStore: Block broadcast_20_piece0 stored as
bytes in memory (estimated size 10.5 KiB, free 26.5 GiB)

23/07/03 18:07:20 INFO BlockManagerInfo: Added broadcast_20_piece0 in
memory on mynode:41055 (size: 10.5 KiB, free: 26.5 GiB)

23/07/03 18:07:20 INFO SparkContext: Created broadcast 20 from broadcast at
DAGScheduler.scala:1478

23/07/03 18:07:20 INFO DAGScheduler: Submitting 1 missing tasks from
ResultStage 14 (MapPartitionsRDD[42] at showString at
NativeMethodAccessorImpl.java:0) (first 15 tasks are for partitions
Vector(0))

23/07/03 18:07:20 INFO TaskSchedulerImpl: Adding task set 14.0 with 1 tasks
resource profile 0

23/07/03 18:07:20 INFO TaskSetManager: Starting task 0.0 in stage 14.0 (TID
48) (mynode, executor driver, partition 0, PROCESS_LOCAL, 4890 bytes)
taskResourceAssignments Map()

23/07/03 18:07:20 INFO Executor: Running task 0.0 in stage 14.0 (TID 48)

23/07/03 18:07:20 INFO FileScanRDD: Reading File path:
file:///data/202301/account_cycle/account_cycle-202301-53.parquet, range:
0-134217728, partition values: [empty row]

23/07/03 18:07:20 ERROR Executor: Exception in task 0.0 in stage 14.0 (TID
48)

java.lang.IllegalArgumentException

at java.nio.Buffer.limit(Buffer.java:275)

at org.xerial.snappy.Snappy.uncompress(Snappy.java:553)

at
org.apache.parquet.hadoop.codec.SnappyDecompressor.decompress(SnappyDecompressor.java:71)

at
org.apache.parquet.hadoop.codec.NonBlockedDecompressorStream.read(NonBlockedDecompressorStream.java:51)

at java.io.DataInputStream.readFully(DataInputStream.java:195)

at java.io.DataInputStream.readFully(DataInputStream.java:169)

at
org.apache.parquet.bytes.BytesInput$StreamBytesInput.toByteArray(BytesInput.java:286)

at
org.apache.parquet.bytes.BytesInput.toByteBuffer(BytesInput.java:237)

at
org.apache.parquet.bytes.BytesInput.toInputStream(BytesInput.java:246)

at
org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainLongDictionary.<init>(PlainValuesDictionary.java:154)

at
org.apache.parquet.column.Encoding$1.initDictionary(Encoding.java:96)

at
org.apache.parquet.column.Encoding$5.initDictionary(Encoding.java:163)

at
org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.<init>(VectorizedColumnReader.java:114)

at
org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.checkEndOfRowGroup(VectorizedParquetRecordReader.java:352)

at
org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:293)

at

[Spark SQL] Data objects from query history

2023-06-30 Thread Ruben Mennes
Dear Apache Spark community,

I hope this email finds you well. My name is Ruben, and I am an enthusiastic 
user of Apache Spark, specifically through the Databricks platform. I am 
reaching out to you today to seek your assistance and guidance regarding a 
specific use case.

I have been exploring the capabilities of Spark SQL and Databricks, and I have 
encountered a challenge related to accessing the data objects used by queries 
from the query history. I am aware that Databricks provides a comprehensive 
query history that contains valuable information about executed queries.

However, my objective is to extract the underlying data objects (tables) 
involved in each query. By doing so, I aim to analyze and understand the 
dependencies between queries and the data they operate on. This information 
will provide us new insights in how data is used across our data platform.

I have attempted to leverage the Spark SQL Antlr grammar, available at 
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4,
 to parse the queries retrieved from the query history. Unfortunately, I have 
encountered difficulties when parsing more complex queries.

As an example, I have struggled to parse queries with intricate constructs such 
as the following:
> SELECT
>   concat(pickup_zip, '-', dropoff_zip) as route,
>   AVG(fare_amount) as average_fare
> FROM
>   `samples`.`nyctaxi`.`trips`
> GROUP BY
>   1
> ORDER BY
>   2 DESC
> LIMIT 1000
I would greatly appreciate it if you could provide me with some guidance on how 
to overcome these challenges. Specifically, I am interested in understanding if 
there are alternative approaches or existing tools that can help me achieve my 
goal of extracting the data objects used by queries from the Databricks query 
history.

Additionally, if there are any resources, documentation, or examples that 
provide further clarity on this topic, I would be more than grateful to receive 
them. Any insights you can provide would be of immense help in advancing my 
understanding and enabling me to make the most of the Spark SQL and Databricks 
ecosystem.

Thank you very much for your time and support. I eagerly look forward to 
hearing from you and benefiting from your expertise.

Best regards,
Ruben Mennes


checkpoint file deletion

2023-06-29 Thread Lingzhe Sun
Hi all,

I performed a stateful structure streaming job, and configured 
spark.cleaner.referenceTracking.cleanCheckpoints to true
spark.cleaner.periodicGC.interval to 1min
in the config. But the checkpoint files are not deleted and the number of them 
keeps growing. Did I miss something?



Lingzhe Sun  
Hirain Technology


Unsubscribe

2023-06-29 Thread lee
Unsubscribe 


李杰
leedd1...@163.com

Unsubscribe

2023-06-28 Thread Ghazi Naceur
Unsubscribe


Re:subscribe

2023-06-28 Thread mojianan2015
test

At 2023-06-29 10:21:56, "mojianan2015"  wrote:

Test

mojianan2015
mojianan2...@163.com



subscribe

2023-06-28 Thread mojianan2015


Test

mojianan2015
mojianan2...@163.com



[PySpark] Intermittent Spark session initialization error on M1 Mac

2023-06-27 Thread BeoumSuk Kim
Hi,

When I launch pyspark CLI on my M1 Macbook (standalone mode), I
intermittently get the following error and the Spark session doesn't get
initialized. 7~8 times out of 10, it doesn't have the issue, but it
intermittently fails. And, this occurs only when I specify
`spark.jars.packages` option.
May I get some help on this?

PySpark: 3.3.0 (installed with PIP)
Python: 3.8.10
Java: corretto-1.8.0_332

"""
 % pyspark --conf spark.jars.packages=org.apache.hadoop:hadoop-aws:3.3.2
...

23/06/27 17:51:38 ERROR SparkContext: Error initializing SparkContext.
java.lang.IllegalArgumentException: Too large frame: 5785721462337832960
at
org.sparkproject.guava.base.Preconditions.checkArgument(Preconditions.java:119)
at
org.apache.spark.network.util.TransportFrameDecoder.decodeNext(TransportFrameDecoder.java:148)
at
org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:98)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at
io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
at
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
at
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
at
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
at
io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:750)
...
23/06/27 17:51:38 ERROR TransportRequestHandler: Error sending result
StreamResponse[streamId=/jars/com.amazonaws_aws-java-sdk-bundle-1.11.1026.jar,byteCount=226379782,body=FileSegmentManagedBuffer[file=.../.ivy2/jars/com.amazonaws_aws-java-sdk-bundle-1.11.1026.jar,offset=0,length=226379782]]
to /192.168.1.19:65116; closing connection
...
"""


[k8s] Fail to expose custom port on executor container specified in my executor pod template

2023-06-26 Thread James Yu
Hi Team,


I have no luck in trying to expose port 5005 (for remote debugging purpose) on 
my executor container using the following pod template and spark configuration

s3a://mybucket/pod-template-executor-debug.yaml

apiVersion: v1
kind: Pod
spec:
  containers:
  - name: spark-kubernetes-executor
ports:
- containerPort: 5005
  name: debug
  protocol: TCP

--config 
spark.kubernetes.executor.podTemplateFile=s3a://mybucket/pod-template-executor-debug.yaml


The resultant executor container only exposes the default 7079/TCP port, but 
not the 5005/TCP that I wanted it to expose.

It works just fine for the driver container with the similar settings where I 
can see all ports are exposed (5005/TCP, 7078/TCP, 7079/TCP, 4040/TCP) as 
expected.

Did I miss anything, or is this a known bug where executor pod template is not 
respected in terms of the port expose?

Thanks in advance for your help.

James


[Spark-SQL] Dataframe write saveAsTable failed

2023-06-26 Thread Anil Dasari
Hi,

We have upgraded Spark from 2.4.x to 3.3.1 recently and managed table
creation while writing dataframe as saveAsTable failed with below error.

Can not create the managed table(``) The associated
location('hdfs:') already exists.

On high level our code does below before writing dataframe as table:

sparkSession.sql(s"DROP TABLE IF EXISTS $hiveTableName PURGE")
mydataframe.write.mode(SaveMode.Overwrite).saveAsTable(hiveTableName)

The above code works with Spark 2 because of
spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation which is
deprecated in Spark 3.

The table is dropped and purged before writing the dataframe. I expected
dataframe write shouldn't complain that the path already exists.

After digging further, I noticed there is `_tempory` folder present in the
hdfs table path.

dfs -ls /apps/hive/warehouse//
Found 1 items
drwxr-xr-x   - hadoop hdfsadmingroup  0 2023-06-23 04:45
/apps/hive/warehouse//_temporary

[root@ip-10-121-107-90 bin]# hdfs dfs -ls
/apps/hive/warehouse//_temporary
Found 1 items
drwxr-xr-x   - hadoop hdfsadmingroup  0 2023-06-23 04:45
/apps/hive/warehouse//_temporary/0

[root@ip-10-121-107-90 bin]# hdfs dfs -ls
/apps/hive/warehouse//_temporary/0
Found 1 items
drwxr-xr-x   - hadoop hdfsadmingroup  0 2023-06-23 04:45
/apps/hive/warehouse//_temporary/0/_temporary

Is it because of task failures ? Is there a way to workaround this issue ?

Thanks


Re: Spark-Sql - Slow Performance With CTAS and Large Gzipped File

2023-06-26 Thread Mich Talebzadeh
OK, good news. You have made some progress here :)

bzip (bzip2) works (splittable) because it is block-oriented whereas gzip
is stream oriented. I also noticed that you are creating a managed ORC
file.  You can bucket and partition an ORC (Optimized Row Columnar file
format. An example below:


DROP TABLE IF EXISTS dummy;

CREATE TABLE dummy (
 ID INT
   , CLUSTERED INT
   , SCATTERED INT
   , RANDOMISED INT
   , RANDOM_STRING VARCHAR(50)
   , SMALL_VC VARCHAR(10)
   , PADDING  VARCHAR(10)
)
CLUSTERED BY (ID) INTO 256 BUCKETS
STORED AS ORC
TBLPROPERTIES (
"orc.create.index"="true",
"orc.bloom.filter.columns"="ID",
"orc.bloom.filter.fpp"="0.05",
"orc.compress"="SNAPPY",
"orc.stripe.size"="16777216",
"orc.row.index.stride"="1" )
;
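
Separately, a quick PySpark check that shows the splittability point in
practice (paths are hypothetical, and an active SparkSession named spark is
assumed):

gz_df = spark.read.option("sep", "|").csv("hdfs:///data/claims.csv.gz")
bz_df = spark.read.option("sep", "|").csv("hdfs:///data/claims.csv.bz2")

# A single gzip file cannot be split, so it is read as one input partition;
# bzip2 is block-oriented, so Spark can split it into many partitions.
print(gz_df.rdd.getNumPartitions())   # typically 1
print(bz_df.rdd.getNumPartitions())   # usually > 1, roughly size / block size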

HTH

Mich Talebzadeh,
Solutions Architect/Engineering Lead
Palantir Technologies Limited
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Mon, 26 Jun 2023 at 19:35, Patrick Tucci  wrote:

> Hi Mich,
>
> Thanks for the reply. I started running ANALYZE TABLE on the external
> table, but the progress was very slow. The stage had only read about 275MB
> in 10 minutes. That equates to about 5.5 hours just to analyze the table.
>
> This might just be the reality of trying to process a 240m record file
> with 80+ columns, unless there's an obvious issue with my setup that
> someone sees. The solution is likely going to involve increasing
> parallelization.
>
> To that end, I extracted and re-zipped this file in bzip. Since bzip is
> splittable and gzip is not, Spark can process the bzip file in parallel.
> The same CTAS query only took about 45 minutes. This is still a bit slower
> than I had hoped, but the import from bzip fully utilized all available
> cores. So we can give the cluster more resources if we need the process to
> go faster.
>
> Patrick
>
> On Mon, Jun 26, 2023 at 12:52 PM Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> OK for now have you analyzed statistics in Hive external table
>>
>> spark-sql (default)> ANALYZE TABLE test.stg_t2 COMPUTE STATISTICS FOR ALL
>> COLUMNS;
>> spark-sql (default)> DESC EXTENDED test.stg_t2;
>>
>> Hive external tables have little optimization
>>
>> HTH
>>
>>
>>
>> Mich Talebzadeh,
>> Solutions Architect/Engineering Lead
>> Palantir Technologies Limited
>> London
>> United Kingdom
>>
>>
>>view my Linkedin profile
>> 
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Mon, 26 Jun 2023 at 16:33, Patrick Tucci 
>> wrote:
>>
>>> Hello,
>>>
>>> I'm using Spark 3.4.0 in standalone mode with Hadoop 3.3.5. The master
>>> node has 2 cores and 8GB of RAM. There is a single worker node with 8 cores
>>> and 64GB of RAM.
>>>
>>> I'm trying to process a large pipe delimited file that has been
>>> compressed with gzip (9.2GB zipped, ~58GB unzipped, ~241m records, ~85
>>> columns). I uploaded the gzipped file to HDFS and created an external table
>>> using the attached script. I tried two simpler queries on the same table,
>>> and they finished in ~5 and ~10 minutes respectively:
>>>
>>> SELECT COUNT(*) FROM ClaimsImport;
>>> SELECT COUNT(*) FROM ClaimsImport WHERE ClaimLineID = 1;
>>>
>>> However, when I tried to create a table stored as ORC using this table
>>> as the input, the query ran for almost 4 hours:
>>>
>>> CREATE TABLE Claims STORED AS ORC
>>> AS
>>> SELECT *
>>> FROM ClaimsImport
>>> --Exclude the header record
>>> WHERE ClaimID <> 'ClaimID';
>>>
>>> [image: image.png]
>>>
>>> Why is there such a speed disparity between these different operations?
>>> I understand that this job cannot be parallelized because the file is
>>> compressed with gzip. I also understand that creating an ORC table from the
>>> input will take more time than a simple COUNT(*). But it doesn't feel like
>>> the CREATE TABLE operation should take more than 24x longer than a simple
>>> SELECT COUNT(*) statement.
>>>
>>> Thanks for any help. Please let me know if I can provide any additional
>>> information.
>>>
>>> Patrick
>>>
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>


Re: Spark-Sql - Slow Performance With CTAS and Large Gzipped File

2023-06-26 Thread Patrick Tucci
Hi Mich,

Thanks for the reply. I started running ANALYZE TABLE on the external
table, but the progress was very slow. The stage had only read about 275MB
in 10 minutes. That equates to about 5.5 hours just to analyze the table.

This might just be the reality of trying to process a 240m record file with
80+ columns, unless there's an obvious issue with my setup that someone
sees. The solution is likely going to involve increasing parallelization.

To that end, I extracted and re-zipped this file in bzip. Since bzip is
splittable and gzip is not, Spark can process the bzip file in parallel.
The same CTAS query only took about 45 minutes. This is still a bit slower
than I had hoped, but the import from bzip fully utilized all available
cores. So we can give the cluster more resources if we need the process to
go faster.

Patrick

On Mon, Jun 26, 2023 at 12:52 PM Mich Talebzadeh 
wrote:

> OK for now have you analyzed statistics in Hive external table
>
> spark-sql (default)> ANALYZE TABLE test.stg_t2 COMPUTE STATISTICS FOR ALL
> COLUMNS;
> spark-sql (default)> DESC EXTENDED test.stg_t2;
>
> Hive external tables have little optimization
>
> HTH
>
>
>
> Mich Talebzadeh,
> Solutions Architect/Engineering Lead
> Palantir Technologies Limited
> London
> United Kingdom
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Mon, 26 Jun 2023 at 16:33, Patrick Tucci 
> wrote:
>
>> Hello,
>>
>> I'm using Spark 3.4.0 in standalone mode with Hadoop 3.3.5. The master
>> node has 2 cores and 8GB of RAM. There is a single worker node with 8 cores
>> and 64GB of RAM.
>>
>> I'm trying to process a large pipe delimited file that has been
>> compressed with gzip (9.2GB zipped, ~58GB unzipped, ~241m records, ~85
>> columns). I uploaded the gzipped file to HDFS and created an external table
>> using the attached script. I tried two simpler queries on the same table,
>> and they finished in ~5 and ~10 minutes respectively:
>>
>> SELECT COUNT(*) FROM ClaimsImport;
>> SELECT COUNT(*) FROM ClaimsImport WHERE ClaimLineID = 1;
>>
>> However, when I tried to create a table stored as ORC using this table as
>> the input, the query ran for almost 4 hours:
>>
>> CREATE TABLE Claims STORED AS ORC
>> AS
>> SELECT *
>> FROM ClaimsImport
>> --Exclude the header record
>> WHERE ClaimID <> 'ClaimID';
>>
>> [image: image.png]
>>
>> Why is there such a speed disparity between these different operations? I
>> understand that this job cannot be parallelized because the file is
>> compressed with gzip. I also understand that creating an ORC table from the
>> input will take more time than a simple COUNT(*). But it doesn't feel like
>> the CREATE TABLE operation should take more than 24x longer than a simple
>> SELECT COUNT(*) statement.
>>
>> Thanks for any help. Please let me know if I can provide any additional
>> information.
>>
>> Patrick
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Unable to populate spark metrics using custom metrics API

2023-06-26 Thread Surya Soma
Hello,

I am trying to publish custom metrics using Spark CustomMetric API as
supported since spark 3.2 https://github.com/apache/spark/pull/31476,

https://spark.apache.org/docs/3.2.0/api/java/org/apache/spark/sql/connector/metric/CustomMetric.html

I have created a custom metric implementing `CustomMetic` with default
constructor overriding name and description.
Created a new instance of the created custom metric in the
`supportedCustomMetrics` method of  `spark.sql.connector.read.Scan`.

Created a custom task metric implementing `CustomTaskMetric` with the same
name as that of CustomMetric class and initialized this in
`currentMetricsValues` of PartitionReader.

I have static values as of now but when I run the application, I see in the
spark history page the corresponding value to the metric as N/A.
I have added logs in the `aggregateTaskMetrics` and my flow is going into
it. The spark SQLAppStatusListener.aggregateMetrics is loading my class and
calling the `aggregateTaskMetrics` yet I still see N/A in the spark ui
page.


Driver log:

```

23/06/23 19:23:53 INFO Spark32CustomMetric: Spark32CustomMetric in
aggregateTaskMetrics start
23/06/23 19:23:53 INFO Spark32CustomMetric: Spark32CustomMetric in
aggregateTaskMetrics sum:1234end
+-+--+---+---+
| word|word_count| corpus|corpus_date|
+-+--+---+---+
| LVII| 1|sonnets|  0|
|   augurs| 1|sonnets|  0|
|   dimm'd| 1|sonnets|  0|
```


Attaching the Spark UI page screenshot.

Am I missing something? Any help is really appreciated.

Thanks.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org

Re: Spark-Sql - Slow Performance With CTAS and Large Gzipped File

2023-06-26 Thread Mich Talebzadeh
OK for now have you analyzed statistics in Hive external table

spark-sql (default)> ANALYZE TABLE test.stg_t2 COMPUTE STATISTICS FOR ALL
COLUMNS;
spark-sql (default)> DESC EXTENDED test.stg_t2;

Hive external tables have little optimization

HTH



Mich Talebzadeh,
Solutions Architect/Engineering Lead
Palantir Technologies Limited
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Mon, 26 Jun 2023 at 16:33, Patrick Tucci  wrote:

> Hello,
>
> I'm using Spark 3.4.0 in standalone mode with Hadoop 3.3.5. The master
> node has 2 cores and 8GB of RAM. There is a single worker node with 8 cores
> and 64GB of RAM.
>
> I'm trying to process a large pipe delimited file that has been compressed
> with gzip (9.2GB zipped, ~58GB unzipped, ~241m records, ~85 columns). I
> uploaded the gzipped file to HDFS and created an external table using the
> attached script. I tried two simpler queries on the same table, and they
> finished in ~5 and ~10 minutes respectively:
>
> SELECT COUNT(*) FROM ClaimsImport;
> SELECT COUNT(*) FROM ClaimsImport WHERE ClaimLineID = 1;
>
> However, when I tried to create a table stored as ORC using this table as
> the input, the query ran for almost 4 hours:
>
> CREATE TABLE Claims STORED AS ORC
> AS
> SELECT *
> FROM ClaimsImport
> --Exclude the header record
> WHERE ClaimID <> 'ClaimID';
>
> [image: image.png]
>
> Why is there such a speed disparity between these different operations? I
> understand that this job cannot be parallelized because the file is
> compressed with gzip. I also understand that creating an ORC table from the
> input will take more time than a simple COUNT(*). But it doesn't feel like
> the CREATE TABLE operation should take more than 24x longer than a simple
> SELECT COUNT(*) statement.
>
> Thanks for any help. Please let me know if I can provide any additional
> information.
>
> Patrick
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org


Unsubscribe

2023-06-26 Thread Ghazi Naceur
Unsubscribe


Spark-Sql - Slow Performance With CTAS and Large Gzipped File

2023-06-26 Thread Patrick Tucci
Hello,

I'm using Spark 3.4.0 in standalone mode with Hadoop 3.3.5. The master node
has 2 cores and 8GB of RAM. There is a single worker node with 8 cores and
64GB of RAM.

I'm trying to process a large pipe delimited file that has been compressed
with gzip (9.2GB zipped, ~58GB unzipped, ~241m records, ~85 columns). I
uploaded the gzipped file to HDFS and created an external table using the
attached script. I tried two simpler queries on the same table, and they
finished in ~5 and ~10 minutes respectively:

SELECT COUNT(*) FROM ClaimsImport;
SELECT COUNT(*) FROM ClaimsImport WHERE ClaimLineID = 1;

However, when I tried to create a table stored as ORC using this table as
the input, the query ran for almost 4 hours:

CREATE TABLE Claims STORED AS ORC
AS
SELECT *
FROM ClaimsImport
--Exclude the header record
WHERE ClaimID <> 'ClaimID';

[image: image.png]

Why is there such a speed disparity between these different operations? I
understand that this job cannot be parallelized because the file is
compressed with gzip. I also understand that creating an ORC table from the
input will take more time than a simple COUNT(*). But it doesn't feel like
the CREATE TABLE operation should take more than 24x longer than a simple
SELECT COUNT(*) statement.

Thanks for any help. Please let me know if I can provide any additional
information.

Patrick


Create Table.sql
Description: Binary data

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org

Re: [Spark streaming]: Microbatch id in logs

2023-06-26 Thread Mich Talebzadeh
In SSS
writeStream. \
   outputMode('append'). \
   option("truncate", "false"). \
   foreachBatch(SendToBigQuery). \
   option('checkpointLocation', checkpoint_path). \

so this writeStream will call  foreachBatch()

   """
   "foreachBatch" performs custom write logic on each
micro-batch through SendToBigQuery function
foreachBatch(SendToBigQuery) expects 2 parameters, first:
micro-batch as DataFrame or Dataset and second: unique id for each batch
   Using foreachBatch, we write each micro batch to storage
defined in our custom logic. In this case, we store the output of our
streaming application to Google BigQuery table

that does this

def SendToBigQuery(df, batchId):
if(len(df.take(1))) > 0:
print(batchId)
# do your logic
else:
print("DataFrame is empty")

You should also have it in

   option('checkpointLocation', checkpoint_path).
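
Putting those pieces together, a minimal end-to-end sketch (the rate source
and the checkpoint path are placeholders, not from the original question):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("batch-id-logging").getOrCreate()

def process_batch(df, batch_id):
    # batch_id is the micro-batch id Spark passes to foreachBatch;
    # prefix application log lines with it so they can be correlated
    print(f"[batch {batch_id}] incoming rows: {df.count()}")
    # ... write df to the sink of your choice here ...

(spark.readStream.format("rate").option("rowsPerSecond", 1).load()
      .writeStream
      .foreachBatch(process_batch)
      .option("checkpointLocation", "/tmp/chk-batch-id-demo")  # placeholder
      .start()
      .awaitTermination())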

See this article on mine
Processing Change Data Capture with Spark Structured Streaming


HTH

Mich Talebzadeh,
Solutions Architect/Engineering Lead
Palantir Technologies Limited
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Mon, 26 Jun 2023 at 06:01, Anil Dasari  wrote:

> Hi,
> I am using spark 3.3.1 distribution and spark stream in my application. Is
> there a way to add a microbatch id to all logs generated by spark and spark
> applications ?
>
> Thanks.
>


[Spark streaming]: Microbatch id in logs

2023-06-25 Thread Anil Dasari
Hi,
I am using spark 3.3.1 distribution and spark stream in my application. Is
there a way to add a microbatch id to all logs generated by spark and spark
applications ?

Thanks.


Re: [ANNOUNCE] Apache Spark 3.4.1 released

2023-06-24 Thread yangjie01
Thanks Dongjoon ~

On 2023/6/24 10:29, "L. C. Hsieh"  wrote:


Thanks Dongjoon!


On Fri, Jun 23, 2023 at 7:10 PM Hyukjin Kwon  wrote:
>
> Thanks!
>
> On Sat, Jun 24, 2023 at 11:01 AM Mridul Muralidharan  wrote:
>>
>>
>> Thanks Dongjoon !
>>
>> Regards,
>> Mridul
>>
>> On Fri, Jun 23, 2023 at 6:58 PM Dongjoon Hyun  wrote:
>>>
>>> We are happy to announce the availability of Apache Spark 3.4.1!
>>>
>>> Spark 3.4.1 is a maintenance release containing stability fixes. This
>>> release is based on the branch-3.4 maintenance branch of Spark. We strongly
>>> recommend all 3.4 users to upgrade to this stable release.
>>>
>>> To download Spark 3.4.1, head over to the download page:
>>> https://spark.apache.org/downloads.html
>>>
>>> To view the release notes:
>>> https://spark.apache.org/releases/spark-release-3-4-1.html
>>>
>>> We would like to acknowledge all community members for contributing to this
>>> release. This release would not have been possible without you.
>>>
>>>
>>> Dongjoon Hyun


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org 







-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re:[ANNOUNCE] Apache Spark 3.4.1 released

2023-06-24 Thread beliefer
Thanks! Dongjoon Hyun.
Congratulation too!







At 2023-06-24 07:57:05, "Dongjoon Hyun"  wrote:

We are happy to announce the availability of Apache Spark 3.4.1!

Spark 3.4.1 is a maintenance release containing stability fixes. This
release is based on the branch-3.4 maintenance branch of Spark. We strongly
recommend all 3.4 users to upgrade to this stable release.

To download Spark 3.4.1, head over to the download page:
https://spark.apache.org/downloads.html

To view the release notes:
https://spark.apache.org/releases/spark-release-3-4-1.html

We would like to acknowledge all community members for contributing to this
release. This release would not have been possible without you.

Dongjoon Hyun


Apache Spark with watermark - processing data different LogTypes in same kafka topic

2023-06-24 Thread karan alang
Hello All -

I'm using Apache Spark Structured Streaming to read data from Kafka topic,
and do some processing. I'm using watermark to account for late-coming
records and the code works fine.

Here is the working(sample) code:
```

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, to_timestamp, window, max, expr
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType

spark = SparkSession \
.builder \
.master("local[3]") \
.appName("Sliding Window Demo") \
.config("spark.streaming.stopGracefullyOnShutdown", "true") \
.config("spark.sql.shuffle.partitions", 1) \
.getOrCreate()


stock_schema = StructType([
StructField("LogType", StringType()),
StructField("CreatedTime", StringType()),
StructField("Type", StringType()),
StructField("Amount", IntegerType()),
StructField("BrokerCode", StringType())
])

kafka_df = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "trades") \
.option("startingOffsets", "earliest") \
.load()

value_df = kafka_df.select(from_json(col("value").cast("string"),
stock_schema).alias("value"))

trade_df = value_df.select("value.*") \
.withColumn("CreatedTime", to_timestamp(col("CreatedTime"),
"yyyy-MM-dd HH:mm:ss")) \
.withColumn("Buy", expr("case when Type == 'BUY' then Amount
else 0 end")) \
.withColumn("Sell", expr("case when Type == 'SELL' then Amount
else 0 end"))


window_agg_df = trade_df \
.withWatermark("CreatedTime", "10 minute") \
.groupBy(window(col("CreatedTime"), "10 minute")) \
.agg({"Buy":"sum",
"Sell":"sum"}).withColumnRenamed("sum(Buy)",
"TotalBuy").withColumnRenamed("sum(Sell)", "TotalSell")

output_df = window_agg_df.select("window.start", "window.end",
"TotalBuy", "TotalSell")

window_query = output_df.writeStream \
.format("console") \
.outputMode("append") \
.option("checkpointLocation", "chk-point-dir-mar28") \
.trigger(processingTime="30 second") \
.start()

window_query.awaitTermination()


```

Currently, I'm processing a single LogType, the requirement is to process
multiple LogTypes in the same flow .. LogTypes will be config driven (not
hard-coded). Objective is to have generic code that can process all
logTypes.

As an example, for LogType X, I will need to get groupby columns col1, col2
and get the sum of values 'sent' & 'received'. for LogType Y, the groupBy
columns will remain the same but the sum will be on column col3 instead.

w/o the watermark, I can look at the LogType and do the processing in batch
mode (using foreachBatch). However, with watermark - i'm unable to figure
out how to process based on LogType.

Any inputs on this ?

Here is the stackoverflow for this

https://stackoverflow.com/questions/76547349/apache-spark-with-watermark-processing-data-different-logtypes-in-same-kafka-t

tia!


Re: [ANNOUNCE] Apache Spark 3.4.1 released

2023-06-23 Thread L. C. Hsieh
Thanks Dongjoon!

On Fri, Jun 23, 2023 at 7:10 PM Hyukjin Kwon  wrote:
>
> Thanks!
>
> On Sat, Jun 24, 2023 at 11:01 AM Mridul Muralidharan  wrote:
>>
>>
>> Thanks Dongjoon !
>>
>> Regards,
>> Mridul
>>
>> On Fri, Jun 23, 2023 at 6:58 PM Dongjoon Hyun  wrote:
>>>
>>> We are happy to announce the availability of Apache Spark 3.4.1!
>>>
>>> Spark 3.4.1 is a maintenance release containing stability fixes. This
>>> release is based on the branch-3.4 maintenance branch of Spark. We strongly
>>> recommend all 3.4 users to upgrade to this stable release.
>>>
>>> To download Spark 3.4.1, head over to the download page:
>>> https://spark.apache.org/downloads.html
>>>
>>> To view the release notes:
>>> https://spark.apache.org/releases/spark-release-3-4-1.html
>>>
>>> We would like to acknowledge all community members for contributing to this
>>> release. This release would not have been possible without you.
>>>
>>>
>>> Dongjoon Hyun

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: [ANNOUNCE] Apache Spark 3.4.1 released

2023-06-23 Thread Hyukjin Kwon
Thanks!

On Sat, Jun 24, 2023 at 11:01 AM Mridul Muralidharan 
wrote:

>
> Thanks Dongjoon !
>
> Regards,
> Mridul
>
> On Fri, Jun 23, 2023 at 6:58 PM Dongjoon Hyun  wrote:
>
>> We are happy to announce the availability of Apache Spark 3.4.1!
>>
>> Spark 3.4.1 is a maintenance release containing stability fixes. This
>> release is based on the branch-3.4 maintenance branch of Spark. We
>> strongly
>> recommend all 3.4 users to upgrade to this stable release.
>>
>> To download Spark 3.4.1, head over to the download page:
>> https://spark.apache.org/downloads.html
>>
>> To view the release notes:
>> https://spark.apache.org/releases/spark-release-3-4-1.html
>>
>> We would like to acknowledge all community members for contributing to
>> this
>> release. This release would not have been possible without you.
>>
>>
>> Dongjoon Hyun
>>
>


Re: [ANNOUNCE] Apache Spark 3.4.1 released

2023-06-23 Thread Mridul Muralidharan
Thanks Dongjoon !

Regards,
Mridul

On Fri, Jun 23, 2023 at 6:58 PM Dongjoon Hyun  wrote:

> We are happy to announce the availability of Apache Spark 3.4.1!
>
> Spark 3.4.1 is a maintenance release containing stability fixes. This
> release is based on the branch-3.4 maintenance branch of Spark. We strongly
> recommend all 3.4 users to upgrade to this stable release.
>
> To download Spark 3.4.1, head over to the download page:
> https://spark.apache.org/downloads.html
>
> To view the release notes:
> https://spark.apache.org/releases/spark-release-3-4-1.html
>
> We would like to acknowledge all community members for contributing to this
> release. This release would not have been possible without you.
>
>
> Dongjoon Hyun
>


[ANNOUNCE] Apache Spark 3.4.1 released

2023-06-23 Thread Dongjoon Hyun
We are happy to announce the availability of Apache Spark 3.4.1!

Spark 3.4.1 is a maintenance release containing stability fixes. This
release is based on the branch-3.4 maintenance branch of Spark. We strongly
recommend all 3.4 users to upgrade to this stable release.

To download Spark 3.4.1, head over to the download page:
https://spark.apache.org/downloads.html

To view the release notes:
https://spark.apache.org/releases/spark-release-3-4-1.html

We would like to acknowledge all community members for contributing to this
release. This release would not have been possible without you.

Dongjoon Hyun


Re: Rename columns without manually setting them all

2023-06-21 Thread Bjørn Jørgensen
import pyspark.pandas as ps

data = {
"Employee ID": [12345, 12346, 12347, 12348, 12349],
"Name": ["Dummy x", "Dummy y", "Dummy z", "Dummy a", "Dummy b"],
"Client": ["Dummy a", "Dummy b", "Dummy c", "Dummy d", "Dummy e"],
"Project": ["abc", "def", "ghi", "jkl", "mno"],
"Team": ["team a", "team b", "team c", "team d", "team e"],
"01/01/2022": ["OFF", "WO", "WH", "WH", "OFF"],
"02/01/2022": ["WO", "WO", "OFF", "WH", "WH"],
"03/01/2022": ["WH", "WH", "WH", "OFF", "WO"],
"04/01/2022": ["WH", "WO", "WO", "WH", "OFF"],
"05/01/2022": ["WH", "WH", "OFF", "WO", "WO"],
}

df = ps.DataFrame(data)

# Define dates columns
dates_columns = df.columns[5:]

# Melt the dataframe and count the occurrences
df_melt = df.melt(id_vars=df.columns[:5], value_vars=dates_columns,
var_name="Date", value_name="Status")
df_counts = df_melt.groupby(["Date", "Status"]).size().unstack()
df_counts.sort_index(inplace=True)
df_counts

[image: image.png]

ons. 21. juni 2023 kl. 14:39 skrev Farshid Ashouri <
farsheed.asho...@gmail.com>:

> You can use selectExpr and stack to achieve the same effect in PySpark:
>
>
>
> df = spark.read.csv("your_file.csv", header=True, inferSchema=True)
>
> date_columns = [col for col in df.columns if '/' in col]
>
> df = df.selectExpr(["`Employee ID`", "`Name`", "`Client`", "`Project`",
> "`Team`"]
> + [f"stack({len(date_columns)}, {', '.join([f'`{col}`, `{col}` as
> `Status`' for col in date_columns])}) as (`Date`, `Status`)"])
>
> result = df.groupby("Date", "Status").count()
>
>
>
>
> On 21 Jun 2023, at 11:45, John Paul Jayme 
> wrote:
>
> Hi,
>
> This is currently my column definition :
> Employee ID Name Client Project Team 01/01/2022 02/01/2022 03/01/2022
> 04/01/2022 05/01/2022
> 12345 Dummy x Dummy a abc team a OFF WO  WH WH WH
> As you can see, the outer columns are just daily attendance dates. My goal
> is to count the employees who were OFF / WO / WH on said dates. I need to
> transpose them so it would look like this :
>
> 
>
> I am still new to pandas. Can you guide me on how to produce this? I am
> reading about melt() and set_index() but I am not sure if they are the
> correct functions to use.
>
>
>

-- 
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297


Re: Rename columns without manually setting them all

2023-06-21 Thread Farshid Ashouri
You can use selectExpr and stack to achieve the same effect in PySpark:



df = spark.read.csv("your_file.csv", header=True, inferSchema=True)

date_columns = [col for col in df.columns if '/' in col]

stack_expr = ", ".join([f"'{col}', `{col}`" for col in date_columns])

df = df.selectExpr(["`Employee ID`", "`Name`", "`Client`", "`Project`", "`Team`"]
                   + [f"stack({len(date_columns)}, {stack_expr}) as (`Date`, `Status`)"])

result = df.groupby("Date", "Status").count()




> On 21 Jun 2023, at 11:45, John Paul Jayme  wrote:
> 
> Hi,
> 
> This is currently my column definition :
> Employee ID  Name     Client   Project  Team    01/01/2022  02/01/2022  03/01/2022  04/01/2022  05/01/2022
> 12345        Dummy x  Dummy a  abc      team a  OFF         WO          WH          WH          WH
> 
> As you can see, the outer columns are just daily attendance dates. My goal is 
> to count the employees who were OFF / WO / WH on said dates. I need to 
> transpose them so it would look like this : 
> 
> 
> 
> I am still new to pandas. Can you guide me on how to produce this? I am 
> reading about melt() and set_index() but I am not sure if they are the 
> correct functions to use.



Rename columns without manually setting them all

2023-06-21 Thread John Paul Jayme
Hi,

This is currently my column definition :
Employee ID  Name     Client   Project  Team    01/01/2022  02/01/2022  03/01/2022  04/01/2022  05/01/2022
12345        Dummy x  Dummy a  abc      team a  OFF         WO          WH          WH          WH

As you can see, the outer columns are just daily attendance dates. My goal is 
to count the employees who were OFF / WO / WH on said dates. I need to 
transpose them so it would look like this :

[image: expected transposed output]

I am still new to pandas. Can you guide me on how to produce this? I am reading 
about melt() and set_index() but I am not sure if they are the correct 
functions to use.



Re: How to read excel file in PySpark

2023-06-20 Thread Mich Talebzadeh
OK thanks for the info.

Regards

Mich Talebzadeh,
Lead Solutions Architect/Engineering Lead
Palantir Technologies Limited
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Tue, 20 Jun 2023 at 21:27, Bjørn Jørgensen 
wrote:

> yes, p_df = DF.toPandas() that is THE pandas the one you know.
>
> change p_df = DF.toPandas() to
> p_df = DF.pandas_on_spark()
> or
> p_df = DF.to_pandas_on_spark()
> or
> p_df = DF.pandas_api()
> or
> p_df = DF.to_koalas()
>
>
>
> https://spark.apache.org/docs/latest/api/python/migration_guide/koalas_to_pyspark.html
>
> Then you will have yours pyspark df to panda API on spark.
>
> tir. 20. juni 2023 kl. 22:16 skrev Mich Talebzadeh <
> mich.talebza...@gmail.com>:
>
>> OK thanks
>>
>> So the issue seems to be creating  a Panda DF from Spark DF (I do it for
>> plotting with something like
>>
>> import matplotlib.pyplot as plt
>> p_df = DF.toPandas()
>> p_df.plt()
>>
>> I guess that stays in the driver.
>>
>>
>> Mich Talebzadeh,
>> Lead Solutions Architect/Engineering Lead
>> Palantir Technologies Limited
>> London
>> United Kingdom
>>
>>
>>view my Linkedin profile
>> 
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Tue, 20 Jun 2023 at 20:46, Sean Owen  wrote:
>>
>>> No, a pandas on Spark DF is distributed.
>>>
>>> On Tue, Jun 20, 2023, 1:45 PM Mich Talebzadeh 
>>> wrote:
>>>
 Thanks but if you create a Spark DF from Pandas DF that Spark DF is not
 distributed and remains on the driver. I recall a while back we had this
 conversation. I don't think anything has changed.

 Happy to be corrected

 Mich Talebzadeh,
 Lead Solutions Architect/Engineering Lead
 Palantir Technologies Limited
 London
 United Kingdom


view my Linkedin profile
 


  https://en.everybodywiki.com/Mich_Talebzadeh



 *Disclaimer:* Use it at your own risk. Any and all responsibility for
 any loss, damage or destruction of data or any other property which may
 arise from relying on this email's technical content is explicitly
 disclaimed. The author will in no case be liable for any monetary damages
 arising from such loss, damage or destruction.




 On Tue, 20 Jun 2023 at 20:09, Bjørn Jørgensen 
 wrote:

> Pandas API on spark is an API so that users can use spark as they use
> pandas. This was known as koalas.
>
> Is this limitation still valid for Pandas?
> For pandas, yes. But what I did show wos pandas API on spark so its
> spark.
>
>  Additionally when we convert from Panda DF to Spark DF, what process
> is involved under the bonnet?
> I gess pyarrow and drop the index column.
>
> Have a look at
> https://github.com/apache/spark/tree/master/python/pyspark/pandas
>
> tir. 20. juni 2023 kl. 19:05 skrev Mich Talebzadeh <
> mich.talebza...@gmail.com>:
>
>> Whenever someone mentions Pandas I automatically think of it as an
>> excel sheet for Python.
>>
>> OK my point below needs some qualification
>>
>> Why Spark here. Generally, parallel architecture comes into play when
>> the data size is significantly large which cannot be handled on a single
>> machine, hence, the use of Spark becomes meaningful. In cases where (the
>> generated) data size is going to be very large (which is often norm 
>> rather
>> than the exception these days), the data cannot be processed and stored 
>> in
>> Pandas data frames as these data frames store data in RAM. Then, the 
>> whole
>> dataset from a storage like HDFS or cloud storage cannot be collected,
>> because it will take significant time and space and probably won't fit 
>> in a
>> single machine RAM. (in this the driver memory)
>>
>> Is this limitation still valid for Pandas? Additionally when we
>> convert from Panda DF to Spark DF, what process is involved under the
>> bonnet?
>>
>> Thanks
>>
>> Mich Talebzadeh,
>> Lead Solutions 

Unsubscribe

2023-06-20 Thread Bhargava Sukkala
-- 
Thanks,
Bhargava Sukkala.
Cell no:216-278-1066
MS in Business Analytics,
Arizona State University.


Re: How to read excel file in PySpark

2023-06-20 Thread Bjørn Jørgensen
Yes, p_df = DF.toPandas() is THE pandas, the one you already know.

change p_df = DF.toPandas() to
p_df = DF.pandas_on_spark()
or
p_df = DF.to_pandas_on_spark()
or
p_df = DF.pandas_api()
or
p_df = DF.to_koalas()


https://spark.apache.org/docs/latest/api/python/migration_guide/koalas_to_pyspark.html

Then you will have your PySpark DF as a pandas API on Spark DF.
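For illustration, a minimal sketch of these conversions, assuming an existing
PySpark DataFrame named DF (pandas_api() is the spelling in recent releases):

pdf_local = DF.toPandas()     # plain pandas: the whole dataset is collected to the driver
psdf = DF.pandas_api()        # pandas API on Spark: stays distributed across the executors

print(type(pdf_local))        # pandas.core.frame.DataFrame
print(type(psdf))             # pyspark.pandas.frame.DataFrame

sdf_again = psdf.to_spark()   # and back to a regular PySpark DataFrame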

tir. 20. juni 2023 kl. 22:16 skrev Mich Talebzadeh <
mich.talebza...@gmail.com>:

> OK thanks
>
> So the issue seems to be creating  a Panda DF from Spark DF (I do it for
> plotting with something like
>
> import matplotlib.pyplot as plt
> p_df = DF.toPandas()
> p_df.plt()
>
> I guess that stays in the driver.
>
>
> Mich Talebzadeh,
> Lead Solutions Architect/Engineering Lead
> Palantir Technologies Limited
> London
> United Kingdom
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Tue, 20 Jun 2023 at 20:46, Sean Owen  wrote:
>
>> No, a pandas on Spark DF is distributed.
>>
>> On Tue, Jun 20, 2023, 1:45 PM Mich Talebzadeh 
>> wrote:
>>
>>> Thanks but if you create a Spark DF from Pandas DF that Spark DF is not
>>> distributed and remains on the driver. I recall a while back we had this
>>> conversation. I don't think anything has changed.
>>>
>>> Happy to be corrected
>>>
>>> Mich Talebzadeh,
>>> Lead Solutions Architect/Engineering Lead
>>> Palantir Technologies Limited
>>> London
>>> United Kingdom
>>>
>>>
>>>view my Linkedin profile
>>> 
>>>
>>>
>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>>
>>> On Tue, 20 Jun 2023 at 20:09, Bjørn Jørgensen 
>>> wrote:
>>>
 Pandas API on spark is an API so that users can use spark as they use
 pandas. This was known as koalas.

 Is this limitation still valid for Pandas?
 For pandas, yes. But what I did show wos pandas API on spark so its
 spark.

  Additionally when we convert from Panda DF to Spark DF, what process
 is involved under the bonnet?
 I gess pyarrow and drop the index column.

 Have a look at
 https://github.com/apache/spark/tree/master/python/pyspark/pandas

 tir. 20. juni 2023 kl. 19:05 skrev Mich Talebzadeh <
 mich.talebza...@gmail.com>:

> Whenever someone mentions Pandas I automatically think of it as an
> excel sheet for Python.
>
> OK my point below needs some qualification
>
> Why Spark here. Generally, parallel architecture comes into play when
> the data size is significantly large which cannot be handled on a single
> machine, hence, the use of Spark becomes meaningful. In cases where (the
> generated) data size is going to be very large (which is often norm rather
> than the exception these days), the data cannot be processed and stored in
> Pandas data frames as these data frames store data in RAM. Then, the whole
> dataset from a storage like HDFS or cloud storage cannot be collected,
> because it will take significant time and space and probably won't fit in 
> a
> single machine RAM. (in this the driver memory)
>
> Is this limitation still valid for Pandas? Additionally when we
> convert from Panda DF to Spark DF, what process is involved under the
> bonnet?
>
> Thanks
>
> Mich Talebzadeh,
> Lead Solutions Architect/Engineering Lead
> Palantir Technologies Limited
> London
> United Kingdom
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for
> any loss, damage or destruction of data or any other property which may
> arise from relying on this email's technical content is explicitly
> disclaimed. The author will in no case be liable for any monetary damages
> arising from such loss, damage or destruction.
>
>
>
>
> On Tue, 20 Jun 2023 at 13:07, Bjørn Jørgensen <
> bjornjorgen...@gmail.com> wrote:
>
>> This is pandas API on spark
>>
>> from pyspark import pandas as 

Re: How to read excel file in PySpark

2023-06-20 Thread Mich Talebzadeh
OK thanks

So the issue seems to be creating a pandas DF from a Spark DF. I do it for
plotting with something like this:

import matplotlib.pyplot as plt
p_df = DF.toPandas()
p_df.plot()

I guess that stays in the driver.
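As a minimal sketch of that plotting pattern (the limit and the column names are
illustrative, not from this thread), bounding what gets collected keeps the driver safe:

import matplotlib.pyplot as plt

p_df = DF.limit(10_000).toPandas()               # toPandas() collects to the driver, so cap the size first
p_df.plot(x="some_x_column", y="some_y_column")  # hypothetical column names
plt.show()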


Mich Talebzadeh,
Lead Solutions Architect/Engineering Lead
Palantir Technologies Limited
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Tue, 20 Jun 2023 at 20:46, Sean Owen  wrote:

> No, a pandas on Spark DF is distributed.
>
> On Tue, Jun 20, 2023, 1:45 PM Mich Talebzadeh 
> wrote:
>
>> Thanks but if you create a Spark DF from Pandas DF that Spark DF is not
>> distributed and remains on the driver. I recall a while back we had this
>> conversation. I don't think anything has changed.
>>
>> Happy to be corrected
>>
>> Mich Talebzadeh,
>> Lead Solutions Architect/Engineering Lead
>> Palantir Technologies Limited
>> London
>> United Kingdom
>>
>>
>>view my Linkedin profile
>> 
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Tue, 20 Jun 2023 at 20:09, Bjørn Jørgensen 
>> wrote:
>>
>>> Pandas API on spark is an API so that users can use spark as they use
>>> pandas. This was known as koalas.
>>>
>>> Is this limitation still valid for Pandas?
>>> For pandas, yes. But what I did show wos pandas API on spark so its
>>> spark.
>>>
>>>  Additionally when we convert from Panda DF to Spark DF, what process
>>> is involved under the bonnet?
>>> I gess pyarrow and drop the index column.
>>>
>>> Have a look at
>>> https://github.com/apache/spark/tree/master/python/pyspark/pandas
>>>
>>> tir. 20. juni 2023 kl. 19:05 skrev Mich Talebzadeh <
>>> mich.talebza...@gmail.com>:
>>>
 Whenever someone mentions Pandas I automatically think of it as an
 excel sheet for Python.

 OK my point below needs some qualification

 Why Spark here. Generally, parallel architecture comes into play when
 the data size is significantly large which cannot be handled on a single
 machine, hence, the use of Spark becomes meaningful. In cases where (the
 generated) data size is going to be very large (which is often norm rather
 than the exception these days), the data cannot be processed and stored in
 Pandas data frames as these data frames store data in RAM. Then, the whole
 dataset from a storage like HDFS or cloud storage cannot be collected,
 because it will take significant time and space and probably won't fit in a
 single machine RAM. (in this the driver memory)

 Is this limitation still valid for Pandas? Additionally when we convert
 from Panda DF to Spark DF, what process is involved under the bonnet?

 Thanks

 Mich Talebzadeh,
 Lead Solutions Architect/Engineering Lead
 Palantir Technologies Limited
 London
 United Kingdom


view my Linkedin profile
 


  https://en.everybodywiki.com/Mich_Talebzadeh



 *Disclaimer:* Use it at your own risk. Any and all responsibility for
 any loss, damage or destruction of data or any other property which may
 arise from relying on this email's technical content is explicitly
 disclaimed. The author will in no case be liable for any monetary damages
 arising from such loss, damage or destruction.




 On Tue, 20 Jun 2023 at 13:07, Bjørn Jørgensen 
 wrote:

> This is pandas API on spark
>
> from pyspark import pandas as ps
> df = ps.read_excel("testexcel.xlsx")
> [image: image.png]
> this will convert it to pyspark
> [image: image.png]
>
> tir. 20. juni 2023 kl. 13:42 skrev John Paul Jayme
> :
>
>> Good day,
>>
>>
>>
>> I have a task to read excel files in databricks but I cannot seem to
>> proceed. I am referencing the API documents -  read_excel
>> 
>> , but there is an error sparksession object has no attribute
>> 'read_excel'. Can you advise?
>>
>>
>>
>> 

Re: Shuffle data on pods which get decomissioned

2023-06-20 Thread Mich Talebzadeh
If one executor fails, Spark moves the processing over to another executor.
However, if the data is lost, it re-executes the processing that generated
the data, and might have to go back to the source. Does this mean that only
those tasks that the dead executor was executing at the time need to be
rerun to regenerate the lost data? If I am correct, it uses RDD lineage to
figure out what needs to be re-executed. Remember, we are talking about
executor failure here, not node failure. I don't know the details of how it
determines which tasks to rerun, but I am guessing that if it is a
multi-stage job, it might have to rerun earlier stages as well. For example,
if you have done a groupBy, you will have 2 stages. After the first stage,
the data will be shuffled by hashing the groupBy key, so that data for the
same value of the key lands in the same partition. Now, if one of those
partitions is lost during execution of the second stage, I am guessing
Spark will have to go back and re-execute all the tasks in the first stage.
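For illustration, a minimal sketch (assuming an existing SparkSession named spark)
of that two-stage shape; the groupBy introduces a shuffle (an Exchange in the
physical plan), which is the boundary recomputation has to cross:

from pyspark.sql import functions as F

df = spark.range(0, 1_000_000)                                    # stage 1: map-side tasks
counts = df.groupBy((F.col("id") % 16).alias("bucket")).count()   # stage 2 runs after the shuffle

counts.explain()   # the plan shows Exchange hashpartitioning(bucket, ...)
counts.show()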

HTH

Mich Talebzadeh,
Lead Solutions Architect/Engineering Lead
Palantir Technologies Limited
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Tue, 20 Jun 2023 at 20:07, Nikhil Goyal  wrote:

> Hi folks,
> When running Spark on K8s, what would happen to shuffle data if an
> executor is terminated or lost. Since there is no shuffle service, does all
> the work done by that executor gets recomputed?
>
> Thanks
> Nikhil
>


Re: How to read excel file in PySpark

2023-06-20 Thread Sean Owen
No, a pandas on Spark DF is distributed.

On Tue, Jun 20, 2023, 1:45 PM Mich Talebzadeh 
wrote:

> Thanks but if you create a Spark DF from Pandas DF that Spark DF is not
> distributed and remains on the driver. I recall a while back we had this
> conversation. I don't think anything has changed.
>
> Happy to be corrected
>
> Mich Talebzadeh,
> Lead Solutions Architect/Engineering Lead
> Palantir Technologies Limited
> London
> United Kingdom
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Tue, 20 Jun 2023 at 20:09, Bjørn Jørgensen 
> wrote:
>
>> Pandas API on spark is an API so that users can use spark as they use
>> pandas. This was known as koalas.
>>
>> Is this limitation still valid for Pandas?
>> For pandas, yes. But what I did show wos pandas API on spark so its spark.
>>
>>  Additionally when we convert from Panda DF to Spark DF, what process is
>> involved under the bonnet?
>> I gess pyarrow and drop the index column.
>>
>> Have a look at
>> https://github.com/apache/spark/tree/master/python/pyspark/pandas
>>
>> tir. 20. juni 2023 kl. 19:05 skrev Mich Talebzadeh <
>> mich.talebza...@gmail.com>:
>>
>>> Whenever someone mentions Pandas I automatically think of it as an excel
>>> sheet for Python.
>>>
>>> OK my point below needs some qualification
>>>
>>> Why Spark here. Generally, parallel architecture comes into play when
>>> the data size is significantly large which cannot be handled on a single
>>> machine, hence, the use of Spark becomes meaningful. In cases where (the
>>> generated) data size is going to be very large (which is often norm rather
>>> than the exception these days), the data cannot be processed and stored in
>>> Pandas data frames as these data frames store data in RAM. Then, the whole
>>> dataset from a storage like HDFS or cloud storage cannot be collected,
>>> because it will take significant time and space and probably won't fit in a
>>> single machine RAM. (in this the driver memory)
>>>
>>> Is this limitation still valid for Pandas? Additionally when we convert
>>> from Panda DF to Spark DF, what process is involved under the bonnet?
>>>
>>> Thanks
>>>
>>> Mich Talebzadeh,
>>> Lead Solutions Architect/Engineering Lead
>>> Palantir Technologies Limited
>>> London
>>> United Kingdom
>>>
>>>
>>>view my Linkedin profile
>>> 
>>>
>>>
>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>>
>>> On Tue, 20 Jun 2023 at 13:07, Bjørn Jørgensen 
>>> wrote:
>>>
 This is pandas API on spark

 from pyspark import pandas as ps
 df = ps.read_excel("testexcel.xlsx")
 [image: image.png]
 this will convert it to pyspark
 [image: image.png]

 tir. 20. juni 2023 kl. 13:42 skrev John Paul Jayme
 :

> Good day,
>
>
>
> I have a task to read excel files in databricks but I cannot seem to
> proceed. I am referencing the API documents -  read_excel
> 
> , but there is an error sparksession object has no attribute
> 'read_excel'. Can you advise?
>
>
>
> *JOHN PAUL JAYME*
> Data Engineer
>
> m. +639055716384  w. www.tdcx.com
>
>
>
> *Winner of over 350 Industry Awards*
>
> [image: Linkedin]  [image:
> Facebook]  [image: Twitter]
>  [image: Youtube]
>  [image: Instagram]
> 
>
>
>
> This is a confidential email that may be privileged or legally
> protected. You are not authorized to copy or disclose the contents of this
> email. If you are not the intended addressee, please inform the sender and
> delete this email.
>
>
>
>
>


 --
 Bjørn Jørgensen
 Vestre Aspehaug 4, 6010 Ålesund
 Norge

 +47 480 94 297

>>>
>>
>> --
>> Bjørn Jørgensen
>> Vestre Aspehaug 4, 6010 Ålesund
>> Norge
>>
>> +47 480 94 297
>>
>


Re: How to read excel file in PySpark

2023-06-20 Thread Mich Talebzadeh
Thanks but if you create a Spark DF from Pandas DF that Spark DF is not
distributed and remains on the driver. I recall a while back we had this
conversation. I don't think anything has changed.

Happy to be corrected

Mich Talebzadeh,
Lead Solutions Architect/Engineering Lead
Palantir Technologies Limited
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Tue, 20 Jun 2023 at 20:09, Bjørn Jørgensen 
wrote:

> Pandas API on spark is an API so that users can use spark as they use
> pandas. This was known as koalas.
>
> Is this limitation still valid for Pandas?
> For pandas, yes. But what I did show wos pandas API on spark so its spark.
>
>  Additionally when we convert from Panda DF to Spark DF, what process is
> involved under the bonnet?
> I gess pyarrow and drop the index column.
>
> Have a look at
> https://github.com/apache/spark/tree/master/python/pyspark/pandas
>
> tir. 20. juni 2023 kl. 19:05 skrev Mich Talebzadeh <
> mich.talebza...@gmail.com>:
>
>> Whenever someone mentions Pandas I automatically think of it as an excel
>> sheet for Python.
>>
>> OK my point below needs some qualification
>>
>> Why Spark here. Generally, parallel architecture comes into play when the
>> data size is significantly large which cannot be handled on a single
>> machine, hence, the use of Spark becomes meaningful. In cases where (the
>> generated) data size is going to be very large (which is often norm rather
>> than the exception these days), the data cannot be processed and stored in
>> Pandas data frames as these data frames store data in RAM. Then, the whole
>> dataset from a storage like HDFS or cloud storage cannot be collected,
>> because it will take significant time and space and probably won't fit in a
>> single machine RAM. (in this the driver memory)
>>
>> Is this limitation still valid for Pandas? Additionally when we convert
>> from Panda DF to Spark DF, what process is involved under the bonnet?
>>
>> Thanks
>>
>> Mich Talebzadeh,
>> Lead Solutions Architect/Engineering Lead
>> Palantir Technologies Limited
>> London
>> United Kingdom
>>
>>
>>view my Linkedin profile
>> 
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Tue, 20 Jun 2023 at 13:07, Bjørn Jørgensen 
>> wrote:
>>
>>> This is pandas API on spark
>>>
>>> from pyspark import pandas as ps
>>> df = ps.read_excel("testexcel.xlsx")
>>> [image: image.png]
>>> this will convert it to pyspark
>>> [image: image.png]
>>>
>>> tir. 20. juni 2023 kl. 13:42 skrev John Paul Jayme
>>> :
>>>
 Good day,



 I have a task to read excel files in databricks but I cannot seem to
 proceed. I am referencing the API documents -  read_excel
 
 , but there is an error sparksession object has no attribute
 'read_excel'. Can you advise?



 *JOHN PAUL JAYME*
 Data Engineer

 m. +639055716384  w. www.tdcx.com



 *Winner of over 350 Industry Awards*

 [image: Linkedin]  [image:
 Facebook]  [image: Twitter]
  [image: Youtube]
  [image: Instagram]
 



 This is a confidential email that may be privileged or legally
 protected. You are not authorized to copy or disclose the contents of this
 email. If you are not the intended addressee, please inform the sender and
 delete this email.





>>>
>>>
>>> --
>>> Bjørn Jørgensen
>>> Vestre Aspehaug 4, 6010 Ålesund
>>> Norge
>>>
>>> +47 480 94 297
>>>
>>
>
> --
> Bjørn Jørgensen
> Vestre Aspehaug 4, 6010 Ålesund
> Norge
>
> +47 480 94 297
>


Re: How to read excel file in PySpark

2023-06-20 Thread Bjørn Jørgensen
Pandas API on spark is an API so that users can use spark as they use
pandas. This was known as koalas.

Is this limitation still valid for Pandas?
For pandas, yes. But what I showed was the pandas API on Spark, so it is Spark.

 Additionally when we convert from Panda DF to Spark DF, what process is
involved under the bonnet?
I guess PyArrow is used, and the index column is dropped.

Have a look at
https://github.com/apache/spark/tree/master/python/pyspark/pandas
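For what it's worth, a minimal sketch of that conversion path, assuming an existing
SparkSession named spark and a pandas DataFrame p_df (the Arrow setting is the
standard config name from the Spark docs):

spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")  # use pyarrow for the transfer
sdf = spark.createDataFrame(p_df)   # pandas -> Spark DataFrame; the pandas index is not carried over
sdf.printSchema()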

tir. 20. juni 2023 kl. 19:05 skrev Mich Talebzadeh <
mich.talebza...@gmail.com>:

> Whenever someone mentions Pandas I automatically think of it as an excel
> sheet for Python.
>
> OK my point below needs some qualification
>
> Why Spark here. Generally, parallel architecture comes into play when the
> data size is significantly large which cannot be handled on a single
> machine, hence, the use of Spark becomes meaningful. In cases where (the
> generated) data size is going to be very large (which is often norm rather
> than the exception these days), the data cannot be processed and stored in
> Pandas data frames as these data frames store data in RAM. Then, the whole
> dataset from a storage like HDFS or cloud storage cannot be collected,
> because it will take significant time and space and probably won't fit in a
> single machine RAM. (in this the driver memory)
>
> Is this limitation still valid for Pandas? Additionally when we convert
> from Panda DF to Spark DF, what process is involved under the bonnet?
>
> Thanks
>
> Mich Talebzadeh,
> Lead Solutions Architect/Engineering Lead
> Palantir Technologies Limited
> London
> United Kingdom
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Tue, 20 Jun 2023 at 13:07, Bjørn Jørgensen 
> wrote:
>
>> This is pandas API on spark
>>
>> from pyspark import pandas as ps
>> df = ps.read_excel("testexcel.xlsx")
>> [image: image.png]
>> this will convert it to pyspark
>> [image: image.png]
>>
>> tir. 20. juni 2023 kl. 13:42 skrev John Paul Jayme
>> :
>>
>>> Good day,
>>>
>>>
>>>
>>> I have a task to read excel files in databricks but I cannot seem to
>>> proceed. I am referencing the API documents -  read_excel
>>> 
>>> , but there is an error sparksession object has no attribute
>>> 'read_excel'. Can you advise?
>>>
>>>
>>>
>>> *JOHN PAUL JAYME*
>>> Data Engineer
>>>
>>> m. +639055716384  w. www.tdcx.com
>>>
>>>
>>>
>>> *Winner of over 350 Industry Awards*
>>>
>>> [image: Linkedin]  [image:
>>> Facebook]  [image: Twitter]
>>>  [image: Youtube]
>>>  [image: Instagram]
>>> 
>>>
>>>
>>>
>>> This is a confidential email that may be privileged or legally
>>> protected. You are not authorized to copy or disclose the contents of this
>>> email. If you are not the intended addressee, please inform the sender and
>>> delete this email.
>>>
>>>
>>>
>>>
>>>
>>
>>
>> --
>> Bjørn Jørgensen
>> Vestre Aspehaug 4, 6010 Ålesund
>> Norge
>>
>> +47 480 94 297
>>
>

-- 
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297


Shuffle data on pods which get decomissioned

2023-06-20 Thread Nikhil Goyal
Hi folks,
When running Spark on K8s, what would happen to shuffle data if an executor
is terminated or lost? Since there is no shuffle service, does all the work
done by that executor get recomputed?

Thanks
Nikhil


Re: How to read excel file in PySpark

2023-06-20 Thread Mich Talebzadeh
Whenever someone mentions Pandas I automatically think of it as an excel
sheet for Python.

OK my point below needs some qualification

Why Spark here? Generally, a parallel architecture comes into play when the
data size is significantly large and cannot be handled on a single machine;
that is when the use of Spark becomes meaningful. In cases where the
(generated) data size is going to be very large (which is often the norm
rather than the exception these days), the data cannot be processed and
stored in pandas data frames, as these data frames store data in RAM. Then
the whole dataset from a storage like HDFS or cloud storage cannot be
collected, because it will take significant time and space and probably
won't fit in a single machine's RAM (in this case, the driver memory).

Is this limitation still valid for Pandas? Additionally, when we convert
from a pandas DF to a Spark DF, what process is involved under the bonnet?

Thanks

Mich Talebzadeh,
Lead Solutions Architect/Engineering Lead
Palantir Technologies Limited
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Tue, 20 Jun 2023 at 13:07, Bjørn Jørgensen 
wrote:

> This is pandas API on spark
>
> from pyspark import pandas as ps
> df = ps.read_excel("testexcel.xlsx")
> [image: image.png]
> this will convert it to pyspark
> [image: image.png]
>
> tir. 20. juni 2023 kl. 13:42 skrev John Paul Jayme
> :
>
>> Good day,
>>
>>
>>
>> I have a task to read excel files in databricks but I cannot seem to
>> proceed. I am referencing the API documents -  read_excel
>> 
>> , but there is an error sparksession object has no attribute
>> 'read_excel'. Can you advise?
>>
>>
>>
>> *JOHN PAUL JAYME*
>> Data Engineer
>>
>> m. +639055716384  w. www.tdcx.com
>>
>>
>>
>> *Winner of over 350 Industry Awards*
>>
>> [image: Linkedin]  [image:
>> Facebook]  [image: Twitter]
>>  [image: Youtube]
>>  [image: Instagram]
>> 
>>
>>
>>
>> This is a confidential email that may be privileged or legally protected.
>> You are not authorized to copy or disclose the contents of this email. If
>> you are not the intended addressee, please inform the sender and delete
>> this email.
>>
>>
>>
>>
>>
>
>
> --
> Bjørn Jørgensen
> Vestre Aspehaug 4, 6010 Ålesund
> Norge
>
> +47 480 94 297
>


Re: How to read excel file in PySpark

2023-06-20 Thread Bjørn Jørgensen
This is pandas API on spark

from pyspark import pandas as ps
df = ps.read_excel("testexcel.xlsx")
[image: image.png]
this will convert it to pyspark
[image: image.png]
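A minimal runnable sketch of the same steps (assuming the optional openpyxl
package is installed; the file name is just illustrative):

from pyspark import pandas as ps

psdf = ps.read_excel("testexcel.xlsx")   # pandas-on-Spark DataFrame
sdf = psdf.to_spark()                    # convert to a regular PySpark DataFrame
sdf.printSchema()
sdf.show(5)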

tir. 20. juni 2023 kl. 13:42 skrev John Paul Jayme
:

> Good day,
>
>
>
> I have a task to read excel files in databricks but I cannot seem to
> proceed. I am referencing the API documents -  read_excel
> 
> , but there is an error sparksession object has no attribute
> 'read_excel'. Can you advise?
>
>
>
> *JOHN PAUL JAYME*
> Data Engineer
>
> m. +639055716384  w. www.tdcx.com
>
>
>
> *Winner of over 350 Industry Awards*
>
> [image: Linkedin]  [image:
> Facebook]  [image: Twitter]
>  [image: Youtube]
>  [image: Instagram]
> 
>
>
>
> This is a confidential email that may be privileged or legally protected.
> You are not authorized to copy or disclose the contents of this email. If
> you are not the intended addressee, please inform the sender and delete
> this email.
>
>
>
>
>


-- 
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297


Re: How to read excel file in PySpark

2023-06-20 Thread Sean Owen
It is indeed not part of SparkSession. See the link you cite. It is part of
the pyspark pandas API

On Tue, Jun 20, 2023, 5:42 AM John Paul Jayme 
wrote:

> Good day,
>
>
>
> I have a task to read excel files in databricks but I cannot seem to
> proceed. I am referencing the API documents -  read_excel
> 
> , but there is an error sparksession object has no attribute
> 'read_excel'. Can you advise?
>
>
>
> *JOHN PAUL JAYME*
> Data Engineer
>
> m. +639055716384  w. www.tdcx.com
>
>
>
> *Winner of over 350 Industry Awards*
>
> [image: Linkedin]  [image:
> Facebook]  [image: Twitter]
>  [image: Youtube]
>  [image: Instagram]
> 
>
>
>
> This is a confidential email that may be privileged or legally protected.
> You are not authorized to copy or disclose the contents of this email. If
> you are not the intended addressee, please inform the sender and delete
> this email.
>
>
>
>
>


How to read excel file in PySpark

2023-06-20 Thread John Paul Jayme
Good day,

I have a task to read excel files in databricks but I cannot seem to proceed. I 
am referencing the API documents -  
read_excel
 , but there is an error sparksession object has no attribute 'read_excel'. Can 
you advise?

JOHN PAUL JAYME
Data Engineer
[https://app.tdcx.com/email-signature/assets/img/tdcx-logo.png]
m. +639055716384  w. www.tdcx.com

Winner of over 350 Industry Awards
[Linkedin] [Facebook] 
  [Twitter] 
  [Youtube] 
  [Instagram] 


This is a confidential email that may be privileged or legally protected. You 
are not authorized to copy or disclose the contents of this email. If you are 
not the intended addressee, please inform the sender and delete this email.




Re: implement a distribution without shuffle like RDD.coalesce for DataSource V2 write

2023-06-18 Thread Mich Talebzadeh
OK, the number of partitions n, or more to the point the "optimum" number of
partitions, depends on the size of your batch data DF among other things, and
on the degree of parallelism at the endpoint where you will be writing to the
sink. If you require high parallelism because your tasks are fine-grained,
then a high number for n will be beneficial. Otherwise, for coarser writes, n
may be smaller. Since you just want to use the API for a simple
transformation, keeping n smaller may help.

In short, I don't think there is a magic number for n that fits all
occasions, and there is likely no specific mention of an optimum number for n
in the docs. It is likely an iterative process for you to determine the
number of partitions that will work for your specific workload; that workload
can change, and you then have to adjust n. The Spark GUI should help you get
the desired balance between parallelism and resource efficiency.

HTH

Mich Talebzadeh,
Lead Solutions Architect/Engineering Lead
Palantir Technologies Limited
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Sun, 18 Jun 2023 at 12:08, Pengfei Li  wrote:

> Thanks for you reply. Yes, it's similar to what I want, although I'm using
> batch rather than structured streaming, but I think there is not much
> difference.
> What I want is similar to the property `numPartitions` of JDBC DataSource
> [1]. This is what it describes
> ```
> The maximum number of partitions that can be used for parallelism in table
> reading and writing. This also determines the maximum number of concurrent
> JDBC connections. If the number of partitions to write exceeds this limit,
> we decrease it to this limit by calling coalesce(numPartitions) before
> writing.
> ```
> I want to control the number of partitions using a similar way with `calling
> coalesce(numPartitions) ` because of lower cost of shuffle. The JDBC
> implements it like this [2]
> ```
> df.coalesce(n).rdd.foreachPartition { iterator => savePartition(
>   table, iterator, rddSchema, insertStmt, batchSize, dialect,
> isolationLevel, options)
> }
> ```
> Is it possible to do that for DataSource V2?
>
> [1] https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html
> [2]
> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala#L877
>
> Thank You
> Best Regards
>
> Mich Talebzadeh  于2023年6月18日周日 18:10写道:
>
>> Is this the point you are trying to implement?
>>
>> I have state data source which enables the state in SS --> Structured 
>> Streaming to be rewritten, which enables repartitioning, schema evolution, 
>> etc via batch query. The writer requires hash partitioning against group 
>> key, with the "desired number of partitions", which is same as what Spark 
>> does read and write against state.
>>
>> This is now implemented as DSv1, and the requirement is *simply done by 
>> calling repartition with the "desired number".*
>>
>> ```
>> val fullPathsForKeyColumns = keySchema.map(key => new 
>> Column(s"key.${key.name}"))
>> data
>>   .repartition(*newPartitions*, fullPathsForKeyColumns: _*)
>>   .queryExecution
>>   .toRdd
>>   .foreachPartition(
>> writeFn(resolvedCpLocation, version, operatorId, storeName, keySchema, 
>> valueSchema,
>>   storeConf, hadoopConfBroadcast, queryId))
>>
>> Well Spark will not know the optimum value of newPartitions and you will 
>> need to work out that from SS size.
>>
>> Is that a correct understanding?
>>
>> HTH
>>
>>
>> Mich Talebzadeh,
>> Lead Solutions Architect/Engineering Lead
>> Palantir Technologies Limited
>> London
>> United Kingdom
>>
>>
>>view my Linkedin profile
>> 
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Sun, 18 Jun 2023 at 10:12, Pengfei Li  wrote:
>>
>>> Hi All,
>>>
>>> I'm developing a DataSource on Spark 3.2 to write data to our system,
>>> and using DataSource V2 API. I want to implement the interface
>>> RequiresDistributionAndOrdering
>>> 
>>>  to
>>> 

Re: implement a distribution without shuffle like RDD.coalesce for DataSource V2 write

2023-06-18 Thread Mich Talebzadeh
Is this the point you are trying to implement?

I have state data source which enables the state in SS --> Structured
Streaming to be rewritten, which enables repartitioning, schema
evolution, etc via batch query. The writer requires hash partitioning
against group key, with the "desired number of partitions", which is
same as what Spark does read and write against state.

This is now implemented as DSv1, and the requirement is *simply done
by calling repartition with the "desired number".*

```
val fullPathsForKeyColumns = keySchema.map(key => new
Column(s"key.${key.name}"))
data
  .repartition(*newPartitions*, fullPathsForKeyColumns: _*)
  .queryExecution
  .toRdd
  .foreachPartition(
writeFn(resolvedCpLocation, version, operatorId, storeName,
keySchema, valueSchema,
  storeConf, hadoopConfBroadcast, queryId))

Well Spark will not know the optimum value of newPartitions and you
will need to work out that from SS size.

Is that a correct understanding?

HTH


Mich Talebzadeh,
Lead Solutions Architect/Engineering Lead
Palantir Technologies Limited
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Sun, 18 Jun 2023 at 10:12, Pengfei Li  wrote:

> Hi All,
>
> I'm developing a DataSource on Spark 3.2 to write data to our system,
> and using DataSource V2 API. I want to implement the interface
> RequiresDistributionAndOrdering
> 
>  to
> set the number of partitions used for write. But I don't know how to
> implement a distribution without shuffle as  RDD.coalesce does. Is there
> any example or advice?
>
> Thank You
> Best Regards
>


implement a distribution without shuffle like RDD.coalesce for DataSource V2 write

2023-06-18 Thread Pengfei Li
Hi All,

I'm developing a DataSource on Spark 3.2 to write data to our system,
and using DataSource V2 API. I want to implement the interface
RequiresDistributionAndOrdering

to
set the number of partitions used for write. But I don't know how to
implement a distribution without shuffle as  RDD.coalesce does. Is there
any example or advice?

Thank You
Best Regards
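
For context, a minimal PySpark sketch (assuming an existing SparkSession named
spark) of the shuffle-free behaviour of coalesce that the question refers to,
compared with repartition:

df = spark.range(0, 1_000_000, numPartitions=200)

print(df.coalesce(10).rdd.getNumPartitions())      # 10 - narrows partitions without a shuffle
print(df.repartition(10).rdd.getNumPartitions())   # 10 - but triggers a full shuffle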


TAC Applications for Community Over Code North America and Asia now open

2023-06-16 Thread Gavin McDonald
Hi All,

(This email goes out to all our user and dev project mailing lists, so you
may receive this
email more than once.)

The Travel Assistance Committee has opened up applications to help get
people to the following events:


*Community Over Code Asia 2023 - *
*August 18th to August 20th in Beijing , China*

Applications for this event closes on the 6th July so time is short, please
apply as soon as possible. TAC is prioritising applications from the Asia
and Oceania regions.

More details on this event can be found at:
https://apachecon.com/acasia2023/

More information on how to apply please read: https://tac.apache.org/


*Community Over Code North America - *
*October 7th to October 10th in Halifax, Canada*

Applications for this event closes on the 22nd July. We expect many
applications so please do apply as soon as you can. TAC is prioritising
applications from the North and South America regions.

More details on this event can be found at: https://communityovercode.org/

More information on how to apply please read: https://tac.apache.org/


*Have you applied to be a Speaker?*

If you have applied or intend to apply as a Speaker at either of these
events, and think you
may require assistance for Travel and/or Accommodation - TAC advises that
you do not
wait until you have been notified of your speaker status and to apply
early. Should you
not be accepted as a speaker and still wish to attend you can amend you
application to
include Conference fees, or, you may withdraw your application.

The call for presentations for Halifax is here:
https://communityovercode.org/call-for-presentations/
and you have until the 13th of July to apply.

The call for presentations for Beijing is here:
https://apachecon.com/acasia2023/cfp.html
and you have until the 18th June to apply.

*IMPORTANT Note on Visas:*

It is important that you apply for a Visa as soon as possible - do not wait
until you know if you have been accepted for Travel Assistance or not, as
due to current wait times for Interviews in some Countries, waiting that
long may be too late, so please do apply for a Visa right away. Contact
tac-ap...@tac.apache.org if you need any more information or assistance in
this area.

*Spread the Word!!*

TAC encourages you to spread the word about Travel Assistance to get to
these events, so feel free to repost as you see fit on Social Media, at
work, schools, universities etc etc...

Thank You and hope to see you all soon

Gavin McDonald on behalf of the ASF Travel Assistance Committee.


Fwd: iceberg queries

2023-06-15 Thread Gaurav Agarwal
Hi Team,

Sample Merge query:

df.createOrReplaceTempView("source")

MERGE INTO iceberg_hive_cat.iceberg_poc_db.iceberg_tab target
USING (SELECT * FROM source)
ON target.col1 = source.col1   -- this is my bucket column
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *

The source dataset is a temporary view and it contains 1.5 million records
(in future it can be 20 million rows), with ids that hash into the 16 buckets.
The target Iceberg table has 16 buckets. The source dataset will only update
rows if matched and insert them if not matched on those ids.

I have 1700 columns in my table.

The Spark dataset is using default partitioning; do we need to bucket the
Spark dataset on the bucket column as well?

Let me know if you need any further details.

It fails with an OOME.

Regards
Gaurav


Re: Spark using iceberg

2023-06-15 Thread Gaurav Agarwal
> HI
>
> I am using spark with iceberg, updating the table with 1700 columns ,
> We are loading 0.6 Million rows from parquet files ,in future it will be
> 16 Million rows and trying to update the data in the table which has 16
> buckets .
> Using the default partitioner of spark .Also we don't do any
> repartitioning of the dataset.on the bucketing column,
> One of the executor fails with OOME , and it recovers and again fails.when
> we are using Merge Into strategy of iceberg
> Merge into target( select * from source) on Target.id= source.id when
> matched then update set
> When not matched then insert
>
> But when we do append blind append . this works.
>
> Question :
>
> How to find what the issue is ? as we are running spark on EKS cluster
> .when executor gives OOME it dies logs also gone , unable to see the logs.
>
> DO we need to partition of the column in the dataset ? when at the time of
> loading or once the data is loaded .
>
> Need help to understand?
>
>


Spark using iceberg

2023-06-15 Thread Gaurav Agarwal
HI

I am using Spark with Iceberg, updating a table with 1700 columns.
We are loading 0.6 million rows from Parquet files (in future it will be 16
million rows) and trying to update the data in the table, which has 16
buckets.
We are using the default partitioner of Spark, and we don't do any
repartitioning of the dataset on the bucketing column.
One of the executors fails with an OOME when we are using the MERGE INTO
strategy of Iceberg; it recovers and then fails again:

MERGE INTO target (SELECT * FROM source) ON target.id = source.id
WHEN MATCHED THEN UPDATE SET
WHEN NOT MATCHED THEN INSERT

But when we do a blind append, this works.

Questions:

How do we find out what the issue is? We are running Spark on an EKS
cluster; when an executor gets an OOME it dies and its logs are gone, so we
are unable to see the logs.

Do we need to partition the dataset on the bucket column, either at the time
of loading or once the data is loaded?

Need help to understand.


Unsubscribe

2023-06-11 Thread Yu voidy



Re: [Feature Request] create *permanent* Spark View from DataFrame via PySpark

2023-06-09 Thread Wenchen Fan
DataFrame view stores the logical plan, while SQL view stores SQL text. I
don't think we can support this feature until we have a reliable way to
materialize logical plans.
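
For reference, a minimal sketch of the two paths as they exist today (assuming a
SparkSession named spark; the database, table and view names are made up for
illustration):

df = spark.table("sales_db.orders").where("status = 'OPEN'")

# Temporary view: defined from the DataFrame's logical plan, gone when the session ends.
df.createOrReplaceTempView("open_orders_tmp")

# Permanent view: currently has to be defined from SQL text so the catalog can store it.
spark.sql("""
    CREATE OR REPLACE VIEW sales_db.open_orders AS
    SELECT * FROM sales_db.orders WHERE status = 'OPEN'
""")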

On Sun, Jun 4, 2023 at 10:31 PM Mich Talebzadeh 
wrote:

> Try sending it to d...@spark.apache.org (and join that group)
>
> You need to raise a JIRA for this request plus related doc related
>
>
> Example JIRA
>
> https://issues.apache.org/jira/browse/SPARK-42485
>
> and the related *Spark project improvement proposals (SPIP) *to be filled
> in
>
> https://spark.apache.org/improvement-proposals.html
>
>
> HTH
>
>
> Mich Talebzadeh,
> Lead Solutions Architect/Engineering Lead
> Palantir Technologies Limited
> London
> United Kingdom
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Sun, 4 Jun 2023 at 12:38, keen  wrote:
>
>> Do Spark **devs** read this mailing list?
>> Is there another/a better way to make feature requests?
>> I tried in the past to write a mail to the dev mailing list but it did
>> not show at all.
>>
>> Cheers
>>
>> keen  schrieb am Do., 1. Juni 2023, 07:11:
>>
>>> Hi all,
>>> currently only *temporary* Spark Views can be created from a DataFrame
>>> (df.createOrReplaceTempView or df.createOrReplaceGlobalTempView).
>>>
>>> When I want a *permanent* Spark View I need to specify it via Spark SQL
>>> (CREATE VIEW AS SELECT ...).
>>>
>>> Sometimes it is easier to specify the desired logic of the View through
>>> Spark/PySpark DataFrame API.
>>> Therefore, I'd like to suggest to implement a new PySpark method that
>>> allows creating a *permanent* Spark View from a DataFrame
>>> (df.createOrReplaceView).
>>>
>>> see also:
>>>
>>> https://community.databricks.com/s/question/0D53f1PANVgCAP/is-there-a-way-to-create-a-nontemporary-spark-view-with-pyspark
>>>
>>> Regards
>>> Martin
>>>
>>


Announcing the Community Over Code 2023 Streaming Track

2023-06-09 Thread James Hughes
Hi all,

Community Over Code , the ASF conference,
will be held in Halifax, Nova Scotia October 7-10, 2023. The call for
presentations  is
open now through July 13, 2023.

I am one of the co-chairs for the stream processing track, and we would
love to see you there and hope that you will consider submitting a talk.

About the Streaming track:

There are many top-level ASF projects which focus on and push the envelope
for stream and event processing.  ActiveMQ, Beam, Bookkeeper, Camel, Flink,
Kafka, Pulsar, RocketMQ, and Spark are all house-hold names in the stream
processing and analytics world at this point.  These projects show that
stream processing has unique characteristics requiring deep expertise.  On
the other hand, users need easy to apply solutions.  The streaming track
will host talks focused on the use cases and advances of these projects as
well as other developments in the streaming world.

Thanks and see you in October!

Jim


Re: Apache Spark not reading UTC timestamp from MongoDB correctly

2023-06-08 Thread Enrico Minack
Sean is right: casting timestamps to strings (which is what show() does)
uses the local timezone, either the Java default zone `user.timezone`,
the Spark default zone `spark.sql.session.timeZone`, or the default
DataFrameWriter zone `timeZone` (when writing to file).

You say you are in PST, which is UTC - 8 hours. But I think this
currently observes daylight saving, so PDT, which is UTC - 7 hours.

Then, your UTC timestamp is correctly displayed in local PDT time. Try
changing the above settings to display in different timezones. Inspecting
the underlying long value, as suggested by Sean, is best practice to get
hold of the true timestamp.
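
For illustration, a minimal sketch (assuming an existing SparkSession named spark
and that the Mongo data has been read into a DataFrame called df) that exposes the
raw epoch value and then re-renders the timestamp in an explicit zone:

from pyspark.sql import functions as F

df.select(
    F.col("timeslot_date"),
    F.col("timeslot_date").cast("long").alias("epoch_seconds"),  # the true instant, independent of any zone
).show(truncate=False)

spark.conf.set("spark.sql.session.timeZone", "UTC")  # render timestamps in UTC instead of the local zone
df.select("timeslot_date").show(truncate=False)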


Cheers,
Enrico


Am 09.06.23 um 00:53 schrieb Sean Owen:
You sure it is not just that it's displaying in your local TZ? Check 
the actual value as a long for example. That is likely the same time.


On Thu, Jun 8, 2023, 5:50 PM karan alang  wrote:

ref :

https://stackoverflow.com/questions/76436159/apache-spark-not-reading-utc-timestamp-from-mongodb-correctly

Hello All,
I've data stored in MongoDB collection and the timestamp column is
not being read by Apache Spark correctly. I'm running Apache Spark
on GCP Dataproc.

Here is sample data :

-

In Mongo :

+----------+----------------------+
|timeslot  |timeslot_date         |
+----------+----------------------+
|1683527400|{2023-05-08T06:30:00Z}|
+----------+----------------------+

When I use pyspark to read this :

+----------+-------------------+
|timeslot  |timeslot_date      |
+----------+-------------------+
|1683527400|2023-05-07 23:30:00|
+----------+-------------------+

My understanding is, data in Mongo is in UTC format i.e.
2023-05-08T06:30:00Z is in UTC format. I'm in PST timezone. I'm
not clear why spark is reading it a different timezone format
(neither PST nor UTC) Note - it is not reading it as PST timezone,
if it was doing that it would advance the time by 7 hours, instead
it is doing the opposite.

Where is the default timezone format taken from, when Spark is
reading data from MongoDB ?

Any ideas on this ?

tia!

|




Re: Apache Spark not reading UTC timestamp from MongoDB correctly

2023-06-08 Thread Sean Owen
You sure it is not just that it's displaying in your local TZ? Check the
actual value as a long for example. That is likely the same time.

On Thu, Jun 8, 2023, 5:50 PM karan alang  wrote:

> ref :
> https://stackoverflow.com/questions/76436159/apache-spark-not-reading-utc-timestamp-from-mongodb-correctly
>
> Hello All,
> I've data stored in MongoDB collection and the timestamp column is not
> being read by Apache Spark correctly. I'm running Apache Spark on GCP
> Dataproc.
>
> Here is sample data :
>
> -
>
> In Mongo :
>
> timeslot_date  :
> timeslot  |timeslot_date |
> +--+--1683527400|{2023-05-08T06:30:00Z}|
>
>
> When I use pyspark to read this  :
>
> +--+---+
> timeslot  |timeslot_date  |
> +--+---+1683527400|2023-05-07 23:30:00|
> ++---+-
>
> -
>
> My understanding is, data in Mongo is in UTC format i.e. 2023-05-08T06:30:00Z 
> is in UTC format. I'm in PST timezone. I'm not clear why spark is reading it 
> a different timezone format (neither PST nor UTC) Note - it is not reading it 
> as PST timezone, if it was doing that it would advance the time by 7 hours, 
> instead it is doing the opposite.
>
> Where is the default timezone format taken from, when Spark is reading data 
> from MongoDB ?
>
> Any ideas on this ?
>
> tia!
>
>
>
>
>


Apache Spark not reading UTC timestamp from MongoDB correctly

2023-06-08 Thread karan alang
ref :
https://stackoverflow.com/questions/76436159/apache-spark-not-reading-utc-timestamp-from-mongodb-correctly

Hello All,
I've data stored in MongoDB collection and the timestamp column is not
being read by Apache Spark correctly. I'm running Apache Spark on GCP
Dataproc.

Here is sample data :

-

In Mongo :

+----------+----------------------+
|timeslot  |timeslot_date         |
+----------+----------------------+
|1683527400|{2023-05-08T06:30:00Z}|
+----------+----------------------+

When I use pyspark to read this :

+----------+-------------------+
|timeslot  |timeslot_date      |
+----------+-------------------+
|1683527400|2023-05-07 23:30:00|
+----------+-------------------+

-

My understanding is that the data in Mongo is in UTC format, i.e.
2023-05-08T06:30:00Z is UTC. I'm in the PST timezone. I'm not
clear why Spark is reading it in a different timezone format (neither PST
nor UTC). Note - it is not reading it as the PST timezone; if it was doing
that, it would advance the time by 7 hours, whereas instead it is doing the
opposite.

Where is the default timezone format taken from, when Spark is reading
data from MongoDB ?

Any ideas on this ?

tia!


Getting SparkRuntimeException: Unexpected value for length in function slice: length must be greater than or equal to 0

2023-06-06 Thread Bariudin, Daniel
I'm using PySpark (version 3.2) and I've encountered the following exception
while trying to perform a slice on an array column in a DataFrame:
"org.apache.spark.SparkRuntimeException: Unexpected value for length in
function slice: length must be greater than or equal to 0", but the length is
greater than 1.

Here's the full exception I'm receiving:

```
Caused by: org.apache.spark.SparkRuntimeException: Unexpected value for length in function slice: length must be greater than or equal to 0.
    at org.apache.spark.sql.errors.QueryExecutionErrors$.unexpectedValueForLengthInFunctionError(QueryExecutionErrors.scala:1602)
    at org.apache.spark.sql.errors.QueryExecutionErrors.unexpectedValueForLengthInFunctionError(QueryExecutionErrors.scala)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate.Slice_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate.subExpr_1$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate.eval(Unknown Source)
    at org.apache.spark.sql.execution.FilterExec.$anonfun$doExecute$3(basicPhysicalOperators.scala:276)
    at org.apache.spark.sql.execution.FilterExec.$anonfun$doExecute$3$adapted(basicPhysicalOperators.scala:275)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:515)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:888)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:888)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
    at org.apache.spark.scheduler.Task.run(Task.scala:139)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    ... 1 more
```

The DataFrame I'm working with has the following data:

```
'id': '2',
'arr2': [1, 1, 1, 2],
'arr1': [0.0, 1.0, 1.0, 1.0]

```

And the schema of the DataFrame is as follows:

```
root
|-- id: string (nullable = true)
|-- arr2: array (nullable = true)
||-- element: long (containsNull = true)
|-- arr1: array (nullable = true)
||-- element: double (containsNull = true)
```

The code is:

```
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

data = [
    {
        'id': '2',
        'arr2': [1, 1, 1, 2],
        'arr1': [0.0, 1.0, 1.0, 1.0]
    }
]
df = spark.createDataFrame(data)

# For this row size(arr2) = 4, so end_index = 4 - 6 = -2
df = df.withColumn('end_index', F.size('arr2') - F.lit(6))

# Intended to drop every row whose slice length would not be positive
df = df.filter(F.col('end_index') > 0)  # Note HERE

# slice() is still evaluated with a negative length at runtime
df = df.withColumn("trimmed_arr2",
                   F.slice(F.col('arr2'), start=F.lit(1), length=F.col('end_index')))
df = df.withColumn("avg_trimmed",
                   F.expr('aggregate(trimmed_arr2, 0L, (acc, x) -> acc + x, acc -> acc / end_index)'))
df = df.filter(F.col('avg_trimmed') > 30)
df = df.withColumn('repeated_counts', F.size(F.array_distinct('trimmed_arr2')))
df = df.withColumn('ratio', F.col('repeated_counts') / F.size('trimmed_arr2'))
df = df.filter(F.col('ratio') > 0.6)

df.show(truncate=False, vertical=True)
```

What's strange is that when I write the DataFrame to disk 
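
The likely cause is that Catalyst collapses and reorders the filters, so the generated
predicate can evaluate slice() on rows where end_index is still negative, despite the
end_index > 0 filter. A minimal workaround sketch, assuming PySpark 3.2 and the column
names from the code above, is to keep the length argument non-negative for every row
the expression can see:

```
from pyspark.sql import functions as F

# Clamp the slice length so it is never below 0 (slice with length 0 simply
# yields an empty array), instead of relying on filter ordering.
safe_len = F.greatest(F.col("end_index"), F.lit(0))
df = df.withColumn(
    "trimmed_arr2",
    F.slice(F.col("arr2"), start=F.lit(1), length=safe_len),
)

# Equivalent alternative: only compute the slice when it is well defined.
# df = df.withColumn(
#     "trimmed_arr2",
#     F.when(F.col("end_index") > 0,
#            F.slice("arr2", start=1, length=F.col("end_index"))),
# )
```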

Re: [Feature Request] create *permanent* Spark View from DataFrame via PySpark

2023-06-04 Thread Mich Talebzadeh
Try sending it to d...@spark.apache.org (and join that group)

You need to raise a JIRA for this request, plus the related docs


Example JIRA

https://issues.apache.org/jira/browse/SPARK-42485

and the related *Spark Project Improvement Proposal (SPIP)* to be filled in

https://spark.apache.org/improvement-proposals.html


HTH


Mich Talebzadeh,
Lead Solutions Architect/Engineering Lead
Palantir Technologies Limited
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Sun, 4 Jun 2023 at 12:38, keen  wrote:

> Do Spark **devs** read this mailing list?
> Is there another/a better way to make feature requests?
> I tried in the past to write a mail to the dev mailing list but it did not
> show at all.
>
> Cheers
>
> keen  schrieb am Do., 1. Juni 2023, 07:11:
>
>> Hi all,
>> currently only *temporary* Spark Views can be created from a DataFrame
>> (df.createOrReplaceTempView or df.createOrReplaceGlobalTempView).
>>
>> When I want a *permanent* Spark View I need to specify it via Spark SQL
>> (CREATE VIEW AS SELECT ...).
>>
>> Sometimes it is easier to specify the desired logic of the View through
>> Spark/PySpark DataFrame API.
>> Therefore, I'd like to suggest to implement a new PySpark method that
>> allows creating a *permanent* Spark View from a DataFrame
>> (df.createOrReplaceView).
>>
>> see also:
>>
>> https://community.databricks.com/s/question/0D53f1PANVgCAP/is-there-a-way-to-create-a-nontemporary-spark-view-with-pyspark
>>
>> Regards
>> Martin
>>
>


Re: [Feature Request] create *permanent* Spark View from DataFrame via PySpark

2023-06-04 Thread keen
Do Spark **devs** read this mailing list?
Is there another/a better way to make feature requests?
I tried in the past to write a mail to the dev mailing list, but it did not
show up at all.

Cheers

keen  schrieb am Do., 1. Juni 2023, 07:11:

> Hi all,
> currently only *temporary* Spark Views can be created from a DataFrame
> (df.createOrReplaceTempView or df.createOrReplaceGlobalTempView).
>
> When I want a *permanent* Spark View I need to specify it via Spark SQL
> (CREATE VIEW AS SELECT ...).
>
> Sometimes it is easier to specify the desired logic of the View through
> Spark/PySpark DataFrame API.
> Therefore, I'd like to suggest to implement a new PySpark method that
> allows creating a *permanent* Spark View from a DataFrame
> (df.createOrReplaceView).
>
> see also:
>
> https://community.databricks.com/s/question/0D53f1PANVgCAP/is-there-a-way-to-create-a-nontemporary-spark-view-with-pyspark
>
> Regards
> Martin
>


[Feature Request] create *permanent* Spark View from DataFrame via PySpark

2023-06-01 Thread keen
Hi all,
currently only *temporary* Spark Views can be created from a DataFrame
(df.createOrReplaceTempView or df.createOrReplaceGlobalTempView).

When I want a *permanent* Spark View I need to specify it via Spark SQL
(CREATE VIEW AS SELECT ...).

Sometimes it is easier to specify the desired logic of the View through
Spark/PySpark DataFrame API.
Therefore, I'd like to suggest implementing a new PySpark method that
allows creating a *permanent* Spark View from a DataFrame
(df.createOrReplaceView).

see also:
https://community.databricks.com/s/question/0D53f1PANVgCAP/is-there-a-way-to-create-a-nontemporary-spark-view-with-pyspark

Regards
Martin
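
A minimal sketch of the gap being described (table and view names below are
illustrative, not from the request):

```
# What the DataFrame API offers today: session-scoped views only.
df.createOrReplaceTempView("orders_tmp")   # disappears when the session ends

# A permanent view currently has to be declared in SQL against catalog
# objects; Spark refuses to create a permanent view over a temporary view,
# so the DataFrame logic has to be re-expressed as SQL.
spark.sql("""
    CREATE OR REPLACE VIEW orders_recent AS
    SELECT *
    FROM orders                 -- a catalog table, not the temp view
    WHERE order_date >= '2023-01-01'
""")

# The proposed df.createOrReplaceView("orders_recent") would avoid
# hand-writing that SQL for logic already expressed on the DataFrame.
```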


<    6   7   8   9   10   11   12   13   14   15   >