Spark column headings, camelCase or snake case?

2024-04-11 Thread Mich Talebzadeh
I know this is a bit of a silly question, but what is the norm for
Spark column headings? Is it camelCase or snake_case? For example, here
someone suggested, and I quote:

"SumTotalInMillionGBP" accurately conveys the meaning but is a bit long and
uses camelCase, which is not the standard convention for Spark DataFrames
(usually snake_case). Use snake_case for better readability, like:
"total_price_in_millions_gbp"

So this is the gist

+----------------------+---------------------+---------------------------+
|district              |NumberOfOffshoreOwned|total_price_in_millions_gbp|
+----------------------+---------------------+---------------------------+
|CITY OF WESTMINSTER   |4452                 |21472.5                    |
|KENSINGTON AND CHELSEA|2403                 |6544.8                     |
|CAMDEN                |1023                 |4275.9                     |
|SOUTHWARK             |1080                 |3938.0                     |
|ISLINGTON             |627                  |3062.0                     |
|TOWER HAMLETS         |1715                 |3008.0                     |
|HAMMERSMITH AND FULHAM|765                  |2137.2                     |

Now I recently saw a note (if I recall correctly) that Spark should be
using camelCase in new Spark-related documents. What are the accepted views,
or does it matter?
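
Whichever convention is chosen, renaming can be automated. The following is a minimal sketch, not an official Spark utility: `to_snake_case` is a hypothetical helper name, and the column list is taken from the examples in this message.

```python
import re


def to_snake_case(name: str) -> str:
    """Convert a camelCase or PascalCase column name to snake_case."""
    # Split an acronym from a following capitalised word: "GBPTotal" -> "GBP_Total"
    s = re.sub(r"([A-Z]+)([A-Z][a-z])", r"\1_\2", name)
    # Split a lowercase letter or digit from a following capital: "totalPrice" -> "total_Price"
    s = re.sub(r"([a-z0-9])([A-Z])", r"\1_\2", s)
    return s.lower()


# Column names used as examples in this thread
columns = ["district", "NumberOfOffshoreOwned", "SumTotalInMillionGBP"]
renamed = [to_snake_case(c) for c in columns]
print(renamed)
# ['district', 'number_of_offshore_owned', 'sum_total_in_million_gbp']
```

With PySpark, the new names could then be applied in one call, for example `df.toDF(*[to_snake_case(c) for c in df.columns])`, assuming `df` is an existing DataFrame. Note that Spark SQL resolves column names case-insensitively by default, so the choice is mostly about readability and consistency.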

Thanks
Mich Talebzadeh,

Technologist | Solutions Architect | Data Engineer  | Generative AI

London
United Kingdom




 https://en.everybodywiki.com/Mich_Talebzadeh



Disclaimer: The information provided is correct to the best of my knowledge
but of course cannot be guaranteed. It is essential to note that, as with
any advice, quote "one test result is worth one-thousand expert opinions
(Wernher von Braun)".


Re: External Spark shuffle service for k8s

2024-04-11 Thread Bjørn Jørgensen
I think this answers your question about what to do if you need more space
on nodes.

https://spark.apache.org/docs/latest/running-on-kubernetes.html#local-storage

Local Storage


Spark supports using volumes to spill data during shuffles and other
operations. To use a volume as local storage, the volume’s name should
start with spark-local-dir-, for example:

--conf spark.kubernetes.driver.volumes.[VolumeType].spark-local-dir-[VolumeName].mount.path=<mount path>
--conf spark.kubernetes.driver.volumes.[VolumeType].spark-local-dir-[VolumeName].mount.readOnly=false

Specifically, you can use persistent volume claims if the jobs require
large shuffle and sorting operations in executors.

spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.claimName=OnDemand
spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.storageClass=gp
spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.sizeLimit=500Gi
spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.path=/data
spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.readOnly=false
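
The five keys above share one prefix, so they can be generated rather than typed by hand. The sketch below is only an illustration: `executor_pvc_conf` and `to_conf_args` are hypothetical helper names, and the values are the ones from the documentation excerpt, not a recommendation.

```python
def executor_pvc_conf(volume_name, claim_name, storage_class, size_limit, mount_path):
    """Build the executor PVC settings for one local-storage volume (hypothetical helper)."""
    # Spark only treats the volume as local storage if its name has this prefix
    if not volume_name.startswith("spark-local-dir-"):
        raise ValueError("local-storage volume names must start with 'spark-local-dir-'")
    prefix = f"spark.kubernetes.executor.volumes.persistentVolumeClaim.{volume_name}"
    return {
        f"{prefix}.options.claimName": claim_name,
        f"{prefix}.options.storageClass": storage_class,
        f"{prefix}.options.sizeLimit": size_limit,
        f"{prefix}.mount.path": mount_path,
        f"{prefix}.mount.readOnly": "false",
    }


def to_conf_args(conf):
    """Flatten a {key: value} mapping into spark-submit style '--conf key=value' arguments."""
    args = []
    for key, value in conf.items():
        args.extend(["--conf", f"{key}={value}"])
    return args


# Values copied from the documentation excerpt above
conf = executor_pvc_conf("spark-local-dir-1", "OnDemand", "gp", "500Gi", "/data")
print("\n".join(" ".join(to_conf_args(conf)[i:i + 2]) for i in range(0, 10, 2)))
```

The same dictionary could also be passed key by key to a `SparkSession` builder via `.config(key, value)` instead of being rendered as command-line arguments.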

To enable the shuffle data recovery feature via the built-in
KubernetesLocalDiskShuffleDataIO plugin, we need the following settings.
You may also want to enable
spark.kubernetes.driver.waitToReusePersistentVolumeClaim.

spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.path=/data/spark-x/executor-x
spark.shuffle.sort.io.plugin.class=org.apache.spark.shuffle.KubernetesLocalDiskShuffleDataIO

If no volume is set as local storage, Spark uses temporary scratch space to
spill data to disk during shuffles and other operations. When using
Kubernetes as the resource manager, the pods will be created with an emptyDir
volume mounted for each directory listed in spark.local.dir or the environment
variable SPARK_LOCAL_DIRS. If no directories are explicitly specified, then
a default directory is created and configured appropriately.

emptyDir volumes use the ephemeral storage feature of Kubernetes and do not
persist beyond the life of the pod.

On Thu, 11 Apr 2024 at 10:29, Bjørn Jørgensen wrote:

> "In the end for my use case I started using PVCs and PVC-aware scheduling
> along with decommissioning. So far performance is good with this choice."
> How did you do this?
>
>
> On Thu, 11 Apr 2024 at 04:13, Arun Ravi wrote:
>
>> Hi Everyone,
>>
>> I had explored IBM's and AWS's S3 shuffle plugins (some time back). I
>> had also explored AWS FSx for Lustre in a few of my production jobs, which have
>> ~20TB of shuffle operations with 200-300 executors. What I observed is that
>> S3 and FSx behaviour was fine during the write phase; however, I faced IOPS
>> throttling during the read phase (reads taking forever to complete). I think
>> this might be caused by the heavy use of shuffle index files (I didn't
>> perform any extensive research on this), so I believe the shuffle manager
>> logic has to be intelligent enough to reduce the fetching of files from
>> the object store. In the end, for my use case I started using PVCs and PVC-aware
>> scheduling along with decommissioning. So far performance is good with this
>> choice.
>>
>> Thank you
>>
>> On Tue, 9 Apr 2024, 15:17 Mich Talebzadeh wrote:
>>
>>> Hi,
>>>
>>> First, thanks to everyone for their contributions.
>>>
>>> I was going to reply to @Enrico Minack but
>>> noticed additional info. As I understand it, Apache Uniffle, for example, is an
>>> incubating project aimed at providing a pluggable shuffle service for
>>> Spark. So basically, what all these "external shuffle services" have in common
>>> is that they offload shuffle data management to external services, thus reducing
>>> the memory and CPU overhead on Spark executors. That is great. While
>>> Uniffle and others enhance shuffle performance and scalability, it would be
>>> great to integrate them with the Spark UI. This may require additional
>>> development effort. I suppose the interest would be to have these
>>> external metrics incorporated into Spark with one look and feel. This may
>>> require customizing the UI to fetch and display metrics or statistics from
>>> the external shuffle services. Has any project done this?
>>>
>>> Thanks
>>>
>>> Mich Talebzadeh,

Re: [Spark SQL]: Source code for PartitionedFile

2024-04-11 Thread Ashley McManamon
Hi Mich,

Thanks for the reply.

I did come across that file but it didn't align with the appearance of
`PartitionedFile`:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/PartitionedFileUtil.scala

In fact, the code snippet you shared also references the type
`PartitionedFile`.

There's actually this javadoc.io page for a `PartitionedFile`
at org.apache.spark.sql.execution.datasources for spark-sql_2.12:3.0.2:
https://javadoc.io/doc/org.apache.spark/spark-sql_2.12/3.0.2/org/apache/spark/sql/execution/datasources/PartitionedFile.html
I double-checked the source code for version 3.0.2 and it doesn't seem to
exist there either:
https://github.com/apache/spark/tree/v3.0.2/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources

Ashley


On Mon, 8 Apr 2024 at 22:41, Mich Talebzadeh wrote:

> Hi,
>
> I believe this is the package
>
>
> https://raw.githubusercontent.com/apache/spark/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FilePartition.scala
>
> And the code
>
> case class FilePartition(index: Int, files: Array[PartitionedFile])
>   extends Partition with InputPartition {
>   override def preferredLocations(): Array[String] = {
>     // Computes total number of bytes that can be retrieved from each host.
>     val hostToNumBytes = mutable.HashMap.empty[String, Long]
>     files.foreach { file =>
>       file.locations.filter(_ != "localhost").foreach { host =>
>         hostToNumBytes(host) = hostToNumBytes.getOrElse(host, 0L) + file.length
>       }
>     }
>
>     // Selects the first 3 hosts with the most data to be retrieved.
>     hostToNumBytes.toSeq.sortBy {
>       case (host, numBytes) => numBytes
>     }.reverse.take(3).map {
>       case (host, numBytes) => host
>     }.toArray
>   }
> }
>
> HTH
>
> Mich Talebzadeh,
>
>
> On Mon, 8 Apr 2024 at 20:31, Ashley McManamon <
> ashley.mcmana...@quantcast.com> wrote:
>
>> Hi All,
>>
>> I've been diving into the source code to get a better understanding of
>> how file splitting works from a user perspective. I've hit a dead end at
>> `PartitionedFile`, for which I cannot seem to find a definition. It appears
>> as though it should be found at
>> org.apache.spark.sql.execution.datasources, but I find no definition in
>> the entire source code. Am I missing something?
>>
>> I appreciate there may be an obvious answer here, apologies if I'm being
>> naive.
>>
>> Thanks,
>> Ashley McManamon
>>
>>


Re: External Spark shuffle service for k8s

2024-04-11 Thread Bjørn Jørgensen
"In the end for my use case I started using PVCs and PVC-aware scheduling
along with decommissioning. So far performance is good with this choice."
How did you do this?


On Thu, 11 Apr 2024 at 04:13, Arun Ravi wrote:

> Hi Everyone,
>
> I had explored IBM's and AWS's S3 shuffle plugins (some time back). I
> had also explored AWS FSx for Lustre in a few of my production jobs, which have
> ~20TB of shuffle operations with 200-300 executors. What I observed is that
> S3 and FSx behaviour was fine during the write phase; however, I faced IOPS
> throttling during the read phase (reads taking forever to complete). I think
> this might be caused by the heavy use of shuffle index files (I didn't
> perform any extensive research on this), so I believe the shuffle manager
> logic has to be intelligent enough to reduce the fetching of files from
> the object store. In the end, for my use case I started using PVCs and PVC-aware
> scheduling along with decommissioning. So far performance is good with this
> choice.
>
> Thank you
>
> On Tue, 9 Apr 2024, 15:17 Mich Talebzadeh wrote:
>
>> Hi,
>>
>> First, thanks to everyone for their contributions.
>>
>> I was going to reply to @Enrico Minack but
>> noticed additional info. As I understand it, Apache Uniffle, for example, is an
>> incubating project aimed at providing a pluggable shuffle service for
>> Spark. So basically, what all these "external shuffle services" have in common
>> is that they offload shuffle data management to external services, thus reducing
>> the memory and CPU overhead on Spark executors. That is great. While
>> Uniffle and others enhance shuffle performance and scalability, it would be
>> great to integrate them with the Spark UI. This may require additional
>> development effort. I suppose the interest would be to have these
>> external metrics incorporated into Spark with one look and feel. This may
>> require customizing the UI to fetch and display metrics or statistics from
>> the external shuffle services. Has any project done this?
>>
>> Thanks
>>
>> Mich Talebzadeh,
>>
>>
>> On Mon, 8 Apr 2024 at 14:19, Vakaris Baškirov <
>> vakaris.bashki...@gmail.com> wrote:
>>
>>> I see that both Uniffle and Celeborn support S3/HDFS backends, which is
>>> great.
>>> In the case someone is using S3/HDFS, I wonder what would be the
>>> advantages of using Celeborn or Uniffle vs the IBM shuffle service plugin
>>> or the Cloud Shuffle Storage Plugin from AWS?
>>>
>>> These plugins do not require deploying a separate service. Are there any
>>> advantages to using Uniffle/Celeborn in the case of using an S3 backend, which
>>> would require deploying a separate service?
>>>
>>> Thanks
>>> Vakaris
>>>
>>> On Mon, Apr 8, 2024 at 10:03 AM roryqi  wrote:
>>>
>>>> Apache Uniffle (incubating) may be another solution.
>>>> You can see
>>>> https://github.com/apache/incubator-uniffle
>>>>
>>>> https://uniffle.apache.org/blog/2023/07/21/Uniffle%20-%20New%20chapter%20for%20the%20shuffle%20in%20the%20cloud%20native%20era
>>>>
>>>> On Mon, 8 Apr 2024 at 07:15, Mich Talebzadeh wrote:

>>>>> Splendid
>>>>>
>>>>> The configurations below can be used with k8s deployments of Spark.
>>>>> Spark applications running on k8s can utilize these configurations to
>>>>> seamlessly access data stored in Google Cloud Storage (GCS) and Amazon S3.
>>>>>
>>>>> For Google GCS we may have
>>>>>
>>>>> spark_config_gcs = {
>>>>>     "spark.kubernetes.authenticate.driver.serviceAccountName": "service_account_name",
>>>>>     "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
>>>>>     "spark.hadoop.google.cloud.auth.service.account.enable": "true",
>>>>>     "spark.hadoop.google.cloud.auth.service.account.json.keyfile": "/path/to/keyfile.json",
>>>>> }
>>>>>
>>>>> For Amazon S3 similar
>>>>>
>>>>> spark_config_s3 = {
>>>>>     "spark.kubernetes.authenticate.driver.serviceAccountName": "service_account_name",
>>>>>     "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
>>>>>     "spark.hadoop.fs.s3a.access.key": "s3_access_key",
>>>>>     "spark.hadoop.fs.s3a.secret.key": "secret_key",
>>>>> }
>>>>>
>>>>>
>>>>> To implement these configurations and enable Spark applications to
>>>>> interact with GCS and S3, I