[ 
https://issues.apache.org/jira/browse/SPARK-43816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17736811#comment-17736811
 ] 

Yuming Wang commented on SPARK-43816:
-------------------------------------

[~saiallu2020] This is similar to spark.sql.files.maxPartitionBytes.

> Spark Corrupts Data In-Transit for High Volume (> 20 TB/hr) of Data
> -------------------------------------------------------------------
>
>                 Key: SPARK-43816
>                 URL: https://issues.apache.org/jira/browse/SPARK-43816
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 3.3.1
>         Environment: We are using Azure Synapse Analytics. Within that, we 
> have provisioned a Spark Pool with 101 nodes. 100 nodes are used for the 
> executors and 1 node is for the driver. Each node is what Synapse Analytics 
> calls a "Memory Optimized Medium size node". This means each node has 8 
> vCores and 64 GB memory. The Spark Pool does not do dynamic allocation of 
> executors (101 nodes are created at the start and present throughout the 
> Spark job). Synapse has something called "Intelligent Cache," but we disabled 
> it (set to 0%). The nodes all use Spark 3.3.1.5.2-90111858. If you need 
> details on any specific Spark settings, I can get that for you. Mostly we are 
> just using the defaults.
>            Reporter: Sai Allu
>            Priority: Minor
>              Labels: correctness
>
> h1. Bug Context
> Hello! I would like to report a bug that my team noticed while we were using 
> Spark (please see the Environment section to see our exact setup).
> The application we built is meant to convert a large number of JSON files 
> (JSON Lines format) and write them to a Delta table. The JSON files are 
> located in an Azure Data Lake Gen 2 +without+ hierarchical namespacing. The 
> Delta table is in an Azure Data Lake Gen 2 +with+ hierarchical namespacing.
> We have a PySpark notebook in our Synapse Analytics workspace which reads the 
> JSON files into a DataFrame and then writes them to the Delta table. It uses 
> batch processing.
> The JSON files have {+}no corrupt records{+}, we checked them thoroughly. And 
> there are no code flaws in our PySpark notebook, we also checked that.
> Our code reads 15 TB of JSON files (each file is about 400 MB in size) into 
> our PySpark DataFrame in the following way.
> {code:java}
> originalDF = (  
> spark.read
>     .schema(originDataSchema)
>     .option("pathGlobFilter", DESIRED_FILE_PATTERN)
>     .option("mode", "PERMISSIVE")
>     .option("columnNameOfCorruptRecord", "DiscoveredCorruptRecords")
>     .option("badRecordsPath", BAD_RECORDS_PATH)
>     .json(ORIGIN_FILES_PATH)
> ) {code}
> To read this data and then write it to a Delta table takes about 37 minutes.
> The problem that we noticed is that as the data is read into the PySpark 
> DataFrame, a small percent of it becomes corrupted. Only about 1 in 10 
> million records become corrupted. This is just a made-up example to 
> illustrate the point:
> {code:java}
> // The original JSON record looks like this
> { "Name": "Robert", "Email": "b...@gmail.com", "Nickname": "Bob" }
> // When we look in the PySpark DataFrame we see this (for a small percent of 
> records)
> { "Name": "Robertbob@", "Email": "gmail.com", "Nickname": "Bob" }{code}
>  
> Essentially, the spark.read() has some deserialization problem that only 
> emerges for high data throughput (> 20 TB/hr).
> When we tried using a smaller dataset (1/4 the size), it didn't show any 
> signs of corruption.
> When we use the same exact code and then parse just one JSON file which 
> contains the record mentioned above, everything works perfectly fine.
> The spark.read() corruption is also not deterministic. If we re-run the 20 
> TB/hr test, we still see corruption but in different records.
>  
> h1. Our Temporary Solution
> What we noticed is that the "spark.sql.files.maxPartitionBytes" was by 
> default set to 128 MB. This meant that for the average JSON files we were 
> reading - which was 400 MB - Spark was making four calls to the Azure Data 
> Lake and fetching a [byte 
> range|https://learn.microsoft.com/en-us/rest/api/storageservices/get-file#:~:text=Range-,Optional.%20Returns%20file%20data%20only%20from%20the%20specified%20byte%20range.,-x%2Dms%2Drange]
>  (i.e. the 1st call got bytes 0-128MB, the 2nd call got bytes 128MB-256MB, 
> etc.).
> We increased "spark.sql.files.maxPartitionBytes" to a large number (1 GB) and 
> that made the data corruption problem go away.
>  
> h1. How We Think You Can Fix This
> From my understanding, when Spark makes a call for a byte range, it will 
> often "cut off" the data in the middle of a JSON record. Our JSON files are 
> in the JSON Lines format and they contain thousands of lines, each with a 
> JSON record. So calling a byte range from 0 - 128MB will most likely mean 
> that the cutoff point is right in the middle of a JSON record.
> Spark seems to have some code logic which handles this by only processing the 
> "full lines" that are received. But this logic seems to be failing a small 
> percent of the time. Specifically, we have about 50,000 JSON files, that 
> means ~200,000 byte range calls are being made. And spark.read() is creating 
> about 150 corrupt records.
> So we think you should look at the Spark code which is doing this "cut off" 
> handling for byte ranges and see if there's something missing there. Or 
> something in the deserialization logic of spark.read().
> Again, this bug only emerges for high volumes of data transfer (> 20 TB/hr). 
> This could be a "race condition" or some kind of performance-related bug.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to