Thanks all. I am not submitting a Spark job explicitly. Instead, I am using the Spark library functionality embedded in my web service, as shown in the code I included in the previous email. So, effectively, Spark SQL runs in the web service's JVM. Therefore, the --driver-memory option would not (and did not) work for me.
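A quick way to confirm this (a plain-Java sketch, no Spark dependency; the class name is made up): in local[*] mode the "driver" shares the service's heap, so logging Runtime.maxMemory() at startup shows the actual ceiling Spark has, whatever spark.driver.memory says.

```java
public class HeapCheck {
    /**
     * Returns the JVM's max heap in megabytes. In local[*] mode this is
     * the real ceiling for the embedded Spark "driver", regardless of
     * what spark.driver.memory is set to after the JVM has started.
     */
    public static long maxHeapMb() {
        return Runtime.getRuntime().maxMemory() / (1024 * 1024);
    }

    public static void main(String[] args) {
        System.out.println("Max heap available to embedded Spark: " + maxHeapMb() + " MB");
    }
}
```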
I did try setting the environment variables SPARK_DRIVER_MEMORY=5g and SPARK_EXECUTOR_MEMORY=5g, but they didn't have any effect. Passing the "-Dspark.executor.memory=6g -Dspark.driver.memory=6g" JVM parameters had the same effect as setting them in SparkConf in the code, i.e. they showed up in the Spark UI but I still got an OOM.

My use case is somewhat unusual: I just wanted to use the Spark SQL library for its multi-format (ORC, Parquet, JSON) support, but I didn't really need the rest of Spark's functionality. Should I be considering submitting my Spark code as a job (to be run locally) from the web service code?

So far in this thread we've been focusing on configuring larger memory pools. But I wonder if there is a way to stream/batch the content of the JSON file in order to convert it to ORC piecemeal and avoid reading the whole JSON file into memory in the first place?

Thanks,
Alec

On Tue, Nov 14, 2017 at 2:58 AM, Sonal Goyal <sonalgoy...@gmail.com> wrote:

> If you are running Spark with local[*] as master, there will be a single
> process whose memory will be controlled by the --driver-memory command
> line option to spark-submit. Check
> http://spark.apache.org/docs/latest/configuration.html
>
> spark.driver.memory (default: 1g): Amount of memory to use for the driver
> process, i.e. where SparkContext is initialized (e.g. 1g, 2g).
> *Note:* In client mode, this config must not be set through the SparkConf
> directly in your application, because the driver JVM has already started
> at that point. Instead, please set this through the --driver-memory
> command line option or in your default properties file.
>
> Thanks,
> Sonal
> Nube Technologies <http://www.nubetech.co>
> <http://in.linkedin.com/in/sonalgoyal>
>
> On Tue, Nov 14, 2017 at 9:37 AM, Alec Swan <alecs...@gmail.com> wrote:
>
>> Hi Joel,
>>
>> Here are the relevant snippets of my code and an OOM error thrown
>> in frameWriter.save(..).
>> Surprisingly, the heap dump is pretty small (~60MB), even though I am
>> running with -Xmx10G and 4G of executor and driver memory, as shown below.
>>
>> SparkConf sparkConf = new SparkConf()
>>     .setAppName("My Service")
>>     .setMaster("local[*]")
>>     .set("spark.ui.enabled", "true")
>>     .set("spark.executor.memory", "4G")
>>     .set("spark.driver.memory", "4G");
>>
>> sparkSessionBuilder = SparkSession.builder().config(sparkConf).enableHiveSupport();
>>
>> Dataset<Row> events = sparkSession.read()
>>     .format("json")
>>     .schema(inputConfig.getSchema())
>>     .load(inputFile.getPath());
>>
>> DataFrameWriter<Row> frameWriter = events
>>     .selectExpr(JavaConversions.asScalaBuffer(outputSchema.getColumns())) // select "data.customer AS `customer`", ...
>>     .write()
>>     .options(outputConfig.getProperties()) // compression=zlib
>>     .format("orc")
>>     .partitionBy(JavaConversions.asScalaBuffer(outputSchema.getPartitions())); // partition by "customer"
>>
>> frameWriter.save(outputUri.getPath());
>>
>> Here is the error log I get at runtime:
>>
>> 17/11/14 03:36:16 INFO CodeGenerator: Code generated in 115.616924 ms
>> 17/11/14 03:36:17 INFO CodecPool: Got brand-new decompressor [.snappy]
>> java.lang.OutOfMemoryError: Java heap space
>> Dumping heap to java_pid3790.hprof ...
>> Heap dump file created [62653841 bytes in 2.212 secs]
>> #
>> # java.lang.OutOfMemoryError: Java heap space
>> # -XX:OnOutOfMemoryError="kill -9 %p"
>> # Executing "kill -9 3790"...
>>
>> And here is the thread from the thread dump that caused the OOM:
>>
>> "Executor task launch worker for task 0" daemon prio=5 tid=90 RUNNABLE
>>     at java.lang.OutOfMemoryError.<init>(OutOfMemoryError.java:48)
>>     at org.apache.hadoop.io.compress.BlockDecompressorStream.getCompressedData(BlockDecompressorStream.java:123)
>>     at org.apache.hadoop.io.compress.BlockDecompressorStream.decompress(BlockDecompressorStream.java:98)
>>     at org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:85)
>>     at java.io.InputStream.read(InputStream.java:101)
>>     at org.apache.hadoop.util.LineReader.fillBuffer(LineReader.java:180)
>>         Local Variable: byte[]#3957
>>         Local Variable: org.apache.hadoop.io.compress.BlockDecompressorStream#1
>>     at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:216)
>>     at org.apache.hadoop.util.LineReader.readLine(LineReader.java:174)
>>         Local Variable: org.apache.hadoop.mapreduce.lib.input.SplitLineReader#1
>>         Local Variable: org.apache.hadoop.io.Text#5
>>     at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.skipUtfByteOrderMark(LineRecordReader.java:144)
>>     at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.nextKeyValue(LineRecordReader.java:184)
>>         Local Variable: org.apache.hadoop.mapreduce.lib.input.LineRecordReader#1
>>     at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
>>         Local Variable: org.apache.spark.sql.execution.datasources.RecordReaderIterator#1
>>     at org.apache.spark.sql.execution.datasources.HadoopFileLinesReader.hasNext(HadoopFileLinesReader.scala:50)
>>         Local Variable: org.apache.spark.sql.execution.datasources.HadoopFileLinesReader#1
>>     at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
>>         Local Variable: scala.collection.Iterator$$anon$12#1
>>     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>>     at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:105)
>>     at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:177)
>>     at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:105)
>>         Local Variable: org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1#1
>>     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>>     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>>     at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:190)
>>     at org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:108)
>>         Local Variable: org.apache.spark.sql.execution.UnsafeExternalRowSorter#1
>>         Local Variable: org.apache.spark.executor.TaskMetrics#2
>>     at org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:101)
>>         Local Variable: org.apache.spark.sql.execution.SortExec$$anonfun$1#2
>>     at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
>>     at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
>>         Local Variable: scala.collection.Iterator$$anon$11#2
>>         Local Variable: org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25#2
>>         Local Variable: java.lang.Integer#1
>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>>         Local Variable: org.apache.spark.sql.execution.datasources.FilePartition#2
>>         Local Variable: org.apache.spark.storage.StorageLevel#1
>>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>>         Local Variable: org.apache.spark.rdd.MapPartitionsRDD#4
>>         Local Variable: org.apache.spark.serializer.JavaSerializerInstance#4
>>         Local Variable: scala.Tuple2#1572
>>         Local Variable: org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1#2
>>         Local Variable: scala.Tuple2#1571
>>         Local Variable: org.apache.spark.TaskContextImpl#1
>>     at org.apache.spark.scheduler.Task.run(Task.scala:108)
>>         Local Variable: org.apache.spark.scheduler.ResultTask#2
>>         Local Variable: org.apache.spark.metrics.MetricsSystem#1
>>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
>>         Local Variable: org.apache.spark.serializer.JavaSerializerInstance#5
>>         Local Variable: org.apache.spark.memory.TaskMemoryManager#1
>>         Local Variable: sun.management.ThreadImpl#1
>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>         Local Variable: java.util.concurrent.ThreadPoolExecutor#6
>>         Local Variable: org.apache.spark.executor.Executor$TaskRunner#1
>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>         Local Variable: java.util.concurrent.ThreadPoolExecutor$Worker#26
>>     at java.lang.Thread.run(Thread.java:745)
>>
>> Thanks,
>>
>> Alec
>>
>> On Mon, Nov 13, 2017 at 8:30 PM, Joel D <games2013....@gmail.com> wrote:
>>
>>> Have you tried increasing driver and executor memory (and GC overhead, if required)?
>>>
>>> Your code snippet and stack trace will be helpful.
>>>
>>> On Mon, Nov 13, 2017 at 7:23 PM Alec Swan <alecs...@gmail.com> wrote:
>>>
>>>> Hello,
>>>>
>>>> I am using the Spark library to convert JSON/Snappy files to ORC/ZLIB
>>>> format. Effectively, my Java service starts up an embedded Spark cluster
>>>> (master=local[*]) and uses Spark SQL to convert JSON to ORC. However, I
>>>> keep getting OOM errors with large (~1GB) files.
>>>>
>>>> I've tried different ways to reduce memory usage, e.g. by partitioning
>>>> data with dataSet.partitionBy("customer").save(filePath), or by capping
>>>> memory usage by setting spark.executor.memory=1G, but to no avail.
>>>>
>>>> I am wondering if there is a way to avoid the OOM besides splitting the
>>>> source JSON file into multiple smaller ones and processing them
>>>> individually? Does Spark SQL have to read the JSON/Snappy (row-based)
>>>> file in its entirety before converting it to ORC (columnar)? If so,
>>>> would it make sense to create a custom receiver that reads the Snappy
>>>> file and use Spark Streaming for the ORC conversion?
>>>>
>>>> Thanks,
>>>>
>>>> Alec
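P.S. A sketch of the piecemeal idea from my message above (untested against the real pipeline; the class name, file names, and chunk size are made up): pre-split the decompressed JSON-lines input with plain Java IO, so that each chunk can then be read and written to ORC as a separate, bounded job instead of one huge one.

```java
import java.io.BufferedWriter;
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

public class JsonSplitter {
    /**
     * Splits a JSON-lines file into chunk files of at most linesPerChunk
     * lines each, so every chunk can be converted to ORC independently
     * with a bounded memory footprint.
     */
    public static List<Path> split(Path input, Path outDir, int linesPerChunk) throws IOException {
        Files.createDirectories(outDir);
        List<Path> chunks = new ArrayList<>();
        try (BufferedReader reader = Files.newBufferedReader(input)) {
            BufferedWriter writer = null;
            String line;
            int count = 0;
            while ((line = reader.readLine()) != null) {
                // Start a new chunk file every linesPerChunk lines.
                if (count % linesPerChunk == 0) {
                    if (writer != null) writer.close();
                    Path chunk = outDir.resolve("chunk-" + chunks.size() + ".json");
                    chunks.add(chunk);
                    writer = Files.newBufferedWriter(chunk);
                }
                writer.write(line);
                writer.newLine();
                count++;
            }
            if (writer != null) writer.close();
        }
        return chunks;
    }
}
```

Each resulting chunk (or the whole chunk directory) could then be handed to the same read-JSON/write-ORC code, with the ORC writer in append mode, so no single task has to hold the full 1GB input.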