ok

Some observations:


   - Spark job successfully lists the S3 bucket containing testfile.csv.
   - Spark job can retrieve the file size (33 Bytes) for testfile.csv.
   - Spark job fails to read the actual data from testfile.csv.
   - The printed output for testfile.csv is an empty DataFrame (header row
   only, no data rows).
   - Spark logs show a debug-level exception related to UserGroupInformation
   while trying to access the _spark_metadata path under testfile.csv.

Possible causes:


   - Permission Issues: Spark user (likely ubuntu based on logs) might lack
   the necessary permissions to access the testfile.csv file or the
   _spark_metadata file on S3 storage.
   - Spark Configuration: Issues with Spark's configuration for S3 access,
   such as missing credentials or incorrect security settings.
   - Spark treating the path as a streaming sink output: the
   MetadataLogFileIndex and FileStreamSinkLog entries in your logs suggest
   Spark thinks there is a _spark_metadata directory under testfile.csv, so
   it reads only the files listed in that (empty) metadata log, never issues
   a GET for the object, and returns no rows. A way to check this is
   sketched below.
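
To check the third possibility, here is a minimal PySpark sketch (untested
against your setup) that asks the S3A filesystem whether a _spark_metadata
directory is reported under the object path. It reuses the endpoint and the
masked access/secret keys you posted earlier in this thread, so treat all the
values as placeholders.

from pyspark.sql import SparkSession

# Minimal sketch: reuse the S3A settings from this thread (key values masked
# as in the original post) and ask the Hadoop FileSystem whether the
# _spark_metadata path exists under the CSV object.
spark = (
    SparkSession.builder
    .appName("S3MetadataCheck")
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.6")
    .config("spark.hadoop.fs.s3a.access.key", "R*************6")
    .config("spark.hadoop.fs.s3a.secret.key", "1***************e")
    .config("spark.hadoop.fs.s3a.endpoint", "192.168.52.63:8000")
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .getOrCreate()
)

jvm = spark.sparkContext._jvm
hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
meta_path = jvm.org.apache.hadoop.fs.Path("s3a://input/testfile.csv/_spark_metadata")
fs = meta_path.getFileSystem(hadoop_conf)

if fs.exists(meta_path):
    # Spark treats testfile.csv as a streaming sink directory in this case and
    # reads only the (empty) metadata log, returning zero rows and issuing no GETs.
    print("found _spark_metadata - remove or rename it, then retry the read")
else:
    print("no _spark_metadata - check credentials, permissions and S3A settings")

spark.stop()

If the path is reported as present, removing or renaming it (or re-uploading
the CSV to a fresh key) should let the plain CSV reader issue GETs for the
data. If it is not present but MetadataLogFileIndex still appears in your
logs, the listing behaviour of the Ceph gateway (your OP_IS_DIRECTORY
observation) would be worth investigating on the storage side.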


HTH

Mich Talebzadeh,
Technologist | Architect | Data Engineer  | Generative AI | FinCrime
PhD <https://en.wikipedia.org/wiki/Doctor_of_Philosophy> Imperial College
London <https://en.wikipedia.org/wiki/Imperial_College_London>
London, United Kingdom


   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed. It is essential to note
that, as with any advice: "one test result is worth one-thousand expert
opinions" (Wernher von Braun
<https://en.wikipedia.org/wiki/Wernher_von_Braun>).


On Thu, 30 May 2024 at 22:29, Amin Mosayyebzadeh <mosayyebza...@gmail.com>
wrote:

> The code should read the testfile.csv file from S3 and print its content. It
> only prints an empty list although the file has content.
> I have also checked our custom S3 storage (Ceph based) logs, and I see only
> LIST operations coming from Spark; there is no GET object operation for
> testfile.csv.
>
> The only error I see in DEBUG output is these lines:
>
> =====================================
> 24/05/30 15:39:21 INFO MetadataLogFileIndex: Reading streaming file log
> from s3a://test-bucket/testfile.csv/_spark_metadata
> 24/05/30 15:39:21 DEBUG UserGroupInformation: PrivilegedAction [as: ubuntu
> (auth:SIMPLE)][action: org.apache.hadoop.fs.FileContext$2@7af85238]
> java.lang.Exception
>         at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
>         at
> org.apache.hadoop.fs.FileContext.getAbstractFileSystem(FileContext.java:339)
>         at
> org.apache.hadoop.fs.FileContext.getFileContext(FileContext.java:465)
>         at
> org.apache.spark.sql.execution.streaming.AbstractFileContextBasedCheckpointFileManager.<init>(CheckpointFileManager.scala:311)
>         at
> org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.<init>(CheckpointFileManager.scala:352)
>         at
> org.apache.spark.sql.execution.streaming.CheckpointFileManager$.create(CheckpointFileManager.scala:209)
>         at
> org.apache.spark.sql.execution.streaming.HDFSMetadataLog.<init>(HDFSMetadataLog.scala:64)
>         at
> org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.<init>(CompactibleFileStreamLog.scala:48)
>         at
> org.apache.spark.sql.execution.streaming.FileStreamSinkLog.<init>(FileStreamSinkLog.scala:91)
>         at
> org.apache.spark.sql.execution.streaming.MetadataLogFileIndex.<init>(MetadataLogFileIndex.scala:52)
>         at
> org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:369)
>         at
> org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:229)
>         at
> org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:211)
>         at scala.Option.getOrElse(Option.scala:201)
>         at
> org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
>         at
> org.apache.spark.sql.DataFrameReader.text(DataFrameReader.scala:646)
>         at
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
> Method)
>         at
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>         at
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>         at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>         at
> py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
>         at py4j.Gateway.invoke(Gateway.java:282)
>         at
> py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>         at py4j.commands.CallCommand.execute(CallCommand.java:79)
>         at
> py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
>         at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
>         at java.base/java.lang.Thread.run(Thread.java:829)
>
> ===============================
> I am not sure whether this is related, since Spark can see and list the
> bucket (it also returns the correct object size, which is 33 bytes).
>
> Best,
> Amin
>
>
> On Thu, May 30, 2024 at 4:05 PM Mich Talebzadeh <mich.talebza...@gmail.com>
> wrote:
>
>> Hello,
>>
>> Overall, the exit code of 0 suggests a successful run of your Spark job.
>> Analyze the intended purpose of your code and verify the output or Spark UI
>> for further confirmation.
>>
>> 24/05/30 01:23:43 INFO SparkContext: SparkContext is stopping with
>> exitCode 0.
>>
>> what to check
>>
>>
>>    1. Verify Output: If your Spark job was intended to read data from S3
>>    and process it, you will need to verify the output to ensure the data was
>>    handled correctly. This might involve checking if any results were written
>>    to a designated location or if any transformations were applied
>>    successfully.
>>    2. Review Code:
>>    3. Check Spark UI:
>>
>>
>> HTH
>>
>> Mich Talebzadeh,
>> Technologist | Architect | Data Engineer  | Generative AI | FinCrime
>> PhD <https://en.wikipedia.org/wiki/Doctor_of_Philosophy> Imperial
>> College London <https://en.wikipedia.org/wiki/Imperial_College_London>
>> London, United Kingdom
>>
>>
>>    view my Linkedin profile
>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>
>>
>>
>> *Disclaimer:* The information provided is correct to the best of my
>> knowledge but of course cannot be guaranteed. It is essential to note
>> that, as with any advice: "one test result is worth one-thousand expert
>> opinions" (Wernher von Braun
>> <https://en.wikipedia.org/wiki/Wernher_von_Braun>).
>>
>>
>> On Thu, 30 May 2024 at 11:56, Amin Mosayyebzadeh <mosayyebza...@gmail.com>
>> wrote:
>>
>>> Hi Mich,
>>>
>>> Thank you for the help and sorry about the late reply.
>>> I ran your provided code, but I got "exitCode 0". Here is the complete output:
>>>
>>> ===============================
>>>
>>>
>>> 24/05/30 01:23:38 INFO SparkContext: Running Spark version 3.5.0
>>> 24/05/30 01:23:38 INFO SparkContext: OS info Linux, 5.4.0-182-generic,
>>> amd64
>>> 24/05/30 01:23:38 INFO SparkContext: Java version 11.0.22
>>> 24/05/30 01:23:38 WARN NativeCodeLoader: Unable to load native-hadoop
>>> library for your platform... using builtin-java classes where applicable
>>> 24/05/30 01:23:38 INFO ResourceUtils:
>>> ==============================================================
>>> 24/05/30 01:23:38 INFO ResourceUtils: No custom resources configured for
>>> spark.driver.
>>> 24/05/30 01:23:38 INFO ResourceUtils:
>>> ==============================================================
>>> 24/05/30 01:23:38 INFO SparkContext: Submitted application: S3ReadTest
>>> 24/05/30 01:23:38 INFO ResourceProfile: Default ResourceProfile created,
>>> executor resources: Map(cores -> name: cores, amount: 1, script: , vendor:
>>> , memory -> name: memory, amount: 1024, script: , vendor: , offHeap ->
>>> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus ->
>>> name: cpus, amount: 1.0)
>>> 24/05/30 01:23:38 INFO ResourceProfile: Limiting resource is cpu
>>> 24/05/30 01:23:38 INFO ResourceProfileManager: Added ResourceProfile id:
>>> 0
>>> 24/05/30 01:23:38 INFO SecurityManager: Changing view acls to: ubuntu
>>> 24/05/30 01:23:38 INFO SecurityManager: Changing modify acls to: ubuntu
>>> 24/05/30 01:23:38 INFO SecurityManager: Changing view acls groups to:
>>> 24/05/30 01:23:38 INFO SecurityManager: Changing modify acls groups to:
>>> 24/05/30 01:23:38 INFO SecurityManager: SecurityManager: authentication
>>> disabled; ui acls disabled; users with view permissions: ubuntu; groups
>>> with view permissions: EMPTY; users with modify permissions: ubuntu; groups
>>> with modify permissions: EMPTY
>>> 24/05/30 01:23:38 INFO Utils: Successfully started service 'sparkDriver'
>>> on port 46321.
>>> 24/05/30 01:23:38 INFO SparkEnv: Registering MapOutputTracker
>>> 24/05/30 01:23:38 INFO SparkEnv: Registering BlockManagerMaster
>>> 24/05/30 01:23:38 INFO BlockManagerMasterEndpoint: Using
>>> org.apache.spark.storage.DefaultTopologyMapper for getting topology
>>> information
>>> 24/05/30 01:23:38 INFO BlockManagerMasterEndpoint:
>>> BlockManagerMasterEndpoint up
>>> 24/05/30 01:23:38 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
>>> 24/05/30 01:23:38 INFO DiskBlockManager: Created local directory at
>>> /tmp/blockmgr-a1fc37d5-885a-4ed0-b8f2-4eeb930c69ee
>>> 24/05/30 01:23:38 INFO MemoryStore: MemoryStore started with capacity
>>> 2.8 GiB
>>> 24/05/30 01:23:38 INFO SparkEnv: Registering OutputCommitCoordinator
>>> 24/05/30 01:23:39 INFO JettyUtils: Start Jetty 0.0.0.0:4040 for SparkUI
>>> 24/05/30 01:23:39 INFO Utils: Successfully started service 'SparkUI' on
>>> port 4040.
>>> 24/05/30 01:23:39 INFO Executor: Starting executor ID driver on host
>>> MOC-R4PAC08U33-S1C
>>> 24/05/30 01:23:39 INFO Executor: OS info Linux, 5.4.0-182-generic, amd64
>>> 24/05/30 01:23:39 INFO Executor: Java version 11.0.22
>>> 24/05/30 01:23:39 INFO Executor: Starting executor with user classpath
>>> (userClassPathFirst = false): ''
>>> 24/05/30 01:23:39 INFO Executor: Created or updated repl class loader
>>> org.apache.spark.util.MutableURLClassLoader@a45f4d6 for default.
>>> 24/05/30 01:23:39 INFO Utils: Successfully started service
>>> 'org.apache.spark.network.netty.NettyBlockTransferService' on port 39343.
>>> 24/05/30 01:23:39 INFO NettyBlockTransferService: Server created on
>>> MOC-R4PAC08U33-S1C:39343
>>> 24/05/30 01:23:39 INFO BlockManager: Using
>>> org.apache.spark.storage.RandomBlockReplicationPolicy for block replication
>>> policy
>>> 24/05/30 01:23:39 INFO BlockManagerMaster: Registering BlockManager
>>> BlockManagerId(driver, MOC-R4PAC08U33-S1C, 39343, None)
>>> 24/05/30 01:23:39 INFO BlockManagerMasterEndpoint: Registering block
>>> manager MOC-R4PAC08U33-S1C:39343 with 2.8 GiB RAM, BlockManagerId(driver,
>>> MOC-R4PAC08U33-S1C, 39343, None)
>>> 24/05/30 01:23:39 INFO BlockManagerMaster: Registered BlockManager
>>> BlockManagerId(driver, MOC-R4PAC08U33-S1C, 39343, None)
>>> 24/05/30 01:23:39 INFO BlockManager: Initialized BlockManager:
>>> BlockManagerId(driver, MOC-R4PAC08U33-S1C, 39343, None)
>>> 24/05/30 01:23:39 INFO SharedState: Setting hive.metastore.warehouse.dir
>>> ('null') to the value of spark.sql.warehouse.dir.
>>> 24/05/30 01:23:39 INFO SharedState: Warehouse path is
>>> 'file:/home/ubuntu/tpch-spark/spark-warehouse'.
>>> 24/05/30 01:23:40 WARN MetricsConfig: Cannot locate configuration: tried
>>> hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
>>> 24/05/30 01:23:40 INFO MetricsSystemImpl: Scheduled Metric snapshot
>>> period at 10 second(s).
>>> 24/05/30 01:23:40 INFO MetricsSystemImpl: s3a-file-system metrics system
>>> started
>>> 24/05/30 01:23:41 INFO MetadataLogFileIndex: Reading streaming file log
>>> from s3a://test-bucket/testfile.csv/_spark_metadata
>>> 24/05/30 01:23:41 INFO FileStreamSinkLog: BatchIds found from listing:
>>> 24/05/30 01:23:43 INFO FileSourceStrategy: Pushed Filters:
>>> 24/05/30 01:23:43 INFO FileSourceStrategy: Post-Scan Filters:
>>> 24/05/30 01:23:43 INFO CodeGenerator: Code generated in 188.932153 ms
>>> 24/05/30 01:23:43 INFO MemoryStore: Block broadcast_0 stored as values
>>> in memory (estimated size 201.3 KiB, free 2.8 GiB)
>>> 24/05/30 01:23:43 INFO MemoryStore: Block broadcast_0_piece0 stored as
>>> bytes in memory (estimated size 34.8 KiB, free 2.8 GiB)
>>> 24/05/30 01:23:43 INFO BlockManagerInfo: Added broadcast_0_piece0 in
>>> memory on MOC-R4PAC08U33-S1C:39343 (size: 34.8 KiB, free: 2.8 GiB)
>>> 24/05/30 01:23:43 INFO SparkContext: Created broadcast 0 from showString
>>> at NativeMethodAccessorImpl.java:0
>>> 24/05/30 01:23:43 INFO FileSourceScanExec: Planning scan with bin
>>> packing, max size: 4194304 bytes, open cost is considered as scanning
>>> 4194304 bytes.
>>> +----+----+----+
>>> |name|int1|int2|
>>> +----+----+----+
>>> +----+----+----+
>>>
>>> 24/05/30 01:23:43 INFO SparkContext: SparkContext is stopping with
>>> exitCode 0.
>>> 24/05/30 01:23:43 INFO SparkUI: Stopped Spark web UI at
>>> http://MOC-R4PAC08U33-S1C:4040
>>> 24/05/30 01:23:43 INFO MapOutputTrackerMasterEndpoint:
>>> MapOutputTrackerMasterEndpoint stopped!
>>> 24/05/30 01:23:43 INFO MemoryStore: MemoryStore cleared
>>> 24/05/30 01:23:43 INFO BlockManager: BlockManager stopped
>>> 24/05/30 01:23:43 INFO BlockManagerMaster: BlockManagerMaster stopped
>>> 24/05/30 01:23:43 INFO
>>> OutputCommitCoordinator$OutputCommitCoordinatorEndpoint:
>>> OutputCommitCoordinator stopped!
>>> 24/05/30 01:23:43 INFO SparkContext: Successfully stopped SparkContext
>>> 24/05/30 01:23:44 INFO ShutdownHookManager: Shutdown hook called
>>> 24/05/30 01:23:44 INFO ShutdownHookManager: Deleting directory
>>> /tmp/spark-f26cd915-aeb6-4efc-8960-56ca51ac1a7d
>>> 24/05/30 01:23:44 INFO ShutdownHookManager: Deleting directory
>>> /tmp/spark-91b984d6-62fe-4c6b-9996-36d6873ff5d6
>>> 24/05/30 01:23:44 INFO ShutdownHookManager: Deleting directory
>>> /tmp/spark-91b984d6-62fe-4c6b-9996-36d6873ff5d6/pyspark-bcca58a9-38dc-4359-85d1-81b728d6cf82
>>>
>>>
>>> Best,
>>> Amin
>>>
>>>
>>>
>>> On Thu, May 23, 2024 at 4:20 PM Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
>>>> Could be a number of reasons
>>>>
>>>> First test reading the file with a cli
>>>>
>>>> # note: the AWS CLI uses the s3:// scheme (not s3a://); for a custom Ceph
>>>> # endpoint, also pass --endpoint-url
>>>> aws s3 cp s3://input/testfile.csv .
>>>> cat testfile.csv
>>>>
>>>>
>>>> Try this code with debug option to diagnose the problem
>>>>
>>>> from pyspark.sql import SparkSession
>>>> from pyspark.sql.utils import AnalysisException
>>>>
>>>> try:
>>>>     # Initialize Spark session
>>>>     spark = SparkSession.builder \
>>>>         .appName("S3ReadTest") \
>>>>         .config("spark.jars.packages",
>>>> "org.apache.hadoop:hadoop-aws:3.3.6") \
>>>>         .config("spark.hadoop.fs.s3a.access.key", "R*************6") \
>>>>         .config("spark.hadoop.fs.s3a.secret.key", "1***************e") \
>>>>         .config("spark.hadoop.fs.s3a.endpoint", "192.168.52.63:8000") \
>>>>         .config("spark.hadoop.fs.s3a.path.style.access", "true") \
>>>>         .config("spark.hadoop.fs.s3a.impl",
>>>> "org.apache.hadoop.fs.s3a.S3AFileSystem") \
>>>>         .getOrCreate()
>>>>
>>>>     # Read the CSV file from S3
>>>>     df = spark.read \
>>>>         .option("header", "true") \
>>>>         .option("inferSchema", "true") \
>>>>         .option("delimiter", " ") \  # ensure this is apace
>>>>         .csv("s3a://input/testfile.csv")
>>>>
>>>>     # Show the data
>>>>     df.show(n=1)
>>>>
>>>> except AnalysisException as e:
>>>>     print(f"AnalysisException: {e}")
>>>> except Exception as e:
>>>>     print(f"Error: {e}")
>>>> finally:
>>>>     # Stop the Spark session
>>>>     spark.stop()
>>>>
>>>> HTH
>>>>
>>>> Mich Talebzadeh,
>>>> Technologist | Architect | Data Engineer  | Generative AI | FinCrime
>>>> London
>>>> United Kingdom
>>>>
>>>>
>>>>    view my Linkedin profile
>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>
>>>>
>>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>>
>>>>
>>>>
>>>> *Disclaimer:* The information provided is correct to the best of my
>>>> knowledge but of course cannot be guaranteed. It is essential to note
>>>> that, as with any advice: "one test result is worth one-thousand expert
>>>> opinions" (Wernher von Braun
>>>> <https://en.wikipedia.org/wiki/Wernher_von_Braun>).
>>>>
>>>>
>>>> On Thu, 23 May 2024 at 20:14, Amin Mosayyebzadeh <
>>>> mosayyebza...@gmail.com> wrote:
>>>>
>>>>> I am trying to read an s3 object from a local S3 storage (Ceph based)
>>>>> using Spark 3.5.1. I see it can access the bucket and list the files (I
>>>>> have verified it on Ceph side by checking its logs), even returning the
>>>>> correct size of the object. But the content is not read.
>>>>>
>>>>> The object url is:
>>>>> s3a://input/testfile.csv (I have also tested a nested bucket:
>>>>> s3a://test1/test2/test3/testfile.csv)
>>>>>
>>>>>
>>>>> Object's content:
>>>>>
>>>>> =====================
>>>>> name int1 int2
>>>>> first 1 2
>>>>> second 3 4
>>>>> =====================
>>>>>
>>>>>
>>>>> Here is the config I have set so far:
>>>>>
>>>>> ("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.6")
>>>>> ("spark.hadoop.fs.s3a.access.key", "R*************6")
>>>>> ("spark.hadoop.fs.s3a.secret.key", "1***************e")
>>>>> ("spark.hadoop.fs.s3a.endpoint", "192.168.52.63:8000")
>>>>> ("spark.hadoop.fs.s3a.path.style.access", "true")
>>>>> ("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
>>>>>
>>>>>
>>>>> The output of my following PySpark application:
>>>>> df = spark.read \
>>>>>     .option("header", "true") \
>>>>>     .schema(schema) \
>>>>>     .csv("s3a://input/testfile.csv", sep=' ')
>>>>>
>>>>> df.show(n=1)
>>>>> ==================================
>>>>> 24/05/20 02:35:00 INFO MetricsSystemImpl: s3a-file-system metrics system started
>>>>> 24/05/20 02:35:01 INFO MetadataLogFileIndex: Reading streaming file log from s3a://input/testfile.csv/_spark_metadata
>>>>> 24/05/20 02:35:01 INFO FileStreamSinkLog: BatchIds found from listing:
>>>>> 24/05/20 02:35:03 INFO FileSourceStrategy: Pushed Filters:
>>>>> 24/05/20 02:35:03 INFO FileSourceStrategy: Post-Scan Filters:
>>>>> 24/05/20 02:35:03 INFO CodeGenerator: Code generated in 176.139675 ms
>>>>> 24/05/20 02:35:03 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 496.6 KiB, free 4.1 GiB)
>>>>> 24/05/20 02:35:03 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 54.4 KiB, free 4.1 GiB)
>>>>> 24/05/20 02:35:03 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on master:38197 (size: 54.4 KiB, free: 4.1 GiB)
>>>>> 24/05/20 02:35:03 INFO SparkContext: Created broadcast 0 from showString at NativeMethodAccessorImpl.java:0
>>>>> 24/05/20 02:35:03 INFO FileSourceScanExec: Planning scan with bin packing, max size: 4194304 bytes, open cost is considered as scanning 4194304 bytes.
>>>>> +----+----+----+
>>>>> |name|int1|int2|
>>>>> +----+----+----+
>>>>> +----+----+----+
>>>>> 24/05/20 02:35:04 INFO SparkContext: Invoking stop() from shutdown hook
>>>>> 24/05/20 02:35:04 INFO SparkContext: SparkContext is stopping with exitCode 0
>>>>> =========================================
>>>>>
>>>>> Am I missing something here?
>>>>>
>>>>> P.S. I see OP_IS_DIRECTORY is set to 1. Is that a correct behavior?
>>>>>
>>>>>
>>>>> Thanks in advance!
>>>>>
>>>>>
