Re: Naming files while saving a Dataframe

2021-08-12 Thread Eric Beabes
This doesn't work as given here (
https://stackoverflow.com/questions/36107581/change-output-filename-prefix-for-dataframe-write),
but the answer suggests using the FileOutputFormat class. Will try that.
Thanks. Regards.
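
For anyone following along, here is a minimal sketch of what Jörn's suggestion below would look like in code. It assumes a Scala job; the app name, toy data and output path are illustrative, and, as reported above, the built-in Parquet datasource may keep its "part-..." naming and ignore the property.

import org.apache.spark.sql.SparkSession

object BasenameSketch {
  def main(args: Array[String]): Unit = {
    // App name, toy data and output path are illustrative, not from the thread.
    val spark = SparkSession.builder().appName("basename-sketch").getOrCreate()

    // Jörn's suggestion: set the Hadoop property on the shared configuration
    // before writing. Hedged: the DataFrame Parquet writer may still emit
    // "part-..." file names regardless, as observed above.
    spark.sparkContext.hadoopConfiguration.set("mapreduce.output.basename", "jobA")

    import spark.implicits._
    val df = Seq((1, "a"), (2, "b")).toDF("id", "value")

    df.write.mode("overwrite").parquet("/tmp/basename-sketch")
    spark.stop()
  }
}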

On Sun, Jul 18, 2021 at 12:44 AM Jörn Franke  wrote:

> Spark heavily depends on Hadoop writing files. You can try to set the
> Hadoop property: mapreduce.output.basename
>
>
> https://spark.apache.org/docs/latest/api/java/org/apache/spark/SparkContext.html#hadoopConfiguration--
>
>
> Am 18.07.2021 um 01:15 schrieb Eric Beabes :
>
> 
> Mich - You're suggesting changing the "Path". The problem is that we have an
> EXTERNAL table created on top of this path, so the "Path" CANNOT change. If it
> could, this problem would be easy to solve. My question is about changing the
> "Filename".
>
> As Ayan pointed out, Spark doesn't seem to allow "prefixes" for the
> filenames!
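
One workaround not mentioned in the thread is to keep the path unchanged and rename the part files after the write with the Hadoop FileSystem API. A hedged sketch; the output directory and the "jobA" prefix are illustrative placeholders.

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SparkSession

object PrefixRenameSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("prefix-rename-sketch").getOrCreate()
    val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)

    val outputDir = new Path("/data/events") // the external table location (illustrative)
    val prefix    = "jobA"                   // identifies which job wrote the file

    // Walk the output directory (recursively, to cover partition subfolders)
    // and prepend the prefix to every part file found.
    val files = fs.listFiles(outputDir, true)
    while (files.hasNext) {
      val status = files.next()
      val name = status.getPath.getName
      if (name.startsWith("part-")) {
        // e.g. part-00000-xyz.parquet -> jobA-part-00000-xyz.parquet
        fs.rename(status.getPath, new Path(status.getPath.getParent, s"$prefix-$name"))
      }
    }
    spark.stop()
  }
}

In practice you would scope the rename to the files the current run just produced (for example by listing the directory before and after the write), since both jobs share the location.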
>
> On Sat, Jul 17, 2021 at 1:58 PM Mich Talebzadeh 
> wrote:
>
>> Using this
>>
>> df.write.mode("overwrite").format("parquet").saveAsTable("test.ABCD")
>>
>> That will create a Parquet table in the database test, which is
>> essentially a directory under the Hive warehouse, in the format
>>
>> /user/hive/warehouse/test.db/abcd/00_0
>>
>>
>> On Sat, 17 Jul 2021 at 20:45, Eric Beabes 
>> wrote:
>>
>>> I am not sure if you've understood the question. Here's how we're saving
>>> the DataFrame:
>>>
>>> df
>>>   .coalesce(numFiles)
>>>   .write
>>>   .partitionBy(partitionDate)
>>>   .mode("overwrite")
>>>   .format("parquet")
>>>   .save(someDirectory)
>>>
>>>
>>> Now where would I add a 'prefix' in this one?
>>>
>>>
>>> On Sat, Jul 17, 2021 at 10:54 AM Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
 Try it and see if it works:

 fullyQualifiedTableName = appName+'_'+tableName



 On Sat, 17 Jul 2021 at 18:02, Eric Beabes 
 wrote:

> I don't think Spark allows adding a 'prefix' to the file name, does
> it? If it does, please tell me how. Thanks.
>
> On Sat, Jul 17, 2021 at 9:47 AM Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> Jobs have names in Spark. You could prefix that to the file name when
>> writing to the directory, I guess:
>>
>>  val sparkConf = new SparkConf().
>>setAppName(sparkAppName).
>>
>>
>>
>>
>>
>>
>>
>>
>> On Sat, 17 Jul 2021 at 17:40, Eric Beabes 
>> wrote:
>>
>>> The reason we have two jobs writing to the same directory is that the data
>>> is partitioned by 'day' (mmdd) but the job runs hourly. Maybe the only way
>>> to do this is to create an hourly partition (/mmdd/hh). Is that the only
>>> way to solve this?
>>>
>>> On Fri, Jul 16, 2021 at 5:45 PM ayan guha 
>>> wrote:
>>>
 IMHO this is a bad idea, especially in failure scenarios.

 How about creating a subfolder for each of the jobs?
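
A hedged sketch of that subfolder-per-job idea, with the base directory, job name and toy data as illustrative placeholders (note Eric's constraint above that the external table location cannot move):

import org.apache.spark.sql.SparkSession

object SubfolderPerJobSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("subfolder-per-job").getOrCreate()
    import spark.implicits._

    // Toy data standing in for the real DataFrame; "day" mirrors the daily partition.
    val df = Seq(("20210717", 1), ("20210717", 2)).toDF("day", "value")

    val baseDirectory = "/data/events" // shared output location (illustrative)
    val jobName       = "jobA"         // one subfolder per writing job

    df.coalesce(1)
      .write
      .partitionBy("day")
      .mode("overwrite")
      .format("parquet")
      .save(s"$baseDirectory/$jobName")

    spark.stop()
  }
}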

 On Sat, 17 Jul 2021 at 9:11 am, Eric Beabes <
 mailinglist...@gmail.com> wrote:

> We've two (or more) jobs that write data into the same directory
> via a Dataframe.save method. We need to be able to figure out which 
> job
> wrote which file. Maybe provide a 'prefix' to the file names. I was
> wondering if there's any 'option' that allows us to do this. Googling
> didn't come up with any solution so thought of asking the Spark 
> experts on
> this mailing list.
>
> Thanks in advance.
>
 --

Replacing BroadcastNestedLoopJoin

2021-08-12 Thread Eric Beabes
We’ve two datasets that look like this:

Dataset A: App specific data that contains (among other fields): ip_address


Dataset B: Location data that contains start_ip_address_int,
end_ip_address_int, latitude, longitude

We’re (left) joining these two datasets as: A.ip_address >=
B.start_ip_address_int AND A.ip_address <= B.end_ip_address_int. When
there's a match, we pick latitude & longitude from Dataset B.

This works fine but it takes a LONG time (over 20 minutes) to complete for
SMALL datasets.

Dataset A => Usually contains around 110,000 rows.

Dataset B => Contains 12.5 Million rows. This is “static” data. Hasn’t
changed since August 2020.

When we looked at the DAG, it seems a BroadcastNestedLoopJoin is being used,
which supposedly is very slow. It seems Spark selects it by default when the
join has only non-equi conditions such as "greater than" or "less than".

What’s the best way to speed up this process? Thanks in advance.
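
One common way to avoid the BroadcastNestedLoopJoin is to add a coarse equi-join key (a "bucket" of the IP space) to both sides so Spark can use a sort-merge join, applying the exact range check as an extra join condition. A hedged sketch, with toy stand-ins for the two datasets and an illustrative bucket size:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object RangeJoinSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("range-join-sketch").getOrCreate()
    import spark.implicits._

    // Toy stand-ins for the thread's datasets; replace with the real Dataset A and B.
    val dsA = Seq((16909060L, "app-1")).toDF("ip_address", "app_field")
    val dsB = Seq((16909056L, 16909311L, 37.77, -122.42))
      .toDF("start_ip_address_int", "end_ip_address_int", "latitude", "longitude")

    val bucketSize = 65536L // one bucket per /16 block; tune to the typical range width

    // Explode each IP range into every bucket it overlaps, so any matching
    // ip_address shares a bucket with at least one copy of its range.
    val bBucketed = dsB.withColumn(
      "bucket",
      explode(sequence(
        floor(col("start_ip_address_int") / bucketSize),
        floor(col("end_ip_address_int") / bucketSize))))

    val aBucketed = dsA.withColumn("bucket", floor(col("ip_address") / bucketSize))

    // The equality on "bucket" lets Spark choose a sort-merge join; the exact
    // range predicate is applied as an additional join condition.
    val joined = aBucketed.join(
        bBucketed,
        aBucketed("bucket") === bBucketed("bucket") &&
          aBucketed("ip_address").between(
            bBucketed("start_ip_address_int"),
            bBucketed("end_ip_address_int")),
        "left")
      .drop(aBucketed("bucket"))
      .drop(bBucketed("bucket"))

    joined.show()
    spark.stop()
  }
}

Since Dataset B is static, the exploded/bucketed version could also be precomputed and persisted once rather than rebuilt on every run.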


Re: K8S submit client vs. cluster

2021-08-12 Thread Mich Talebzadeh
OK, Amazon is not much different compared to Google Kubernetes Engine (GKE).

When you submit a job, you need a powerful compute server to submit it from.
It is another host; you cannot submit from the K8s cluster nodes themselves
(I am not aware that one can actually do that).

Anyway, you submit something like the below:

spark-submit --verbose \
   --properties-file ${property_file} \
   --master k8s://https://$KUBERNETES_MASTER_IP:443 \
   --deploy-mode cluster \
   --name pytest \
   --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./pyspark_venv/bin/python \
   --py-files $CODE_DIRECTORY/DSBQ.zip \
   --conf spark.kubernetes.namespace=$NAMESPACE \
   --conf spark.executor.memory=5000m \
   --conf spark.network.timeout=300 \
   --conf spark.executor.instances=3 \
   --conf spark.kubernetes.driver.limit.cores=1 \
   --conf spark.driver.cores=1 \
   --conf spark.executor.cores=1 \
   --conf spark.executor.memory=2000m \
   --conf spark.kubernetes.driver.docker.image=${IMAGEGCP} \
   --conf spark.kubernetes.executor.docker.image=${IMAGEGCP} \
   --conf spark.kubernetes.container.image=${IMAGEGCP} \
   --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark-bq \
   --conf spark.driver.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" \
   --conf spark.executor.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" \
   --conf spark.sql.execution.arrow.pyspark.enabled="true" \
   $CODE_DIRECTORY/${APPLICATION}

This is a PySpark job and I have told Spark to run it in cluster mode. The
Docker image I built is Spark version 3.1.1 with Java 8; Java 11 would not
work.


However, under the bonnet it is run in client mode:


+ CMD=("$SPARK_HOME/bin/spark-submit" --conf "spark.driver.bindAddress=$SPARK_DRIVER_BIND_ADDRESS" --deploy-mode client "$@")

+ exec /usr/bin/tini -s -- /opt/spark/bin/spark-submit --conf spark.driver.bindAddress=10.64.0.88 --deploy-mode client --properties-file /opt/spark/conf/spark.properties --class org.apache.spark.deploy.PythonRunner gs://axial-glow-224522-spark-on-k8s/codes/RandomDataBigQuery.py


So regardless, it is run in client mode. You can see this behaviour with the
--verbose switch:


 spark-submit --verbose


HTH






On Thu, 12 Aug 2021 at 17:29, Bode, Meikel, NMA-CFD <
meikel.b...@bertelsmann.de> wrote:

> On EKS…
>
>
>
> *From:* Mich Talebzadeh 
> *Sent:* Donnerstag, 12. August 2021 15:47
> *To:* Bode, Meikel, NMA-CFD 
> *Cc:* user@spark.apache.org
> *Subject:* Re: K8S submit client vs. cluster
>
>
>
> Ok
>
>
>
> As I see it with PySpark even if it is submitted as cluster, it will be
> converted to client mode anyway
>
>
> Are you running this on AWS or GCP?
>
>
>
>
>
>
>
>
>
>
> On Thu, 12 Aug 2021 at 12:42, Bode, Meikel, NMA-CFD <
> meikel.b...@bertelsmann.de> wrote:
>
> Hi Mich,
>
>
>
> All PySpark.
>
>
>
> Best,
>
> Meikel
>
>
>
> *From:* Mich Talebzadeh 
> *Sent:* Donnerstag, 12. August 2021 13:41
> *To:* Bode, Meikel, NMA-CFD 
> *Cc:* user@spark.apache.org
> *Subject:* Re: K8S submit client vs. cluster
>
>
>
> Is this Spark or PySpark?
>
>
>
>
>
>

RE: K8S submit client vs. cluster

2021-08-12 Thread Bode, Meikel, NMA-CFD
On EKS...

From: Mich Talebzadeh 
Sent: Donnerstag, 12. August 2021 15:47
To: Bode, Meikel, NMA-CFD 
Cc: user@spark.apache.org
Subject: Re: K8S submit client vs. cluster

Ok

As I see it with PySpark even if it is submitted as cluster, it will be 
converted to client mode anyway


Are you running this on AWS or GCP?


 




On Thu, 12 Aug 2021 at 12:42, Bode, Meikel, NMA-CFD 
mailto:meikel.b...@bertelsmann.de>> wrote:
Hi Mich,

All PySpark.

Best,
Meikel

From: Mich Talebzadeh 
mailto:mich.talebza...@gmail.com>>
Sent: Donnerstag, 12. August 2021 13:41
To: Bode, Meikel, NMA-CFD 
mailto:meikel.b...@bertelsmann.de>>
Cc: user@spark.apache.org
Subject: Re: K8S submit client vs. cluster

Is this Spark or PySpark?





 




On Thu, 12 Aug 2021 at 12:35, Bode, Meikel, NMA-CFD 
mailto:meikel.b...@bertelsmann.de>> wrote:
Hi all,

If we schedule a spark job on k8s, how are volume mappings handled?

In client mode I would expect that the driver's volumes have to be mapped manually 
in the pod template. Executor volumes are attached dynamically based on submit 
parameters. Right...?

In cluster mode I would expect that volumes for drivers/executors are taken from 
the submit command and attached to the pods accordingly. Right...?

Any hints appreciated,

Best,
Meikel


Re: [EXTERNAL] [Marketing Mail] Reading SPARK 3.1.x generated parquet in SPARK 2.4.x

2021-08-12 Thread Gourav Sengupta
Hi Saurabh,

a very big note of thanks from Gourav :)

Regards,
Gourav Sengupta

On Thu, Aug 12, 2021 at 4:16 PM Saurabh Gulati
 wrote:

> We had issues with this migration mainly because of changes in spark date
> calendars. See
> 
> We got this working by setting the below params:
>
> ("spark.sql.legacy.parquet.datetimeRebaseModeInRead", "LEGACY"),
> ("spark.sql.legacy.parquet.datetimeRebaseModeInWrite", "CORRECTED"),
> ("spark.sql.legacy.parquet.int96RebaseModeInRead", "LEGACY"),
> ("spark.sql.legacy.parquet.int96RebaseModeInWrite", "CORRECTED")
>
>
>
> But otherwise, it's a change for good. Performance seems better.
> Also, there were bugs in 3.0.1 which have been addressed in 3.1.1.
> --
> *From:* Gourav Sengupta 
> *Sent:* 05 August 2021 10:17
> *To:* user @spark 
> *Subject:* [EXTERNAL] [Marketing Mail] Reading SPARK 3.1.x generated
> parquet in SPARK 2.4.x
>
> Hi,
>
> we are trying to migrate some of the data lake pipelines to run in SPARK
> 3.x, whereas the dependent pipelines using those tables will still be
> running in SPARK 2.4.x for some time to come.
>
> Does anyone know of any issues that can happen:
> 1. when reading Parquet files written in 3.1.x in SPARK 2.4
> 2. when in the data lake some partitions have parquet files written in
> SPARK 2.4.x and some are in SPARK 3.1.x.
>
> Please note that there are no changes in schema, but later on we might end
> up adding or removing some columns.
>
> I will be really grateful for your kind help on this.
>
> Regards,
> Gourav Sengupta
>


Re: [EXTERNAL] [Marketing Mail] Reading SPARK 3.1.x generated parquet in SPARK 2.4.x

2021-08-12 Thread Saurabh Gulati
We had issues with this migration mainly because of changes in spark date 
calendars. 
See
We got this working by setting the below params:

("spark.sql.legacy.parquet.datetimeRebaseModeInRead", "LEGACY"),
("spark.sql.legacy.parquet.datetimeRebaseModeInWrite", "CORRECTED"),
("spark.sql.legacy.parquet.int96RebaseModeInRead", "LEGACY"),
("spark.sql.legacy.parquet.int96RebaseModeInWrite", "CORRECTED")


But otherwise, it's a change for good. Performance seems better.
Also, there were bugs in 3.0.1 which have been addressed in 3.1.1.
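
For reference, a minimal sketch (assuming Spark 3.1.x; the app name is illustrative) of applying those parameters when building the session:

import org.apache.spark.sql.SparkSession

// Hedged sketch: set the rebase parameters listed above on the session builder.
val spark = SparkSession.builder()
  .appName("legacy-parquet-compat")
  .config("spark.sql.legacy.parquet.datetimeRebaseModeInRead", "LEGACY")
  .config("spark.sql.legacy.parquet.datetimeRebaseModeInWrite", "CORRECTED")
  .config("spark.sql.legacy.parquet.int96RebaseModeInRead", "LEGACY")
  .config("spark.sql.legacy.parquet.int96RebaseModeInWrite", "CORRECTED")
  .getOrCreate()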

From: Gourav Sengupta 
Sent: 05 August 2021 10:17
To: user @spark 
Subject: [EXTERNAL] [Marketing Mail] Reading SPARK 3.1.x generated parquet in 
SPARK 2.4.x


Hi,

we are trying to migrate some of the data lake pipelines to run in SPARK 3.x, 
whereas the dependent pipelines using those tables will still be running in 
SPARK 2.4.x for some time to come.

Does anyone know of any issues that can happen:
1. when reading Parquet files written in 3.1.x in SPARK 2.4
2. when in the data lake some partitions have parquet files written in SPARK 
2.4.x and some are in SPARK 3.1.x.

Please note that there are no changes in schema, but later on we might end up 
adding or removing some columns.

I will be really grateful for your kind help on this.

Regards,
Gourav Sengupta


Re: K8S submit client vs. cluster

2021-08-12 Thread Mich Talebzadeh
Ok

As I see it with PySpark even if it is submitted as cluster, it will be
converted to client mode anyway

Are you running this on AWS or GCP?






On Thu, 12 Aug 2021 at 12:42, Bode, Meikel, NMA-CFD <
meikel.b...@bertelsmann.de> wrote:

> Hi Mich,
>
>
>
> All PySpark.
>
>
>
> Best,
>
> Meikel
>
>
>
> *From:* Mich Talebzadeh 
> *Sent:* Donnerstag, 12. August 2021 13:41
> *To:* Bode, Meikel, NMA-CFD 
> *Cc:* user@spark.apache.org
> *Subject:* Re: K8S submit client vs. cluster
>
>
>
> Is this Spark or PySpark?
>
>
>
>
>
>
>
>
>
>
>
>
>
> On Thu, 12 Aug 2021 at 12:35, Bode, Meikel, NMA-CFD <
> meikel.b...@bertelsmann.de> wrote:
>
> Hi all,
>
>
>
> If we schedule a spark job on k8s, how are volume mappings handled?
>
>
>
> In client mode I would expect that the driver's volumes have to be mapped
> manually in the pod template. Executor volumes are attached dynamically based
> on submit parameters. Right…?
>
>
>
> In cluster mode I would expect that volumes for drivers/executors are taken
> from the submit command and attached to the pods accordingly. Right…?
>
>
>
> Any hints appreciated,
>
>
>
> Best,
>
> Meikel
>
>


RE: K8S submit client vs. cluster

2021-08-12 Thread Bode, Meikel, NMA-CFD
Hi Mich,

All PySpark.

Best,
Meikel

From: Mich Talebzadeh 
Sent: Donnerstag, 12. August 2021 13:41
To: Bode, Meikel, NMA-CFD 
Cc: user@spark.apache.org
Subject: Re: K8S submit client vs. cluster

Is this Spark or PySpark?





 




On Thu, 12 Aug 2021 at 12:35, Bode, Meikel, NMA-CFD 
mailto:meikel.b...@bertelsmann.de>> wrote:
Hi all,

If we schedule a spark job on k8s, how are volume mappings handled?

In client mode I would expect that the driver's volumes have to be mapped manually 
in the pod template. Executor volumes are attached dynamically based on submit 
parameters. Right...?

In cluster mode I would expect that volumes for drivers/executors are taken from 
the submit command and attached to the pods accordingly. Right...?

Any hints appreciated,

Best,
Meikel


Re: K8S submit client vs. cluster

2021-08-12 Thread Mich Talebzadeh
Is this Spark or PySpark?







On Thu, 12 Aug 2021 at 12:35, Bode, Meikel, NMA-CFD <
meikel.b...@bertelsmann.de> wrote:

> Hi all,
>
>
>
> If we schedule a spark job on k8s, how are volume mappings handled?
>
>
>
> In client mode I would expect that the driver's volumes have to be mapped
> manually in the pod template. Executor volumes are attached dynamically based
> on submit parameters. Right…?
>
>
>
> In cluster mode I would expect that volumes for drivers/executors are taken
> from the submit command and attached to the pods accordingly. Right…?
>
>
>
> Any hints appreciated,
>
>
>
> Best,
>
> Meikel
>


K8S submit client vs. cluster

2021-08-12 Thread Bode, Meikel, NMA-CFD
Hi all,

If we schedule a spark job on k8s, how are volume mappings handled?

In client mode I would expect that the driver's volumes have to be mapped manually 
in the pod template. Executor volumes are attached dynamically based on submit 
parameters. Right...?

In cluster mode I would expect that volumes for drivers/executors are taken from 
the submit command and attached to the pods accordingly. Right...?
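
For reference, the submit parameters referred to above look roughly like the following (a hedged sketch in the style of the spark-submit commands earlier in this thread; the volume name "data", claim name "my-pvc", mount path and application file are all illustrative):

spark-submit \
  --master k8s://https://$KUBERNETES_MASTER_IP:443 \
  --deploy-mode cluster \
  --conf spark.kubernetes.driver.volumes.persistentVolumeClaim.data.mount.path=/checkpoint \
  --conf spark.kubernetes.driver.volumes.persistentVolumeClaim.data.options.claimName=my-pvc \
  --conf spark.kubernetes.executor.volumes.persistentVolumeClaim.data.mount.path=/checkpoint \
  --conf spark.kubernetes.executor.volumes.persistentVolumeClaim.data.options.claimName=my-pvc \
  local:///opt/spark/app/my_app.py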

Any hints appreciated,

Best,
Meikel