There could be a number of reasons for this.
First, test reading the file with the AWS CLI (note the CLI uses the s3:// scheme, not the Hadoop connector's s3a://):
aws s3 cp s3://input/testfile.csv .
cat testfile.csv
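If your CLI profile is not already pointed at the Ceph gateway, you will likely need to pass the endpoint explicitly (assuming the gateway at 192.168.52.63:8000 speaks plain HTTP), something like:
aws s3 cp --endpoint-url http://192.168.52.63:8000 s3://input/testfile.csv .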
If that works, try this code, which catches and prints any exception, to diagnose the problem:
from pyspark.sql import SparkSession
from pyspark.sql.utils import AnalysisException

try:
    # Initialize the Spark session
    spark = SparkSession.builder \
        .appName("S3ReadTest") \
        .config("spark.jars.packages",
                "org.apache.hadoop:hadoop-aws:3.3.6") \
        .config("spark.hadoop.fs.s3a.access.key", "R*************6") \
        .config("spark.hadoop.fs.s3a.secret.key", "1***************e") \
        .config("spark.hadoop.fs.s3a.endpoint", "192.168.52.63:8000") \
        .config("spark.hadoop.fs.s3a.path.style.access", "true") \
        .config("spark.hadoop.fs.s3a.impl",
                "org.apache.hadoop.fs.s3a.S3AFileSystem") \
        .getOrCreate()

    # Read the CSV file from S3 (ensure the delimiter is a single space)
    df = spark.read \
        .option("header", "true") \
        .option("inferSchema", "true") \
        .option("delimiter", " ") \
        .csv("s3a://input/testfile.csv")

    # Show the data
    df.show(n=1)

except AnalysisException as e:
    print(f"AnalysisException: {e}")
except Exception as e:
    print(f"Error: {e}")
finally:
    # Stop the Spark session if it was created
    if "spark" in locals():
        spark.stop()
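
Separately, the MetadataLogFileIndex line in your log ("Reading streaming file log from s3a://input/testfile.csv/_spark_metadata"), together with OP_IS_DIRECTORY set to 1, suggests Spark is treating the path as a directory rather than a single object. As a quick check you can ask the S3A filesystem directly what it sees for that path. This is only a diagnostic sketch, to be run before spark.stop() (or in a fresh session with the same config); it goes through the Py4J gateway to the standard Hadoop Path/FileSystem classes:

# Diagnostic sketch: inspect the path through the S3A connector itself
jvm = spark.sparkContext._jvm
hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
path = jvm.org.apache.hadoop.fs.Path("s3a://input/testfile.csv")
fs = path.getFileSystem(hadoop_conf)
status = fs.getFileStatus(path)
print("isDirectory:", status.isDirectory())
print("length:", status.getLen())
# If isDirectory comes back True, Ceph has stored testfile.csv as a
# prefix/directory rather than an object, which would explain the
# empty DataFrame.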
HTH
Mich Talebzadeh,
Technologist | Architect | Data Engineer | Generative AI | FinCrime
London
United Kingdom
view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
https://en.everybodywiki.com/Mich_Talebzadeh
*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed. It is essential to note
that, as with any advice, to quote "one test result is worth one-thousand
expert opinions" (Wernher von Braun
<https://en.wikipedia.org/wiki/Wernher_von_Braun>).
On Thu, 23 May 2024 at 20:14, Amin Mosayyebzadeh <[email protected]>
wrote:
> I am trying to read an s3 object from a local S3 storage (Ceph based)
> using Spark 3.5.1. I see it can access the bucket and list the files (I
> have verified it on Ceph side by checking its logs), even returning the
> correct size of the object. But the content is not read.
>
> The object url is:
> s3a://input/testfile.csv (I have also tested a nested bucket:
> s3a://test1/test2/test3/testfile.csv)
>
>
> Object's content:
>
> =====================
> name int1 int2
> first 1 2
> second 3 4
> =====================
>
>
> Here is the config I have set so far:
>
> ("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.6")
> ("spark.hadoop.fs.s3a.access.key", "R*************6")
> ("spark.hadoop.fs.s3a.secret.key", "1***************e")
> ("spark.hadoop.fs.s3a.endpoint", "192.168.52.63:8000")
> ("spark.hadoop.fs.s3a.path.style.access", "true")
> ("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
>
>
> The output of my following PySpark application:
> df = spark.read \
> .option("header", "true") \
> .schema(schema) \
> .csv("s3a://input/testfile.csv", sep=' ')
>
> df.show(n=1)
> ==================================
> 24/05/20 02:35:00 INFO MetricsSystemImpl: s3a-file-system metrics system started
> 24/05/20 02:35:01 INFO MetadataLogFileIndex: Reading streaming file log from s3a://input/testfile.csv/_spark_metadata
> 24/05/20 02:35:01 INFO FileStreamSinkLog: BatchIds found from listing:
> 24/05/20 02:35:03 INFO FileSourceStrategy: Pushed Filters:
> 24/05/20 02:35:03 INFO FileSourceStrategy: Post-Scan Filters:
> 24/05/20 02:35:03 INFO CodeGenerator: Code generated in 176.139675 ms
> 24/05/20 02:35:03 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 496.6 KiB, free 4.1 GiB)
> 24/05/20 02:35:03 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 54.4 KiB, free 4.1 GiB)
> 24/05/20 02:35:03 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on master:38197 (size: 54.4 KiB, free: 4.1 GiB)
> 24/05/20 02:35:03 INFO SparkContext: Created broadcast 0 from showString at NativeMethodAccessorImpl.java:0
> 24/05/20 02:35:03 INFO FileSourceScanExec: Planning scan with bin packing, max size: 4194304 bytes, open cost is considered as scanning 4194304 bytes.
> +----+----+----+
> |name|int1|int2|
> +----+----+----+
> +----+----+----+
> 24/05/20 02:35:04 INFO SparkContext: Invoking stop() from shutdown hook
> 24/05/20 02:35:04 INFO SparkContext: SparkContext is stopping with exitCode 0
> =========================================
>
> Am I missing something here?
>
> P.S. I see OP_IS_DIRECTORY is set to 1. Is that a correct behavior?
>
>
> Thanks in advance!
>
>