Hi Joel,

Here are the relevant snippets of my code and the OOM error thrown in frameWriter.save(..). Surprisingly, the heap dump is pretty small (~60 MB) even though I am running with -Xmx10G and 4G executor and driver memory, as shown below.
SparkConf sparkConf = new SparkConf()
    .setAppName("My Service")
    .setMaster("local[*]")
    .set("spark.ui.enabled", "true")
    .set("spark.executor.memory", "4G")
    .set("spark.driver.memory", "4G");

sparkSessionBuilder = SparkSession.builder().config(sparkConf).enableHiveSupport();

Dataset<Row> events = sparkSession.read()
    .format("json")
    .schema(inputConfig.getSchema())
    .load(inputFile.getPath());

// Note: save() returns void, so the write is issued as a separate statement
// rather than assigned as part of the builder chain.
DataFrameWriter<Row> frameWriter = events
    .selectExpr(JavaConversions.asScalaBuffer(outputSchema.getColumns()))     // select "data.customer AS `customer`", ...
    .write()
    .options(outputConfig.getProperties())                                    // compression=zlib
    .format("orc")
    .partitionBy(JavaConversions.asScalaBuffer(outputSchema.getPartitions())); // partition by "customer"

frameWriter.save(outputUri.getPath());

Here is the error log I get at runtime:

17/11/14 03:36:16 INFO CodeGenerator: Code generated in 115.616924 ms
17/11/14 03:36:17 INFO CodecPool: Got brand-new decompressor [.snappy]
java.lang.OutOfMemoryError: Java heap space
Dumping heap to java_pid3790.hprof ...
Heap dump file created [62653841 bytes in 2.212 secs]
#
# java.lang.OutOfMemoryError: Java heap space
# -XX:OnOutOfMemoryError="kill -9 %p"
#   Executing "kill -9 3790"...
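(A side note on the settings above, in case it matters: with master=local[*] the executors run inside the already-started service JVM, so spark.executor.memory and spark.driver.memory set programmatically in SparkConf cannot resize the heap — the -Xmx passed at launch is what actually governs it. If this were launched via spark-submit instead of an embedded cluster, the equivalent would look something like the sketch below; the class and jar names are made up for illustration.)

```shell
# Hypothetical launch command; heap-related flags must be supplied
# before the JVM starts, not set later via SparkConf.
spark-submit \
  --master "local[*]" \
  --driver-memory 10g \
  --class com.example.MyService \
  my-service.jar
```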
And here is the thread from the thread dump that caused the OOM:

"Executor task launch worker for task 0" daemon prio=5 tid=90 RUNNABLE
  at java.lang.OutOfMemoryError.<init>(OutOfMemoryError.java:48)
  at org.apache.hadoop.io.compress.BlockDecompressorStream.getCompressedData(BlockDecompressorStream.java:123)
  at org.apache.hadoop.io.compress.BlockDecompressorStream.decompress(BlockDecompressorStream.java:98)
  at org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:85)
  at java.io.InputStream.read(InputStream.java:101)
  at org.apache.hadoop.util.LineReader.fillBuffer(LineReader.java:180)
    Local Variable: byte[]#3957
    Local Variable: org.apache.hadoop.io.compress.BlockDecompressorStream#1
  at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:216)
  at org.apache.hadoop.util.LineReader.readLine(LineReader.java:174)
    Local Variable: org.apache.hadoop.mapreduce.lib.input.SplitLineReader#1
    Local Variable: org.apache.hadoop.io.Text#5
  at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.skipUtfByteOrderMark(LineRecordReader.java:144)
  at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.nextKeyValue(LineRecordReader.java:184)
    Local Variable: org.apache.hadoop.mapreduce.lib.input.LineRecordReader#1
  at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
    Local Variable: org.apache.spark.sql.execution.datasources.RecordReaderIterator#1
  at org.apache.spark.sql.execution.datasources.HadoopFileLinesReader.hasNext(HadoopFileLinesReader.scala:50)
    Local Variable: org.apache.spark.sql.execution.datasources.HadoopFileLinesReader#1
  at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
    Local Variable: scala.collection.Iterator$$anon$12#1
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
  at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:105)
  at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:177)
  at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:105)
    Local Variable: org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1#1
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
  at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:190)
  at org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:108)
    Local Variable: org.apache.spark.sql.execution.UnsafeExternalRowSorter#1
    Local Variable: org.apache.spark.executor.TaskMetrics#2
  at org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:101)
    Local Variable: org.apache.spark.sql.execution.SortExec$$anonfun$1#2
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
    Local Variable: scala.collection.Iterator$$anon$11#2
    Local Variable: org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25#2
    Local Variable: java.lang.Integer#1
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    Local Variable: org.apache.spark.sql.execution.datasources.FilePartition#2
    Local Variable: org.apache.spark.storage.StorageLevel#1
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    Local Variable: org.apache.spark.rdd.MapPartitionsRDD#4
    Local Variable: org.apache.spark.serializer.JavaSerializerInstance#4
    Local Variable: scala.Tuple2#1572
    Local Variable: org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1#2
    Local Variable: scala.Tuple2#1571
    Local Variable: org.apache.spark.TaskContextImpl#1
  at org.apache.spark.scheduler.Task.run(Task.scala:108)
    Local Variable: org.apache.spark.scheduler.ResultTask#2
    Local Variable: org.apache.spark.metrics.MetricsSystem#1
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
    Local Variable: org.apache.spark.serializer.JavaSerializerInstance#5
    Local Variable: org.apache.spark.memory.TaskMemoryManager#1
    Local Variable: sun.management.ThreadImpl#1
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    Local Variable: java.util.concurrent.ThreadPoolExecutor#6
    Local Variable: org.apache.spark.executor.Executor$TaskRunner#1
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    Local Variable: java.util.concurrent.ThreadPoolExecutor$Worker#26
  at java.lang.Thread.run(Thread.java:745)

Thanks,
Alec

On Mon, Nov 13, 2017 at 8:30 PM, Joel D <games2013....@gmail.com> wrote:

> Have you tried increasing driver and executor memory (and the GC overhead
> limit too, if required)?
>
> Your code snippet and stack trace will be helpful.
>
> On Mon, Nov 13, 2017 at 7:23 PM Alec Swan <alecs...@gmail.com> wrote:
>
>> Hello,
>>
>> I am using the Spark library to convert JSON/Snappy files to ORC/ZLIB
>> format. Effectively, my Java service starts up an embedded Spark cluster
>> (master=local[*]) and uses Spark SQL to convert JSON to ORC. However, I
>> keep getting OOM errors with large (~1GB) files.
>>
>> I've tried different ways to reduce memory usage, e.g. by partitioning
>> data with dataSet.partitionBy("customer").save(filePath), or capping
>> memory usage by setting spark.executor.memory=1G, but to no avail.
>>
>> I am wondering if there is a way to avoid OOM besides splitting the
>> source JSON file into multiple smaller ones and processing the small ones
>> individually? Does Spark SQL have to read the JSON/Snappy (row-based) file
>> in its entirety before converting it to ORC (columnar)?
>> If so, would it
>> make sense to create a custom receiver that reads the Snappy file and use
>> Spark streaming for ORC conversion?
>>
>> Thanks,
>>
>> Alec
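P.S. In case the file-splitting workaround mentioned above comes up again: here is a minimal, Spark-free sketch of chunking a newline-delimited JSON file into smaller files. It assumes the input is uncompressed JSON Lines; the class name, file names, and chunk size are all made up for illustration.

```java
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

public class JsonSplitter {

    // Splits a newline-delimited JSON file into files of at most
    // linesPerChunk lines each, returning the paths of the chunks.
    public static List<Path> split(Path input, Path outDir, int linesPerChunk)
            throws IOException {
        List<Path> chunks = new ArrayList<>();
        BufferedWriter writer = null;
        try (BufferedReader reader = Files.newBufferedReader(input)) {
            String line;
            int count = 0;
            while ((line = reader.readLine()) != null) {
                if (count % linesPerChunk == 0) {
                    // Roll over to a new chunk file.
                    if (writer != null) writer.close();
                    Path chunk = outDir.resolve("part-" + chunks.size() + ".json");
                    chunks.add(chunk);
                    writer = Files.newBufferedWriter(chunk);
                }
                writer.write(line);
                writer.newLine();
                count++;
            }
        } finally {
            if (writer != null) writer.close();
        }
        return chunks;
    }

    public static void main(String[] args) throws IOException {
        // Demo on a tiny synthetic input: 10 records, 4 per chunk -> 3 chunks.
        Path dir = Files.createTempDirectory("split-demo");
        Path input = dir.resolve("events.json");
        List<String> lines = new ArrayList<>();
        for (int i = 0; i < 10; i++) lines.add("{\"id\":" + i + "}");
        Files.write(input, lines);

        List<Path> chunks = split(input, dir, 4);
        System.out.println(chunks.size()); // prints 3
    }
}
```

Each chunk could then be fed through the same read/write pipeline one at a time, bounding how much any single Spark task has to decompress and sort.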