Sai Allu created SPARK-43816:
--------------------------------

             Summary: Spark Corrupts Data In-Transit for High Volume (> 20 
TB/hr) of Data
                 Key: SPARK-43816
                 URL: https://issues.apache.org/jira/browse/SPARK-43816
             Project: Spark
          Issue Type: Bug
          Components: PySpark
    Affects Versions: 3.3.1
         Environment: We are using Azure Synapse Analytics. Within that, we have provisioned a Spark Pool with 101 nodes: 100 nodes are used for the executors and 1 node is for the driver. Each node is what Synapse Analytics calls a "Memory Optimized Medium size node", meaning 8 vCores and 64 GB of memory per node. The Spark Pool does not use dynamic allocation of executors (all 101 nodes are created at the start and remain present throughout the Spark job). Synapse has a feature called "Intelligent Cache," but we disabled it (set it to 0%). The nodes all use Spark 3.3.1.5.2-90111858. If you need details on any specific Spark settings, I can get those for you. Mostly we are just using the defaults.
            Reporter: Sai Allu


h1. Bug Context

Hello! I would like to report a bug that my team noticed while using Spark (please see the Environment section for our exact setup).

The application we built reads a large number of JSON files (JSON Lines format) and writes them to a Delta table. The JSON files are located in an Azure Data Lake Gen 2 +without+ hierarchical namespacing. The Delta table is in an Azure Data Lake Gen 2 +with+ hierarchical namespacing.

We have a PySpark notebook in our Synapse Analytics workspace which reads the 
JSON files into a DataFrame and then writes them to the Delta table. It uses 
batch processing.

The JSON files have {+}no corrupt records{+}; we checked them thoroughly. There are also no code flaws in our PySpark notebook; we checked that as well.

Our code reads 15 TB of JSON files (each file is about 400 MB in size) into our 
PySpark DataFrame in the following way.
{code:python}
originalDF = (
    spark.read
    .schema(originDataSchema)
    .option("pathGlobFilter", DESIRED_FILE_PATTERN)
    .option("mode", "PERMISSIVE")
    .option("columnNameOfCorruptRecord", "DiscoveredCorruptRecords")
    .option("badRecordsPath", BAD_RECORDS_PATH)
    .json(ORIGIN_FILES_PATH)
) {code}
To read this data and then write it to a Delta table takes about 37 minutes.
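
For reference, the write side of the notebook is a plain batch write to the Delta table. A minimal sketch of it (variable names like DELTA_TABLE_PATH are illustrative placeholders here, not our exact notebook code):
{code:python}
# Sketch of the write step: batch append of the DataFrame to the Delta table.
# DELTA_TABLE_PATH is a placeholder for the ADLS Gen 2 (hierarchical
# namespace) location of the table.
(
    originalDF.write
    .format("delta")
    .mode("append")
    .save(DELTA_TABLE_PATH)
){code}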

The problem we noticed is that as the data is read into the PySpark DataFrame, a small percentage of it becomes corrupted - only about 1 in 10 million records. The following is a made-up example to illustrate the point:
{code:java}
// The original JSON record looks like this
{ "Name": "Robert", "Email": "b...@gmail.com", "Nickname": "Bob" }

// When we look in the PySpark DataFrame we see this (for a small percentage of records)
{ "Name": "Robertbob@", "Email": "gmail.com", "Nickname": "Bob" }{code}
 

Essentially, spark.read() has some deserialization problem that only emerges at high data throughput (> 20 TB/hr).

When we tried using a smaller dataset (1/4 the size), it didn't show any signs 
of corruption.

When we use the exact same code to parse just one JSON file containing the record mentioned above, everything works perfectly fine.

The spark.read() corruption is also not deterministic. If we re-run the 20 
TB/hr test, we still see corruption but in different records.

 
h1. Our Temporary Solution

What we noticed is that "spark.sql.files.maxPartitionBytes" was set to its default of 128 MB. This meant that for the JSON files we were reading - about 400 MB each on average - Spark was making four calls to the Azure Data Lake, each fetching a [byte range|https://learn.microsoft.com/en-us/rest/api/storageservices/get-file#:~:text=Range-,Optional.%20Returns%20file%20data%20only%20from%20the%20specified%20byte%20range.,-x%2Dms%2Drange] (i.e. the 1st call got bytes 0-128 MB, the 2nd call got bytes 128-256 MB, etc.).

We increased "spark.sql.files.maxPartitionBytes" to a large number (1 GB) and 
that made the data corruption problem go away.
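
Concretely, the workaround is a single configuration change before the read (a sketch of what we set; the key accepts a byte-size string):
{code:python}
# Workaround sketch: raise the partition size so that each ~400 MB JSON file
# is read with a single byte-range request instead of four.
spark.conf.set("spark.sql.files.maxPartitionBytes", "1g")
{code}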

 
h1. How We Think You Can Fix This

From my understanding, when Spark makes a call for a byte range, it will often "cut off" the data in the middle of a JSON record. Our JSON files are in the JSON Lines format and contain thousands of lines, each with a JSON record. So calling a byte range from 0-128 MB will most likely mean that the cutoff point is right in the middle of a JSON record.
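
To illustrate what we mean, here is a plain-Python sketch of the usual "skip the first partial line, read past the end of the split to finish the last line" approach that line-oriented readers take for byte-range splits. This is only an illustration of the technique, not Spark's actual code:
{code:python}
# Illustration only -- not Spark's implementation. Shows how a line-oriented
# reader typically handles a byte-range split [start, end) over a blob.
def read_lines_in_split(data: bytes, start: int, end: int):
    pos = start
    if start > 0:
        # Not the first split: skip the partial line we land in; the previous
        # split is responsible for the line that straddles the boundary.
        newline = data.find(b"\n", start)
        if newline == -1:
            return []
        pos = newline + 1
    lines = []
    # Read whole lines; the last line is allowed to run past `end` so that no
    # record is ever cut in half (hence pos <= end, not pos < end).
    while pos <= end and pos < len(data):
        newline = data.find(b"\n", pos)
        if newline == -1:
            lines.append(data[pos:])
            break
        lines.append(data[pos:newline])
        pos = newline + 1
    return lines

# Example: three 8-byte JSON Lines records split into two byte ranges.
blob = b'{"a":1}\n{"a":2}\n{"a":3}\n'
print(read_lines_in_split(blob, 0, 8))   # [b'{"a":1}', b'{"a":2}']
print(read_lines_in_split(blob, 8, 16))  # [b'{"a":3}']
{code}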

Spark seems to have some logic which handles this by only processing the "full lines" that are received, but this logic seems to fail a small percentage of the time. Specifically, we have about 50,000 JSON files, which means ~200,000 byte-range calls are being made, and spark.read() is creating about 150 corrupt records.
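
That rough count can be sanity-checked from the notebook. A sketch (partition packing means the number will not be exactly 200,000, but it should be in that range):
{code:python}
# Sketch: inspect the split size in effect and how many input partitions
# (roughly, how many byte-range reads) the scan produced.
print(spark.conf.get("spark.sql.files.maxPartitionBytes"))
print(originalDF.rdd.getNumPartitions())
{code}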

So we think you should look at the Spark code which does this "cut off" handling for byte ranges and see if something is missing there, or in the deserialization logic of spark.read().

Again, this bug only emerges for high volumes of data transfer (> 20 TB/hr).


