OK, some observations:
- The Spark job successfully lists the S3 bucket containing testfile.csv.
- The Spark job can retrieve the file size (33 bytes) for testfile.csv.
- The Spark job fails to read the actual data from testfile.csv.
- The printed content from testfile.csv is an empty list.
- The Spark logs show a debug message with an exception related to
UserGroupInformation while trying to access the _spark_metadata file
associated with testfile.csv.

Possible causes:

- Permission issues: the Spark user (likely ubuntu, based on the logs) might
lack the necessary permissions to access the testfile.csv file or the
_spark_metadata file on the S3 storage.
- Spark configuration: issues with Spark's configuration for S3 access, such
as missing credentials or incorrect security settings.
- Spark attempting to read unnecessary files: the _spark_metadata file might
not be essential for your current operation, and Spark's attempt to read it
could be what is causing the issue. You can test this with the sketch below.
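As a quick sanity check (a sketch only, not tested against your Ceph setup;
it assumes the same fs.s3a.* credentials and endpoint settings as your job,
and the bucket and path names are taken from your logs), you can bypass the
DataFrame reader, which is what triggers the _spark_metadata lookup, and
also ask the Hadoop filesystem layer whether that metadata path exists:

from pyspark.sql import SparkSession

# assumes the same fs.s3a.* settings as your job; add the same
# .config(...) lines here if you run this standalone
spark = SparkSession.builder.appName("SanityCheck").getOrCreate()

# 1) Read the object through the RDD API, which does not consult
#    _spark_metadata the way the DataFrame reader does.
rdd = spark.sparkContext.textFile("s3a://test-bucket/testfile.csv")
print(rdd.collect())

# 2) Check directly whether a _spark_metadata directory exists under the
#    path. If it does, Spark treats the path as the output of a streaming
#    file sink and reads its (empty) batch log instead of your data.
jvm = spark._jvm
hadoop_conf = spark._jsc.hadoopConfiguration()
meta = jvm.org.apache.hadoop.fs.Path("s3a://test-bucket/testfile.csv/_spark_metadata")
fs = meta.getFileSystem(hadoop_conf)
print("metadata path exists:", fs.exists(meta))

If the RDD read returns the rows but the DataFrame read stays empty, a
leftover _spark_metadata directory (for example, from an earlier structured
streaming write) is the likely culprit, and removing it should let the
DataFrame reader see the file.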
HTH

Mich Talebzadeh,
Technologist | Architect | Data Engineer | Generative AI | FinCrime
PhD <https://en.wikipedia.org/wiki/Doctor_of_Philosophy> Imperial College
London <https://en.wikipedia.org/wiki/Imperial_College_London>
London, United Kingdom


view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed. It is essential to note that,
as with any advice, "one test result is worth one-thousand expert opinions"
(Wernher von Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>).


On Thu, 30 May 2024 at 22:29, Amin Mosayyebzadeh <mosayyebza...@gmail.com>
wrote:

> The code should read the testfile.csv file from S3 and print its content.
> It only prints an empty list, although the file has content.
>
> I have also checked our custom S3 storage (Ceph based) logs and I see
> only LIST operations coming from Spark; there is no GET object operation
> for testfile.csv.
>
> The only error I see in the DEBUG output is these lines:
>
> =====================================
> 24/05/30 15:39:21 INFO MetadataLogFileIndex: Reading streaming file log from s3a://test-bucket/testfile.csv/_spark_metadata
> 24/05/30 15:39:21 DEBUG UserGroupInformation: PrivilegedAction [as: ubuntu (auth:SIMPLE)][action: org.apache.hadoop.fs.FileContext$2@7af85238]
> java.lang.Exception
>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
>     at org.apache.hadoop.fs.FileContext.getAbstractFileSystem(FileContext.java:339)
>     at org.apache.hadoop.fs.FileContext.getFileContext(FileContext.java:465)
>     at org.apache.spark.sql.execution.streaming.AbstractFileContextBasedCheckpointFileManager.<init>(CheckpointFileManager.scala:311)
>     at org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.<init>(CheckpointFileManager.scala:352)
>     at org.apache.spark.sql.execution.streaming.CheckpointFileManager$.create(CheckpointFileManager.scala:209)
>     at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.<init>(HDFSMetadataLog.scala:64)
>     at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.<init>(CompactibleFileStreamLog.scala:48)
>     at org.apache.spark.sql.execution.streaming.FileStreamSinkLog.<init>(FileStreamSinkLog.scala:91)
>     at org.apache.spark.sql.execution.streaming.MetadataLogFileIndex.<init>(MetadataLogFileIndex.scala:52)
>     at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:369)
>     at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:229)
>     at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:211)
>     at scala.Option.getOrElse(Option.scala:201)
>     at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
>     at org.apache.spark.sql.DataFrameReader.text(DataFrameReader.scala:646)
>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>     at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>     at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
>     at py4j.Gateway.invoke(Gateway.java:282)
>     at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>     at py4j.commands.CallCommand.execute(CallCommand.java:79)
>     at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
>     at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
>     at java.base/java.lang.Thread.run(Thread.java:829)
>
> ===============================
> I am not sure whether this is related, since Spark can see and list the
> bucket (it also returns the correct object size, which is 33 bytes).
>
> Best,
> Amin
>
>
> On Thu, May 30, 2024 at 4:05 PM Mich Talebzadeh <mich.talebza...@gmail.com>
> wrote:
>
>> Hello,
>>
>> Overall, the exit code of 0 suggests a successful run of your Spark job.
>> Analyze the intended purpose of your code and verify the output or the
>> Spark UI for further confirmation.
>>
>> 24/05/30 01:23:43 INFO SparkContext: SparkContext is stopping with
>> exitCode 0.
>>
>> What to check:
>>
>> 1. Verify output: if your Spark job was intended to read data from S3
>> and process it, you will need to verify the output to ensure the data
>> was handled correctly. This might involve checking whether any results
>> were written to a designated location or whether any transformations
>> were applied successfully. A small programmatic check is sketched after
>> this list.
>> 2. Review the code.
>> 3. Check the Spark UI.
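>>
>> For example, a minimal verification could look like this (a sketch only;
>> df stands for whatever DataFrame your job read, so adapt the names):
>>
>> # hedged sketch: confirm the DataFrame actually contains rows
>> row_count = df.count()
>> print(f"rows read: {row_count}")
>> if row_count == 0:
>>     print("no rows read, check the S3 path, header and delimiter options")
>> df.printSchema()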
>>
>> HTH
>>
>> Mich Talebzadeh,
>> Technologist | Architect | Data Engineer | Generative AI | FinCrime
>> PhD <https://en.wikipedia.org/wiki/Doctor_of_Philosophy> Imperial
>> College London <https://en.wikipedia.org/wiki/Imperial_College_London>
>> London, United Kingdom
>>
>>
>> view my Linkedin profile
>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>
>>
>> https://en.everybodywiki.com/Mich_Talebzadeh
>>
>>
>>
>> *Disclaimer:* The information provided is correct to the best of my
>> knowledge but of course cannot be guaranteed. It is essential to note
>> that, as with any advice, "one test result is worth one-thousand expert
>> opinions" (Wernher von Braun
>> <https://en.wikipedia.org/wiki/Wernher_von_Braun>).
>>
>>
>> On Thu, 30 May 2024 at 11:56, Amin Mosayyebzadeh <mosayyebza...@gmail.com>
>> wrote:
>>
>>> Hi Mich,
>>>
>>> Thank you for the help, and sorry about the late reply.
>>> I ran the code you provided, but I got "exitCode 0". Here is the
>>> complete output:
>>>
>>> ===============================
>>>
>>> 24/05/30 01:23:38 INFO SparkContext: Running Spark version 3.5.0
>>> 24/05/30 01:23:38 INFO SparkContext: OS info Linux, 5.4.0-182-generic, amd64
>>> 24/05/30 01:23:38 INFO SparkContext: Java version 11.0.22
>>> 24/05/30 01:23:38 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
>>> 24/05/30 01:23:38 INFO ResourceUtils: ==============================================================
>>> 24/05/30 01:23:38 INFO ResourceUtils: No custom resources configured for spark.driver.
>>> 24/05/30 01:23:38 INFO ResourceUtils: ==============================================================
>>> 24/05/30 01:23:38 INFO SparkContext: Submitted application: S3ReadTest
>>> 24/05/30 01:23:38 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 1, script: , vendor: , memory -> name: memory, amount: 1024, script: , vendor: , offHeap -> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0)
>>> 24/05/30 01:23:38 INFO ResourceProfile: Limiting resource is cpu
>>> 24/05/30 01:23:38 INFO ResourceProfileManager: Added ResourceProfile id: 0
>>> 24/05/30 01:23:38 INFO SecurityManager: Changing view acls to: ubuntu
>>> 24/05/30 01:23:38 INFO SecurityManager: Changing modify acls to: ubuntu
>>> 24/05/30 01:23:38 INFO SecurityManager: Changing view acls groups to:
>>> 24/05/30 01:23:38 INFO SecurityManager: Changing modify acls groups to:
>>> 24/05/30 01:23:38 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: ubuntu; groups with view permissions: EMPTY; users with modify permissions: ubuntu; groups with modify permissions: EMPTY
>>> 24/05/30 01:23:38 INFO Utils: Successfully started service 'sparkDriver' on port 46321.
>>> 24/05/30 01:23:38 INFO SparkEnv: Registering MapOutputTracker
>>> 24/05/30 01:23:38 INFO SparkEnv: Registering BlockManagerMaster
>>> 24/05/30 01:23:38 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
>>> 24/05/30 01:23:38 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
>>> 24/05/30 01:23:38 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
>>> 24/05/30 01:23:38 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-a1fc37d5-885a-4ed0-b8f2-4eeb930c69ee
>>> 24/05/30 01:23:38 INFO MemoryStore: MemoryStore started with capacity 2.8 GiB
>>> 24/05/30 01:23:38 INFO SparkEnv: Registering OutputCommitCoordinator
>>> 24/05/30 01:23:39 INFO JettyUtils: Start Jetty 0.0.0.0:4040 for SparkUI
>>> 24/05/30 01:23:39 INFO Utils: Successfully started service 'SparkUI' on port 4040.
>>> 24/05/30 01:23:39 INFO Executor: Starting executor ID driver on host MOC-R4PAC08U33-S1C
>>> 24/05/30 01:23:39 INFO Executor: OS info Linux, 5.4.0-182-generic, amd64
>>> 24/05/30 01:23:39 INFO Executor: Java version 11.0.22
>>> 24/05/30 01:23:39 INFO Executor: Starting executor with user classpath (userClassPathFirst = false): ''
>>> 24/05/30 01:23:39 INFO Executor: Created or updated repl class loader org.apache.spark.util.MutableURLClassLoader@a45f4d6 for default.
>>> 24/05/30 01:23:39 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 39343.
>>> 24/05/30 01:23:39 INFO NettyBlockTransferService: Server created on MOC-R4PAC08U33-S1C:39343
>>> 24/05/30 01:23:39 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
>>> 24/05/30 01:23:39 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, MOC-R4PAC08U33-S1C, 39343, None)
>>> 24/05/30 01:23:39 INFO BlockManagerMasterEndpoint: Registering block manager MOC-R4PAC08U33-S1C:39343 with 2.8 GiB RAM, BlockManagerId(driver, MOC-R4PAC08U33-S1C, 39343, None)
>>> 24/05/30 01:23:39 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, MOC-R4PAC08U33-S1C, 39343, None)
>>> 24/05/30 01:23:39 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, MOC-R4PAC08U33-S1C, 39343, None)
>>> 24/05/30 01:23:39 INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir.
>>> 24/05/30 01:23:39 INFO SharedState: Warehouse path is 'file:/home/ubuntu/tpch-spark/spark-warehouse'.
>>> 24/05/30 01:23:40 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
>>> 24/05/30 01:23:40 INFO MetricsSystemImpl: Scheduled Metric snapshot period at 10 second(s).
>>> 24/05/30 01:23:40 INFO MetricsSystemImpl: s3a-file-system metrics system started
>>> 24/05/30 01:23:41 INFO MetadataLogFileIndex: Reading streaming file log from s3a://test-bucket/testfile.csv/_spark_metadata
>>> 24/05/30 01:23:41 INFO FileStreamSinkLog: BatchIds found from listing:
>>> 24/05/30 01:23:43 INFO FileSourceStrategy: Pushed Filters:
>>> 24/05/30 01:23:43 INFO FileSourceStrategy: Post-Scan Filters:
>>> 24/05/30 01:23:43 INFO CodeGenerator: Code generated in 188.932153 ms
>>> 24/05/30 01:23:43 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 201.3 KiB, free 2.8 GiB)
>>> 24/05/30 01:23:43 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 34.8 KiB, free 2.8 GiB)
>>> 24/05/30 01:23:43 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on MOC-R4PAC08U33-S1C:39343 (size: 34.8 KiB, free: 2.8 GiB)
>>> 24/05/30 01:23:43 INFO SparkContext: Created broadcast 0 from showString at NativeMethodAccessorImpl.java:0
>>> 24/05/30 01:23:43 INFO FileSourceScanExec: Planning scan with bin packing, max size: 4194304 bytes, open cost is considered as scanning 4194304 bytes.
>>> +----+----+----+
>>> |name|int1|int2|
>>> +----+----+----+
>>> +----+----+----+
>>>
>>> 24/05/30 01:23:43 INFO SparkContext: SparkContext is stopping with exitCode 0.
>>> 24/05/30 01:23:43 INFO SparkUI: Stopped Spark web UI at http://MOC-R4PAC08U33-S1C:4040
>>> 24/05/30 01:23:43 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
>>> 24/05/30 01:23:43 INFO MemoryStore: MemoryStore cleared
>>> 24/05/30 01:23:43 INFO BlockManager: BlockManager stopped
>>> 24/05/30 01:23:43 INFO BlockManagerMaster: BlockManagerMaster stopped
>>> 24/05/30 01:23:43 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
>>> 24/05/30 01:23:43 INFO SparkContext: Successfully stopped SparkContext
>>> 24/05/30 01:23:44 INFO ShutdownHookManager: Shutdown hook called
>>> 24/05/30 01:23:44 INFO ShutdownHookManager: Deleting directory /tmp/spark-f26cd915-aeb6-4efc-8960-56ca51ac1a7d
>>> 24/05/30 01:23:44 INFO ShutdownHookManager: Deleting directory /tmp/spark-91b984d6-62fe-4c6b-9996-36d6873ff5d6
>>> 24/05/30 01:23:44 INFO ShutdownHookManager: Deleting directory /tmp/spark-91b984d6-62fe-4c6b-9996-36d6873ff5d6/pyspark-bcca58a9-38dc-4359-85d1-81b728d6cf82
>>>
>>>
>>> Best,
>>> Amin
>>>
>>>
>>>
>>> On Thu, May 23, 2024 at 4:20 PM Mich Talebzadeh <mich.talebza...@gmail.com>
>>> wrote:
>>>
>>>> There could be a number of reasons.
>>>>
>>>> First, test reading the file with a CLI. Note that the AWS CLI expects
>>>> the s3:// scheme (not s3a://, which is the Hadoop connector's scheme)
>>>> and, for a custom S3 endpoint such as yours, needs --endpoint-url:
>>>>
>>>> aws s3 cp s3://input/testfile.csv . --endpoint-url http://192.168.52.63:8000
>>>> cat testfile.csv
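>>>>
>>>> Alternatively, a small boto3 script (a sketch, untested; it reuses the
>>>> endpoint and the masked credentials already shown in this thread) can
>>>> issue the GET directly, which should show up in your Ceph logs and
>>>> confirm whether the object is retrievable outside Spark:
>>>>
>>>> import boto3
>>>>
>>>> # client pointed at the custom Ceph endpoint rather than AWS
>>>> s3 = boto3.client(
>>>>     "s3",
>>>>     endpoint_url="http://192.168.52.63:8000",
>>>>     aws_access_key_id="R*************6",
>>>>     aws_secret_access_key="1***************e",
>>>> )
>>>>
>>>> # GET the object and print its body
>>>> obj = s3.get_object(Bucket="input", Key="testfile.csv")
>>>> print(obj["Body"].read().decode("utf-8"))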
>>>>
>>>> Then try this code with the debug option to diagnose the problem:
>>>>
>>>> from pyspark.sql import SparkSession
>>>> from pyspark.sql.utils import AnalysisException
>>>>
>>>> spark = None
>>>> try:
>>>>     # Initialize Spark session
>>>>     spark = (
>>>>         SparkSession.builder
>>>>         .appName("S3ReadTest")
>>>>         .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.6")
>>>>         .config("spark.hadoop.fs.s3a.access.key", "R*************6")
>>>>         .config("spark.hadoop.fs.s3a.secret.key", "1***************e")
>>>>         .config("spark.hadoop.fs.s3a.endpoint", "192.168.52.63:8000")
>>>>         .config("spark.hadoop.fs.s3a.path.style.access", "true")
>>>>         .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
>>>>         .getOrCreate()
>>>>     )
>>>>
>>>>     # Read the CSV file from S3
>>>>     df = (
>>>>         spark.read
>>>>         .option("header", "true")
>>>>         .option("inferSchema", "true")
>>>>         .option("delimiter", " ")  # ensure this is a space
>>>>         .csv("s3a://input/testfile.csv")
>>>>     )
>>>>
>>>>     # Show the data
>>>>     df.show(n=1)
>>>>
>>>> except AnalysisException as e:
>>>>     print(f"AnalysisException: {e}")
>>>> except Exception as e:
>>>>     print(f"Error: {e}")
>>>> finally:
>>>>     # Stop the Spark session if it was created
>>>>     if spark is not None:
>>>>         spark.stop()
>>>>
>>>> HTH
>>>>
>>>> Mich Talebzadeh,
>>>> Technologist | Architect | Data Engineer | Generative AI | FinCrime
>>>> London
>>>> United Kingdom
>>>>
>>>>
>>>> view my Linkedin profile
>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>
>>>>
>>>> https://en.everybodywiki.com/Mich_Talebzadeh
>>>>
>>>>
>>>>
>>>> *Disclaimer:* The information provided is correct to the best of my
>>>> knowledge but of course cannot be guaranteed. It is essential to note
>>>> that, as with any advice, "one test result is worth one-thousand expert
>>>> opinions" (Wernher von Braun
>>>> <https://en.wikipedia.org/wiki/Wernher_von_Braun>).
>>>>
>>>>
>>>> On Thu, 23 May 2024 at 20:14, Amin Mosayyebzadeh <mosayyebza...@gmail.com>
>>>> wrote:
>>>>
>>>>> I am trying to read an S3 object from a local S3 storage (Ceph based)
>>>>> using Spark 3.5.1. I can see that it accesses the bucket and lists the
>>>>> files (I have verified this on the Ceph side by checking its logs), and
>>>>> it even returns the correct size of the object. But the content is not
>>>>> read.
>>>>>
>>>>> The object URL is:
>>>>> s3a://input/testfile.csv (I have also tested a nested bucket:
>>>>> s3a://test1/test2/test3/testfile.csv)
>>>>>
>>>>>
>>>>> Object's content:
>>>>>
>>>>> =====================
>>>>> name int1 int2
>>>>> first 1 2
>>>>> second 3 4
>>>>> =====================
>>>>>
>>>>>
>>>>> Here is the config I have set so far:
>>>>>
>>>>> ("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.6")
>>>>> ("spark.hadoop.fs.s3a.access.key", "R*************6")
>>>>> ("spark.hadoop.fs.s3a.secret.key", "1***************e")
>>>>> ("spark.hadoop.fs.s3a.endpoint", "192.168.52.63:8000")
>>>>> ("spark.hadoop.fs.s3a.path.style.access", "true")
>>>>> ("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
>>>>>
>>>>>
>>>>> The output of my following PySpark application:
>>>>>
>>>>> from pyspark.sql.types import StructType, StructField, StringType, IntegerType
>>>>>
>>>>> # schema for the three columns shown above
>>>>> schema = StructType([StructField("name", StringType(), True),
>>>>>                      StructField("int1", IntegerType(), True),
>>>>>                      StructField("int2", IntegerType(), True)])
>>>>>
>>>>> df = spark.read \
>>>>>     .option("header", "true") \
>>>>>     .schema(schema) \
>>>>>     .csv("s3a://input/testfile.csv", sep=' ')
>>>>>
>>>>> df.show(n=1)
>>>>>
>>>>> ==================================
>>>>> 24/05/20 02:35:00 INFO MetricsSystemImpl: s3a-file-system metrics system started
>>>>> 24/05/20 02:35:01 INFO MetadataLogFileIndex: Reading streaming file log from s3a://input/testfile.csv/_spark_metadata
>>>>> 24/05/20 02:35:01 INFO FileStreamSinkLog: BatchIds found from listing:
>>>>> 24/05/20 02:35:03 INFO FileSourceStrategy: Pushed Filters:
>>>>> 24/05/20 02:35:03 INFO FileSourceStrategy: Post-Scan Filters:
>>>>> 24/05/20 02:35:03 INFO CodeGenerator: Code generated in 176.139675 ms
>>>>> 24/05/20 02:35:03 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 496.6 KiB, free 4.1 GiB)
>>>>> 24/05/20 02:35:03 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 54.4 KiB, free 4.1 GiB)
>>>>> 24/05/20 02:35:03 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on master:38197 (size: 54.4 KiB, free: 4.1 GiB)
>>>>> 24/05/20 02:35:03 INFO SparkContext: Created broadcast 0 from showString at NativeMethodAccessorImpl.java:0
>>>>> 24/05/20 02:35:03 INFO FileSourceScanExec: Planning scan with bin packing, max size: 4194304 bytes, open cost is considered as scanning 4194304 bytes.
>>>>> +----+----+----+
>>>>> |name|int1|int2|
>>>>> +----+----+----+
>>>>> +----+----+----+
>>>>> 24/05/20 02:35:04 INFO SparkContext: Invoking stop() from shutdown hook
>>>>> 24/05/20 02:35:04 INFO SparkContext: SparkContext is stopping with exitCode 0
>>>>> =========================================
>>>>>
>>>>> Am I missing something here?
>>>>>
>>>>> P.S. I see OP_IS_DIRECTORY is set to 1. Is that correct behavior?
>>>>>
>>>>>
>>>>> Thanks in advance!