Re: Re: EXT: Dual Write to HDFS and MinIO in faster way

2024-05-30 Thread Subhasis Mukherjee
Regarding the part about making the Spark writer faster: if you are (or can be) on Databricks,
check this out. It is just out of the oven at Databricks.

https://www.databricks.com/blog/announcing-general-availability-liquid-clustering?utm_source=bambu&utm_medium=social&utm_campaign=advocacy&blaid=6087618
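
For anyone curious what that looks like in practice, here is a minimal, hypothetical
sketch (a Delta Lake table on Databricks; the table and column names are made up, and the
CLUSTER BY clause follows the syntax described in the blog post and the Databricks docs):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("LiquidClusteringSketch").getOrCreate()

# Hypothetical table: liquid clustering is enabled with CLUSTER BY instead of
# static partitioning, so later writes need no manual repartitioning.
spark.sql("""
    CREATE TABLE IF NOT EXISTS sales_clustered (
        order_id    BIGINT,
        customer_id BIGINT,
        order_ts    TIMESTAMP
    )
    USING DELTA
    CLUSTER BY (customer_id)
""")

# Appends are clustered incrementally; OPTIMIZE can be run later to re-cluster.
spark.table("sales_raw").write.mode("append").saveAsTable("sales_clustered")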




From: Gera Shegalov 
Sent: Wednesday, May 29, 2024 7:57:56 am
To: Prem Sahoo 
Cc: eab...@163.com ; Vibhor Gupta ; 
user @spark 
Subject: Re: Re: EXT: Dual Write to HDFS and MinIO in faster way

I agree with the previous answers that (if requirements allow it) it is much 
easier to just orchestrate a copy either in the same app or sync externally.

A long time ago, and not for a Spark app, we solved a similar use case via 
https://hadoop.apache.org/docs/r3.2.3/hadoop-project-dist/hadoop-hdfs/ViewFs.html#Multi-Filesystem_I.2F0_with_Nfly_Mount_Points
It may work with Spark because it sits underneath the FileSystem API ...
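
For illustration only, a rough sketch of how an Nfly mount might be passed to a Spark job
through Hadoop configuration. The property name follows the pattern shown on the ViewFs
page linked above, but the mount table name, namenode, bucket and paths are all made up,
so treat this as a starting point rather than a tested recipe:

from pyspark.sql import SparkSession

# One logical path (viewfs://global/data) fans writes out to both target filesystems.
spark = (
    SparkSession.builder
    .appName("NflyDualWriteSketch")
    .config(
        "spark.hadoop.fs.viewfs.mounttable.global.linkNfly../data",
        "hdfs://namenode:8020/data,s3a://minio-bucket/data",
    )
    .getOrCreate()
)

df = spark.read.parquet("hdfs://namenode:8020/source/input.parquet")
df.write.mode("overwrite").parquet("viewfs://global/data/output")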



On Tue, May 21, 2024 at 10:03 PM Prem Sahoo <prem.re...@gmail.com> wrote:
I am looking for a writer/committer optimization which can make the Spark write
faster.
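
On the committer side, one option worth testing (a sketch only, assuming Parquet output
through the S3A connector and the spark-hadoop-cloud module on the classpath; paths are
illustrative) is to replace the default FileOutputCommitter with an S3A committer, which
avoids the slow rename-based commit on object stores:

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("S3ACommitterSketch")
    # Use the S3A "directory" staging committer instead of rename-based commits.
    .config("spark.hadoop.fs.s3a.committer.name", "directory")
    .config("spark.sql.sources.commitProtocolClass",
            "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")
    .config("spark.sql.parquet.output.committer.class",
            "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter")
    .getOrCreate()
)

df = spark.read.parquet("hdfs://namenode:8020/source/input.parquet")   # illustrative path
df.write.mode("overwrite").parquet("s3a://minio-bucket/output/data")   # illustrative path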

On Tue, May 21, 2024 at 9:15 PM eab...@163.com <eab...@163.com> wrote:
Hi,
I think you should write to HDFS and then copy the files (Parquet or ORC) from HDFS 
to MinIO.


eabour

From: Prem Sahoo
Date: 2024-05-22 00:38
To: Vibhor Gupta; 
user
Subject: Re: EXT: Dual Write to HDFS and MinIO in faster way


On Tue, May 21, 2024 at 6:58 AM Prem Sahoo <prem.re...@gmail.com> wrote:
Hello Vibhor,
Thanks for the suggestion.
I am looking for other alternatives where the same dataframe can be written to
two destinations without re-execution and without cache or persist.

Can someone help me with scenario 2?
How can I make the Spark write to MinIO faster?
Sent from my iPhone

On May 21, 2024, at 1:18 AM, Vibhor Gupta <vibhor.gu...@walmart.com> wrote:


Hi Prem,

You can try writing to HDFS, then reading it back from HDFS and writing to MinIO.

This will prevent the transformations from being executed twice.

You can also try persisting the dataframe using the DISK_ONLY storage level.
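
A minimal sketch of both suggestions (PySpark, with made-up paths and a placeholder
transformation) might look like this:

from pyspark.sql import SparkSession
from pyspark import StorageLevel

spark = SparkSession.builder.appName("DualWriteSketch").getOrCreate()

df = (
    spark.read.parquet("hdfs://namenode:8020/source/input.parquet")  # illustrative path
    .filter("amount > 0")                                            # placeholder transformation
)

# Option 1: write once to HDFS, then re-read the materialized files and copy to MinIO.
df.write.mode("overwrite").parquet("hdfs://namenode:8020/output/data")
(spark.read.parquet("hdfs://namenode:8020/output/data")
      .write.mode("overwrite").parquet("s3a://minio-bucket/output/data"))

# Option 2: keep the computed partitions on local disk so the second write reuses them.
df.persist(StorageLevel.DISK_ONLY)
df.write.mode("overwrite").parquet("hdfs://namenode:8020/output/data")
df.write.mode("overwrite").parquet("s3a://minio-bucket/output/data")
df.unpersist()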

Regards,
Vibhor
From: Prem Sahoo <prem.re...@gmail.com>
Date: Tuesday, 21 May 2024 at 8:16 AM
To: Spark dev list <d...@spark.apache.org>
Subject: EXT: Dual Write to HDFS and MinIO in faster way
Hello Team,
I am planning to write to two data sources at the same time.

Scenario:-

Writing the same dataframe to HDFS and MinIO without re-executing the
transformations and without cache(). How can we make this faster?

Read the Parquet file, do a few transformations, and write the result to both
HDFS and MinIO.

With two separate writes, Spark needs to execute the transformations again for
each write. Do we know how we can avoid re-executing the transformations
without cache()/persist()?
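
For reference, this is the pattern being described (a sketch with made-up paths; each
write is an action, so without cache()/persist() the read and the transformations run
once per write):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("RecomputeExample").getOrCreate()

df = (
    spark.read.parquet("hdfs://namenode:8020/source/input.parquet")  # illustrative path
    .withColumnRenamed("old_col", "new_col")                         # placeholder transformation
)

# Both writes below re-execute the full lineage because nothing is cached.
df.write.mode("overwrite").parquet("hdfs://namenode:8020/output/data")
df.write.mode("overwrite").parquet("s3a://minio-bucket/output/data")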

Scenario 2:
I am writing 3.2 GB of data to HDFS and MinIO, which takes ~6 minutes.
Do we have any way to make this write faster?

I don't want to repartition before writing, since repartitioning has the
overhead of a shuffle.

Please provide some inputs.





Re: [s3a] Spark is not reading s3 object content

2024-05-30 Thread Amin Mosayyebzadeh
I will work on the first two possible causes.
For the third one, which I guess is the real problem: Spark treats the
testfile.csv object at the URL s3a://test-bucket/testfile.csv as if it were a
directory and tries to access _spark_metadata at
s3a://test-bucket/testfile.csv/_spark_metadata.
testfile.csv is an object and should not be treated as a directory, but I am
not sure how to prevent Spark from doing that.
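
In case it helps narrow this down, a small probe sketch to see how the s3a connector
itself classifies the key (it goes through Spark's py4j gateway, so the _jvm/_jsc handles
are internal APIs; the paths are the ones from this thread):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("S3aProbe").getOrCreate()

jvm = spark._jvm
hadoop_conf = spark._jsc.hadoopConfiguration()

# Ask the Hadoop FileSystem (s3a) directly whether it sees the key as a file.
path = jvm.org.apache.hadoop.fs.Path("s3a://test-bucket/testfile.csv")
fs = path.getFileSystem(hadoop_conf)
status = fs.getFileStatus(path)
print("isFile:", status.isFile(), "isDirectory:", status.isDirectory(), "length:", status.getLen())

# If this returns True, Spark will treat the path as the output of a streaming file sink.
meta = jvm.org.apache.hadoop.fs.Path("s3a://test-bucket/testfile.csv/_spark_metadata")
print("_spark_metadata exists:", fs.exists(meta))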


Re: [s3a] Spark is not reading s3 object content

2024-05-30 Thread Mich Talebzadeh
ok

some observations


   - Spark job successfully lists the S3 bucket containing testfile.csv.
   - Spark job can retrieve the file size (33 Bytes) for testfile.csv.
   - Spark job fails to read the actual data from testfile.csv.
   - The printed content from testfile.csv is an empty list.
   - Spark logs show a debug message with an exception related to
   UserGroupInformation while trying to access the _spark_metadata file
   associated with testfile.csv.

possible causes


   - Permission Issues: Spark user (likely ubuntu based on logs) might lack
   the necessary permissions to access the testfile.csv file or the
   _spark_metadata file on S3 storage.
   - Spark Configuration: Issues with Spark's configuration for S3 access,
   such as missing credentials or incorrect security settings (a configuration
   sketch follows this list).
   - Spark attempting to read unnecessary files: The _spark_metadata file
   might not be essential for your current operation, and Spark's attempt to
   read it could be causing the issue.
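
To rule out the configuration angle, a minimal sketch of explicit S3A settings for a
custom S3-compatible endpoint such as Ceph (the endpoint, credentials and bucket below
are placeholders, not values from this thread):

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("S3ReadTest")
    .config("spark.hadoop.fs.s3a.endpoint", "https://ceph.example.com")  # placeholder endpoint
    .config("spark.hadoop.fs.s3a.access.key", "ACCESS_KEY")              # placeholder credentials
    .config("spark.hadoop.fs.s3a.secret.key", "SECRET_KEY")
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .getOrCreate()
)

# Read the object as plain text, matching the DataFrameReader.text call in the stack trace.
df = spark.read.text("s3a://test-bucket/testfile.csv")
df.show(truncate=False)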


HTH

Mich Talebzadeh,
Technologist | Architect | Data Engineer | Generative AI | FinCrime
PhD, Imperial College London
London, United Kingdom

https://en.everybodywiki.com/Mich_Talebzadeh


*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed. It is essential to note
that, as with any advice, "one test result is worth one-thousand
expert opinions" (Wernher von Braun).


On Thu, 30 May 2024 at 22:29, Amin Mosayyebzadeh 
wrote:

> The code should read the testfile.csv file from S3 and print the content. It
> only prints an empty list although the file has content.
> I have also checked our custom S3 storage (Ceph based) logs and I see only
> LIST operations coming from Spark; there is no GET object operation for
> testfile.csv.
>
> The only error I see in DEBUG output is these lines:
>
> =
> 24/05/30 15:39:21 INFO MetadataLogFileIndex: Reading streaming file log
> from s3a://test-bucket/testfile.csv/_spark_metadata
> 24/05/30 15:39:21 DEBUG UserGroupInformation: PrivilegedAction [as: ubuntu
> (auth:SIMPLE)][action: org.apache.hadoop.fs.FileContext$2@7af85238]
> java.lang.Exception
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
> at
> org.apache.hadoop.fs.FileContext.getAbstractFileSystem(FileContext.java:339)
> at
> org.apache.hadoop.fs.FileContext.getFileContext(FileContext.java:465)
> at
> org.apache.spark.sql.execution.streaming.AbstractFileContextBasedCheckpointFileManager.(CheckpointFileManager.scala:311)
> at
> org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.(CheckpointFileManager.scala:352)
> at
> org.apache.spark.sql.execution.streaming.CheckpointFileManager$.create(CheckpointFileManager.scala:209)
> at
> org.apache.spark.sql.execution.streaming.HDFSMetadataLog.(HDFSMetadataLog.scala:64)
> at
> org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.(CompactibleFileStreamLog.scala:48)
> at
> org.apache.spark.sql.execution.streaming.FileStreamSinkLog.(FileStreamSinkLog.scala:91)
> at
> org.apache.spark.sql.execution.streaming.MetadataLogFileIndex.(MetadataLogFileIndex.scala:52)
> at
> org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:369)
> at
> org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:229)
> at
> org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:211)
> at scala.Option.getOrElse(Option.scala:201)
> at
> org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
> at
> org.apache.spark.sql.DataFrameReader.text(DataFrameReader.scala:646)
> at
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
> Method)
> at
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.base/java.lang.reflect.Method.invoke(Method.java:566)
> at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
> at
> py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
> at py4j.Gateway.invoke(Gateway.java:282)
> at
> py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
> at py4j.commands.CallCommand.execute(CallCommand.java:79)
> at
> py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
> at py4j.ClientServerConnection.run(ClientSer

Re: [s3a] Spark is not reading s3 object content

2024-05-30 Thread Amin Mosayyebzadeh
The code should read the testfile.csv file from S3 and print the content. It
only prints an empty list although the file has content.
I have also checked our custom S3 storage (Ceph based) logs and I see only
LIST operations coming from Spark; there is no GET object operation for
testfile.csv.

The only error I see in DEBUG output is these lines:

=
24/05/30 15:39:21 INFO MetadataLogFileIndex: Reading streaming file log
from s3a://test-bucket/testfile.csv/_spark_metadata
24/05/30 15:39:21 DEBUG UserGroupInformation: PrivilegedAction [as: ubuntu
(auth:SIMPLE)][action: org.apache.hadoop.fs.FileContext$2@7af85238]
java.lang.Exception
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
at
org.apache.hadoop.fs.FileContext.getAbstractFileSystem(FileContext.java:339)
at
org.apache.hadoop.fs.FileContext.getFileContext(FileContext.java:465)
at
org.apache.spark.sql.execution.streaming.AbstractFileContextBasedCheckpointFileManager.(CheckpointFileManager.scala:311)
at
org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.(CheckpointFileManager.scala:352)
at
org.apache.spark.sql.execution.streaming.CheckpointFileManager$.create(CheckpointFileManager.scala:209)
at
org.apache.spark.sql.execution.streaming.HDFSMetadataLog.(HDFSMetadataLog.scala:64)
at
org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.(CompactibleFileStreamLog.scala:48)
at
org.apache.spark.sql.execution.streaming.FileStreamSinkLog.(FileStreamSinkLog.scala:91)
at
org.apache.spark.sql.execution.streaming.MetadataLogFileIndex.(MetadataLogFileIndex.scala:52)
at
org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:369)
at
org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:229)
at
org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:211)
at scala.Option.getOrElse(Option.scala:201)
at
org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
at
org.apache.spark.sql.DataFrameReader.text(DataFrameReader.scala:646)
at
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
Method)
at
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
at py4j.Gateway.invoke(Gateway.java:282)
at
py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at
py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.base/java.lang.Thread.run(Thread.java:829)

===
Which I am not sure is related, since Spark can see and list the
bucket (it also returns the correct object size, which is 33 bytes).

Best,
Amin


On Thu, May 30, 2024 at 4:05 PM Mich Talebzadeh 
wrote:

> Hello,
>
> Overall, the exit code of 0 suggests a successful run of your Spark job.
> Analyze the intended purpose of your code and verify the output or Spark UI
> for further confirmation.
>
> 24/05/30 01:23:43 INFO SparkContext: SparkContext is stopping with
> exitCode 0.
>
> what to check
>
>
>1. Verify Output: If your Spark job was intended to read data from S3
>and process it, you will need to verify the output to ensure the data was
>handled correctly. This might involve checking if any results were written
>to a designated location or if any transformations were applied
>successfully.
>2. Review Code:
>3. Check Spark UI:
>
>
> HTH
>
> Mich Talebzadeh,
> Technologist | Architect | Data Engineer | Generative AI | FinCrime
> PhD, Imperial College London
> London, United Kingdom
>
> https://en.everybodywiki.com/Mich_Talebzadeh
>
>
> *Disclaimer:* The information provided is correct to the best of my
> knowledge but of course cannot be guaranteed. It is essential to note
> that, as with any advice, "one test result is worth one-thousand
> expert opinions" (Wernher von Braun).
>
>
> On Thu, 30 May 2024 at 11:56, Amin Mosayyebzadeh 
> wrote:
>
>> Hi Mich,
>>
>> Thank you for the help and sorry about the late reply.
>> I ran

Re: [s3a] Spark is not reading s3 object content

2024-05-30 Thread Mich Talebzadeh
Hello,

Overall, the exit code of 0 suggests a successful run of your Spark job.
Analyze the intended purpose of your code and verify the output or Spark UI
for further confirmation.

24/05/30 01:23:43 INFO SparkContext: SparkContext is stopping with exitCode
0.

what to check


   1. Verify Output: If your Spark job was intended to read data from S3
   and process it, you will need to verify the output to ensure the data was
   handled correctly. This might involve checking if any results were written
   to a designated location or if any transformations were applied
   successfully.
   2. Review Code:
   3. Check Spark UI:


HTH

Mich Talebzadeh,
Technologist | Architect | Data Engineer | Generative AI | FinCrime
PhD, Imperial College London
London, United Kingdom

https://en.everybodywiki.com/Mich_Talebzadeh


*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed. It is essential to note
that, as with any advice, "one test result is worth one-thousand
expert opinions" (Wernher von Braun).


On Thu, 30 May 2024 at 11:56, Amin Mosayyebzadeh 
wrote:

> Hi Mich,
>
> Thank you for the help and sorry about the late reply.
> I ran your provided code, but I got "exitCode 0". Here is the complete output:
>
> ===
>
>
> 24/05/30 01:23:38 INFO SparkContext: Running Spark version 3.5.0
> 24/05/30 01:23:38 INFO SparkContext: OS info Linux, 5.4.0-182-generic,
> amd64
> 24/05/30 01:23:38 INFO SparkContext: Java version 11.0.22
> 24/05/30 01:23:38 WARN NativeCodeLoader: Unable to load native-hadoop
> library for your platform... using builtin-java classes where applicable
> 24/05/30 01:23:38 INFO ResourceUtils:
> ==
> 24/05/30 01:23:38 INFO ResourceUtils: No custom resources configured for
> spark.driver.
> 24/05/30 01:23:38 INFO ResourceUtils:
> ==
> 24/05/30 01:23:38 INFO SparkContext: Submitted application: S3ReadTest
> 24/05/30 01:23:38 INFO ResourceProfile: Default ResourceProfile created,
> executor resources: Map(cores -> name: cores, amount: 1, script: , vendor:
> , memory -> name: memory, amount: 1024, script: , vendor: , offHeap ->
> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus ->
> name: cpus, amount: 1.0)
> 24/05/30 01:23:38 INFO ResourceProfile: Limiting resource is cpu
> 24/05/30 01:23:38 INFO ResourceProfileManager: Added ResourceProfile id: 0
> 24/05/30 01:23:38 INFO SecurityManager: Changing view acls to: ubuntu
> 24/05/30 01:23:38 INFO SecurityManager: Changing modify acls to: ubuntu
> 24/05/30 01:23:38 INFO SecurityManager: Changing view acls groups to:
> 24/05/30 01:23:38 INFO SecurityManager: Changing modify acls groups to:
> 24/05/30 01:23:38 INFO SecurityManager: SecurityManager: authentication
> disabled; ui acls disabled; users with view permissions: ubuntu; groups
> with view permissions: EMPTY; users with modify permissions: ubuntu; groups
> with modify permissions: EMPTY
> 24/05/30 01:23:38 INFO Utils: Successfully started service 'sparkDriver'
> on port 46321.
> 24/05/30 01:23:38 INFO SparkEnv: Registering MapOutputTracker
> 24/05/30 01:23:38 INFO SparkEnv: Registering BlockManagerMaster
> 24/05/30 01:23:38 INFO BlockManagerMasterEndpoint: Using
> org.apache.spark.storage.DefaultTopologyMapper for getting topology
> information
> 24/05/30 01:23:38 INFO BlockManagerMasterEndpoint:
> BlockManagerMasterEndpoint up
> 24/05/30 01:23:38 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
> 24/05/30 01:23:38 INFO DiskBlockManager: Created local directory at
> /tmp/blockmgr-a1fc37d5-885a-4ed0-b8f2-4eeb930c69ee
> 24/05/30 01:23:38 INFO MemoryStore: MemoryStore started with capacity 2.8
> GiB
> 24/05/30 01:23:38 INFO SparkEnv: Registering OutputCommitCoordinator
> 24/05/30 01:23:39 INFO JettyUtils: Start Jetty 0.0.0.0:4040 for SparkUI
> 24/05/30 01:23:39 INFO Utils: Successfully started service 'SparkUI' on
> port 4040.
> 24/05/30 01:23:39 INFO Executor: Starting executor ID driver on host
> MOC-R4PAC08U33-S1C
> 24/05/30 01:23:39 INFO Executor: OS info Linux, 5.4.0-182-generic, amd64
> 24/05/30 01:23:39 INFO Executor: Java version 11.0.22
> 24/05/30 01:23:39 INFO Executor: Starting executor with user classpath
> (userClassPathFirst = false): ''
> 24/05/30 01:23:39 INFO Executor: Created or updated repl class loader
> org.apache.spark.util.MutableURLClassLoader@a45f4d6 for default.
> 24/05/30 01:23:39 INFO Utils: Successfully started service
> 'org.apache.spark.network.netty.NettyBlockTransferService' on port 39343.
> 24/05/30 01:23:39 INFO NettyBlockTransferService: Server