[jira] [Updated] (SPARK-22286) OutOfMemoryError caused by memory leak and large serializer batch size in ExternalAppendOnlyMap
[ https://issues.apache.org/jira/browse/SPARK-22286?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sean Owen updated SPARK-22286:
------------------------------
    Priority: Major  (was: Critical)

> OutOfMemoryError caused by memory leak and large serializer batch size in ExternalAppendOnlyMap
> ------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-22286
>                 URL: https://issues.apache.org/jira/browse/SPARK-22286
>             Project: Spark
>          Issue Type: Bug
>          Components: Shuffle, Spark Core
>    Affects Versions: 2.1.1, 2.1.2
>            Reporter: Lijie Xu
>            Priority: Major

*[Abstract]*
I recently encountered an OOM error in a simple _groupByKey_ application. After profiling the application, I found that the OOM error is related to shuffle spill and record (de)serialization. After analyzing the OOM heap dump, I found three root causes: (1) a memory leak in ExternalAppendOnlyMap, (2) the large static serializer batch size (_spark.shuffle.spill.batchSize_ = 10,000 by default) defined in ExternalAppendOnlyMap, and (3) a memory leak in the deserializer. Since almost all Spark applications rely on ExternalAppendOnlyMap to perform shuffle and reduce, this is a critical bug/defect. The following sections detail the test application, data, environment, failure symptoms, diagnosing procedure, identified root causes, and potential solutions.

*[Application]*
This is a simple GroupBy application:
{code}
table.map(row => (row.sourceIP[1,7], row)).groupByKey().saveAsTextFile()
{code}
The _sourceIP_ (an IP address such as 127.100.101.102) is a column of the _UserVisits_ table. The application has the same logic as the aggregation query in the Berkeley SQL benchmark (https://amplab.cs.berkeley.edu/benchmark/):
{code}
SELECT * FROM UserVisits
GROUP BY SUBSTR(sourceIP, 1, 7);
{code}
The application code is available at \[1\].
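The snippet above is pseudocode. A runnable sketch of the same job, assuming _UserVisits_ is stored as comma-separated text with _sourceIP_ in the first column (the input/output paths and the parsing are illustrative here, not the actual code from \[1\]):
{code}
import org.apache.spark.{SparkConf, SparkContext}

object GroupByUserVisits {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("GroupByKey-UserVisits"))

    // Assumed layout: each line of UserVisits is comma-separated,
    // with sourceIP (e.g. "127.100.101.102") as the first column.
    val table = sc.textFile("hdfs:///benchmark/UserVisits")

    table
      .map { line =>
        val sourceIP = line.split(",")(0)
        // SUBSTR(sourceIP, 1, 7) in the SQL query == the first 7 characters
        (sourceIP.substring(0, 7), line)
      }
      .groupByKey()
      .saveAsTextFile("hdfs:///benchmark/GroupByOutput")

    sc.stop()
  }
}
{code}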
*[Data]*
The UserVisits table is 16 GB (9 columns, 132,000,000 rows) with a uniform distribution. The HDFS block size is 128 MB. The data generator is available at \[2\].

*[Environment]*
Spark 2.1 (Spark 2.2 may also have this error), Oracle Java HotSpot 1.8.0, 1 master and 8 workers as follows.
!https://raw.githubusercontent.com/JerryLead/Misc/master/SparkPRFigures/OOM/Workers.png|width=100%!
The application launched 32 executors, each with 1 core and 7 GB memory. The detailed application configuration is:
{code}
total-executor-cores = 32
executor-cores = 1
executor-memory = 7G
spark.default.parallelism = 32
spark.serializer = JavaSerializer (KryoSerializer also hits the OOM error)
{code}

*[Failure symptoms]*
The application has a map stage and a reduce stage. An OOM error occurs in a reduce task (Task-17) as follows.
!https://raw.githubusercontent.com/JerryLead/Misc/master/SparkPRFigures/OOM/Stage.png|width=100%!
!https://raw.githubusercontent.com/JerryLead/Misc/master/SparkPRFigures/OOM/Tasks.png|width=100%!
Task-17 threw an OOM error. It shuffled ~1 GB of data and spilled 3.6 GB onto the disk. The Task-17 log below shows that the task was reading the next record by invoking _ExternalAppendOnlyMap.hasNext_(). From the OOM stack trace and the shuffle metrics above, we cannot identify the root causes.
!https://raw.githubusercontent.com/JerryLead/Misc/master/SparkPRFigures/OOM/OOMStackTrace.png|width=100%!
The question is why Task-17 still suffered an OOM error even after spilling large amounts of in-memory data onto the disk.

*[Diagnosing procedure]*
Since each executor has 1 core and 7 GB, it runs only one task at a time, and that task's memory usage exceeds 7 GB.

*1: Identify the error phase*
I added some debug logs in Spark and found that the error occurs not in the spill phase but in the memory-disk-merge phase. In the memory-disk-merge phase, Spark reads back the spilled records (① in Figure 1), merges them with the in-memory records (②), generates new records, and outputs the new records onto HDFS (③).

*2: Dataflow and memory usage analysis*
I added some profiling code and obtained the following dataflow and memory usage metrics. Ki represents the _i_-th key, and Ri represents the _i_-th row in the table.
!https://raw.githubusercontent.com/JerryLead/Misc/master/SparkPRFigures/OOM/DataflowAndMemoryUsage.png|width=100%!
Figure 1: Dataflow and Memory Usage Analysis (see https://github.com/JerryLead/Misc/blob/master/SparkPRFigures/OOM/SPARK-22286-OOM.pdf for the high-definition version)
The concrete phases with metrics are as follows.

*[Shuffle read]* records = 7,540,235, bytes = 903 MB

*[In-memory store]* As shown in the following log, about 5,243,424 of the 7,540,235 records are aggregated into 60 records in AppendOnlyMap, because there are only 60 distinct keys in the shuffled records. Each aggregated record is about 60 MB, so the 60 records together occupy roughly 3.6 GB.
!https://raw.githubusercontent.com/JerryLead/Misc/master/SparkPRFigures/OOM/SpilledRecords.png|width=100%!

*[Spill]* Since this 3.6 GB has reached the spill threshold, Spark spills the 60 records onto the disk. Since _60 < serializerBatchSize_ (default 10,000), all 60 records are serialized into the SerializeBuffer and then written onto the disk as a single file segment. The 60 serialized records are about 581 MB (this is an estimated size; the real size may be larger).
!https://raw.githubusercontent.com/JerryLead/Misc/master/SparkPRFigures/OOM/SpilledRecords2.png|width=100%!
!https://raw.githubusercontent.com/JerryLead/Misc/master/SparkPRFigures
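For context on why the entire 3.6 GB ends up in one serialization batch: as described above, spill batches are cut purely by record count (every _serializerBatchSize_ records), never by accumulated bytes. The following self-contained sketch (not Spark's actual code; the sizes are taken from the metrics above) simulates that policy:
{code}
// Simulates the count-based batching policy used when spilling:
// a batch is flushed only after `serializerBatchSize` records,
// regardless of how many bytes those records occupy.
object SpillBatchingSketch {
  def main(args: Array[String]): Unit = {
    val serializerBatchSize = 10000L     // default spark.shuffle.spill.batchSize
    val recordSize = 60L * 1024 * 1024   // ~60 MB per aggregated record (Figure 1)
    val numRecords = 60                  // 60 distinct keys in this reduce task

    var objectsWritten = 0L
    var currentBatchBytes = 0L
    var batches = List.empty[Long]

    (1 to numRecords).foreach { _ =>
      currentBatchBytes += recordSize              // "serialize" the record into the current batch
      objectsWritten += 1
      if (objectsWritten == serializerBatchSize) { // the only condition that closes a batch
        batches = currentBatchBytes :: batches
        objectsWritten = 0
        currentBatchBytes = 0
      }
    }
    if (objectsWritten > 0) batches = currentBatchBytes :: batches

    // Prints "1 batch(es): 3774873600 bytes": because 60 < 10,000, all 60 records
    // land in one batch, which later has to be deserialized as a single unit
    // during the memory-disk merge.
    println(s"${batches.size} batch(es): ${batches.mkString(", ")} bytes")
  }
}
{code}
With records this large, a count-only threshold leaves the batch size in bytes effectively unbounded, which is what the issue title refers to as the "large serializer batch size".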