[ https://issues.apache.org/jira/browse/SPARK-43816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17736811#comment-17736811 ]
Yuming Wang commented on SPARK-43816:
-------------------------------------

[~saiallu2020] This is similar to spark.sql.files.maxPartitionBytes.

Spark Corrupts Data In-Transit for High Volume (> 20 TB/hr) of Data
-------------------------------------------------------------------

                Key: SPARK-43816
                URL: https://issues.apache.org/jira/browse/SPARK-43816
            Project: Spark
         Issue Type: Bug
         Components: PySpark
   Affects Versions: 3.3.1
        Environment: We are using Azure Synapse Analytics. Within it, we have provisioned a Spark pool with 101 nodes: 100 nodes are used for the executors and 1 node for the driver. Each node is what Synapse Analytics calls a "Memory Optimized, Medium size" node, meaning 8 vCores and 64 GB of memory. The Spark pool does not do dynamic allocation of executors (all 101 nodes are created at the start and are present throughout the Spark job). Synapse has a feature called "Intelligent Cache," but we disabled it (set it to 0%). The nodes all run Spark 3.3.1.5.2-90111858. If you need details on any specific Spark settings, I can get those for you; mostly we are just using the defaults.
           Reporter: Sai Allu
           Priority: Minor
             Labels: correctness

h1. Bug Context

Hello! I would like to report a bug that my team noticed while we were using Spark (please see the Environment section for our exact setup).

The application we built converts a large number of JSON files (JSON Lines format) and writes them to a Delta table. The JSON files live in an Azure Data Lake Gen 2 account +without+ hierarchical namespacing; the Delta table is in an Azure Data Lake Gen 2 account +with+ hierarchical namespacing. We have a PySpark notebook in our Synapse Analytics workspace that reads the JSON files into a DataFrame and writes them to the Delta table. It uses batch processing.

The JSON files contain +no corrupt records+; we checked them thoroughly. There are also no flaws in our PySpark notebook code; we checked that as well.

Our code reads 15 TB of JSON files (each file is about 400 MB in size) into our PySpark DataFrame in the following way:

{code:java}
originalDF = (
    spark.read
    .schema(originDataSchema)
    .option("pathGlobFilter", DESIRED_FILE_PATTERN)
    .option("mode", "PERMISSIVE")
    .option("columnNameOfCorruptRecord", "DiscoveredCorruptRecords")
    .option("badRecordsPath", BAD_RECORDS_PATH)
    .json(ORIGIN_FILES_PATH)
)
{code}

Reading this data and writing it to the Delta table takes about 37 minutes.

The problem we noticed is that, as the data is read into the PySpark DataFrame, a small percentage of it becomes corrupted. Only about 1 in 10 million records is affected. The following is a made-up example to illustrate the point:

{code:java}
// The original JSON record looks like this
{ "Name": "Robert", "Email": "b...@gmail.com", "Nickname": "Bob" }

// When we look in the PySpark DataFrame we see this (for a small percent of records)
{ "Name": "Robertbob@", "Email": "gmail.com", "Nickname": "Bob" }
{code}

Essentially, spark.read() has some deserialization problem that only emerges at high data throughput (> 20 TB/hr). When we tried a smaller dataset (1/4 the size), it showed no signs of corruption. When we use the same exact code to parse just the one JSON file that contains the record mentioned above, everything works perfectly fine. The corruption from spark.read() is also not deterministic: if we re-run the 20 TB/hr test, we still see corruption, but in different records.
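For what it's worth, a check along the following lines is how one could count and inspect the affected rows. This is only a sketch: the "Email" column name and the regex come from the illustrative record above, not from our real schema.

{code:python}
from pyspark.sql import functions as F

# Hypothetical validation sketch: flag rows whose "Email" value no longer looks
# like an email address. The column name and pattern are taken from the made-up
# example above, not from the real schema.
suspectDF = originalDF.filter(
    ~F.col("Email").rlike(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")
)

# In the scenario described above, a count like this is what surfaces the
# roughly 150 corrupted records (about 1 in 10 million).
print(suspectDF.count())
suspectDF.show(20, truncate=False)
{code}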
h1. Our Temporary Solution

What we noticed is that "spark.sql.files.maxPartitionBytes" was set to 128 MB by default. This meant that, for the average JSON file we were reading (about 400 MB), Spark was making four calls to the Azure Data Lake, each fetching a [byte range|https://learn.microsoft.com/en-us/rest/api/storageservices/get-file#:~:text=Range-,Optional.%20Returns%20file%20data%20only%20from%20the%20specified%20byte%20range.,-x%2Dms%2Drange] (i.e., the 1st call got bytes 0-128 MB, the 2nd call got bytes 128-256 MB, etc.).

We increased "spark.sql.files.maxPartitionBytes" to a large value (1 GB), and that made the data corruption problem go away.

h1. How We Think You Can Fix This

From my understanding, when Spark requests a byte range, the range will often "cut off" the data in the middle of a JSON record. Our JSON files are in the JSON Lines format and contain thousands of lines, each holding one JSON record, so a byte range from 0 to 128 MB will most likely end right in the middle of a JSON record.

Spark seems to have logic that handles this by only processing the "full lines" that are received, but that logic appears to fail a small percentage of the time. Specifically, we have about 50,000 JSON files, which means ~200,000 byte-range calls are being made, and spark.read() is producing about 150 corrupt records.

So we think you should look at the Spark code that does this "cut off" handling for byte ranges and see if something is missing there, or at the deserialization logic of spark.read(). Again, this bug only emerges at high volumes of data transfer (> 20 TB/hr), so it could be a race condition or some other kind of performance-related bug.
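For reference, the temporary workaround described above amounts to something like the following. This is a minimal sketch: the 1 GB value is the one mentioned in "Our Temporary Solution", the read options simply repeat the snippet from "Bug Context", and setting the value via spark.conf.set in the notebook is just one way to apply it.

{code:python}
# Workaround sketch: raise spark.sql.files.maxPartitionBytes above the size of
# the largest JSON file (~400 MB here) so each file is fetched in a single
# byte-range request instead of four. 1073741824 bytes = 1 GB.
spark.conf.set("spark.sql.files.maxPartitionBytes", "1073741824")

originalDF = (
    spark.read
    .schema(originDataSchema)
    .option("pathGlobFilter", DESIRED_FILE_PATTERN)
    .option("mode", "PERMISSIVE")
    .option("columnNameOfCorruptRecord", "DiscoveredCorruptRecords")
    .option("badRecordsPath", BAD_RECORDS_PATH)
    .json(ORIGIN_FILES_PATH)
)
{code}

With this setting each ~400 MB file falls into a single partition, so no record straddles a partition boundary, which is consistent with the corruption going away in the runs described above.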