Thanks all. I am not submitting a Spark job explicitly. Instead, I am using
the Spark library functionality embedded in my web service, as shown in the
code I included in the previous email. So, effectively, Spark SQL runs in
the web service's JVM, and therefore the --driver-memory option would not
(and did not) work for me.

I did try setting the environment variables
SPARK_DRIVER_MEMORY=5g;SPARK_EXECUTOR_MEMORY=5g, but they didn't have any
effect. Passing the "-Dspark.executor.memory=6g -Dspark.driver.memory=6g"
JVM parameters had the same effect as setting them in SparkConf in the
code, i.e. they showed up in the Spark UI but I still got an OOM.
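
As a sanity check on what the embedded driver actually has to work with, I
suppose I could log the JVM's max heap when the service starts; a minimal
sketch (the log wording is just illustrative):

        // In local[*] mode the driver runs inside the web service's JVM, so the
        // effective heap ceiling is whatever the service's JVM was started with,
        // not what SparkConf says. Log it to see what Spark actually gets.
        long maxHeapBytes = Runtime.getRuntime().maxMemory();
        System.out.println("Max heap available to the embedded Spark driver: "
                + (maxHeapBytes / (1024 * 1024)) + " MB");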

My use case is somewhat unusual because I just wanted to use the Spark SQL
library for its multi-format (ORC, Parquet, JSON) support; I didn't really
need the rest of Spark's functionality. Should I be considering submitting
my Spark code as a job (to be run locally) from the web service code?
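
If I did go that route, I'm guessing something like the SparkLauncher API
could submit the conversion from the web service into its own JVM, where a
driver-memory setting would actually apply. A rough sketch of what I have
in mind (the Spark home, jar path, main class, and file paths are all
placeholders):

import org.apache.spark.launcher.SparkAppHandle;
import org.apache.spark.launcher.SparkLauncher;

public class ConversionSubmitter {
    public static void submit() throws Exception {
        // Launch the JSON-to-ORC conversion in a separate JVM so its heap
        // is sized independently of the web service's JVM.
        SparkAppHandle handle = new SparkLauncher()
                .setSparkHome("/path/to/spark")                // placeholder Spark installation
                .setAppResource("/path/to/converter-job.jar")  // placeholder jar with the conversion code
                .setMainClass("com.example.JsonToOrcJob")      // placeholder main class
                .setMaster("local[*]")
                .setConf(SparkLauncher.DRIVER_MEMORY, "6g")
                .addAppArgs("/path/to/input.json.snappy", "/path/to/output") // placeholder paths
                .startApplication();

        // Block until the launched application reaches a terminal state.
        while (!handle.getState().isFinal()) {
            Thread.sleep(1000);
        }
    }
}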

So far in this thread we've been focusing on configuring larger memory
pools. But I wonder if there is a way to stream/batch the content of the
JSON file in order to convert it to ORC piecemeal and avoid reading the
whole JSON file into memory in the first place?
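
For example, assuming the input is (or can be decompressed to)
newline-delimited JSON, I imagine I could pre-split it into smaller chunk
files outside of Spark and then run the existing Spark SQL conversion on
each chunk. A rough sketch of what I mean (the paths and chunk size are
just placeholders):

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class JsonChunker {

    // Split a large newline-delimited JSON file into smaller chunk files so
    // each chunk can be converted to ORC separately instead of loading the
    // whole file at once.
    public static void splitIntoChunks(Path input, Path outputDir, long linesPerChunk)
            throws IOException {
        Files.createDirectories(outputDir);
        try (BufferedReader reader = Files.newBufferedReader(input, StandardCharsets.UTF_8)) {
            String line;
            long lineCount = 0;
            int chunkIndex = 0;
            BufferedWriter writer = newChunkWriter(outputDir, chunkIndex);
            while ((line = reader.readLine()) != null) {
                writer.write(line);
                writer.newLine();
                if (++lineCount % linesPerChunk == 0) {
                    writer.close();
                    writer = newChunkWriter(outputDir, ++chunkIndex);
                }
            }
            writer.close();
        }
    }

    private static BufferedWriter newChunkWriter(Path outputDir, int chunkIndex)
            throws IOException {
        return Files.newBufferedWriter(
                outputDir.resolve("chunk-" + chunkIndex + ".json"), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws IOException {
        // Placeholder paths and chunk size.
        splitIntoChunks(Paths.get("/path/to/input.json"), Paths.get("/path/to/chunks"), 500_000);
    }
}

Each chunk-*.json could then go through the same read-JSON/write-ORC code
from my earlier email, one chunk at a time.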




Thanks,

Alec

On Tue, Nov 14, 2017 at 2:58 AM, Sonal Goyal <sonalgoy...@gmail.com> wrote:

> If you are running Spark with local[*] as master, there will be a single
> process whose memory will be controlled by the --driver-memory command line
> option to spark-submit. Check:
>
> http://spark.apache.org/docs/latest/configuration.html
>
> spark.driver.memory (default: 1g) - Amount of memory to use for the driver
> process, i.e. where SparkContext is initialized (e.g. 1g, 2g).
> *Note:* In client mode, this config must not be set through the SparkConf
> directly in your application, because the driver JVM has already started
> at that point. Instead, please set this through the --driver-memory
> command line option or in your default properties file.
>
> Thanks,
> Sonal
> Nube Technologies <http://www.nubetech.co>
>
> <http://in.linkedin.com/in/sonalgoyal>
>
>
>
> On Tue, Nov 14, 2017 at 9:37 AM, Alec Swan <alecs...@gmail.com> wrote:
>
>> Hi Joel,
>>
>> Here are the relevant snippets of my code and an OOM error thrown in
>> frameWriter.save(..). Surprisingly, the heap dump is pretty small (~60MB),
>> even though I am running with -Xmx10G and with 4G of executor and driver
>> memory, as shown below.
>>
>>         SparkConf sparkConf = new SparkConf()
>>                 .setAppName("My Service")
>>                 .setMaster("local[*]")
>>                 .set("spark.ui.enabled", "true")
>>                 .set("spark.executor.memory", "4G")
>>                 .set("spark.driver.memory", "4G");
>>
>>         sparkSessionBuilder = SparkSession.builder()
>>                 .config(sparkConf)
>>                 .enableHiveSupport();
>>
>>         Dataset<Row> events = sparkSession.read()
>>                 .format("json")
>>                 .schema(inputConfig.getSchema())
>>                 .load(inputFile.getPath());
>>
>>         DataFrameWriter<Row> frameWriter = events
>>                 .selectExpr(JavaConversions.asScalaBuffer(outputSchema.getColumns())) // select "data.customer AS `customer`", ...
>>                 .write()
>>                 .options(outputConfig.getProperties()) // compression=zlib
>>                 .format("orc")
>>                 .partitionBy(JavaConversions.asScalaBuffer(outputSchema.getPartitions())) // partition by "customer"
>>                 .save(outputUri.getPath());
>>
>>
>> Here is the error log I get at runtime:
>>
>> 17/11/14 03:36:16 INFO CodeGenerator: Code generated in 115.616924 ms
>> 17/11/14 03:36:17 INFO CodecPool: Got brand-new decompressor [.snappy]
>> java.lang.OutOfMemoryError: Java heap space
>> Dumping heap to java_pid3790.hprof ...
>> Heap dump file created [62653841 bytes in 2.212 secs]
>> #
>> # java.lang.OutOfMemoryError: Java heap space
>> # -XX:OnOutOfMemoryError="kill -9 %p"
>> #   Executing "kill -9 3790"...
>>
>>
>> And here is the thread from the thread dump that caused OOM:
>>
>> "Executor task launch worker for task 0" daemon prio=5 tid=90 RUNNABLE
>> at java.lang.OutOfMemoryError.<init>(OutOfMemoryError.java:48)
>> at org.apache.hadoop.io.compress.BlockDecompressorStream.getCompressedData(BlockDecompressorStream.java:123)
>> at org.apache.hadoop.io.compress.BlockDecompressorStream.decompress(BlockDecompressorStream.java:98)
>> at org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:85)
>> at java.io.InputStream.read(InputStream.java:101)
>> at org.apache.hadoop.util.LineReader.fillBuffer(LineReader.java:180)
>>    Local Variable: byte[]#3957
>>    Local Variable: org.apache.hadoop.io.compress.BlockDecompressorStream#1
>> at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:216)
>> at org.apache.hadoop.util.LineReader.readLine(LineReader.java:174)
>>    Local Variable: org.apache.hadoop.mapreduce.lib.input.SplitLineReader#1
>>    Local Variable: org.apache.hadoop.io.Text#5
>> at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.skipUtfByteOrderMark(LineRecordReader.java:144)
>> at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.nextKeyValue(LineRecordReader.java:184)
>>    Local Variable: org.apache.hadoop.mapreduce.lib.input.LineRecordReader#1
>> at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
>>    Local Variable: org.apache.spark.sql.execution.datasources.RecordReaderIterator#1
>> at org.apache.spark.sql.execution.datasources.HadoopFileLinesReader.hasNext(HadoopFileLinesReader.scala:50)
>>    Local Variable: org.apache.spark.sql.execution.datasources.HadoopFileLinesReader#1
>> at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
>>    Local Variable: scala.collection.Iterator$$anon$12#1
>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>> at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:105)
>> at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:177)
>> at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:105)
>>    Local Variable: org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1#1
>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>> at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:190)
>> at org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:108)
>>    Local Variable: org.apache.spark.sql.execution.UnsafeExternalRowSorter#1
>>    Local Variable: org.apache.spark.executor.TaskMetrics#2
>> at org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:101)
>>    Local Variable: org.apache.spark.sql.execution.SortExec$$anonfun$1#2
>> at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
>> at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
>>    Local Variable: scala.collection.Iterator$$anon$11#2
>>    Local Variable: org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25#2
>>    Local Variable: java.lang.Integer#1
>> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>>    Local Variable: org.apache.spark.sql.execution.datasources.FilePartition#2
>>    Local Variable: org.apache.spark.storage.StorageLevel#1
>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>>    Local Variable: org.apache.spark.rdd.MapPartitionsRDD#4
>>    Local Variable: org.apache.spark.serializer.JavaSerializerInstance#4
>>    Local Variable: scala.Tuple2#1572
>>    Local Variable: org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1#2
>>    Local Variable: scala.Tuple2#1571
>>    Local Variable: org.apache.spark.TaskContextImpl#1
>> at org.apache.spark.scheduler.Task.run(Task.scala:108)
>>    Local Variable: org.apache.spark.scheduler.ResultTask#2
>>    Local Variable: org.apache.spark.metrics.MetricsSystem#1
>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
>>    Local Variable: org.apache.spark.serializer.JavaSerializerInstance#5
>>    Local Variable: org.apache.spark.memory.TaskMemoryManager#1
>>    Local Variable: sun.management.ThreadImpl#1
>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>    Local Variable: java.util.concurrent.ThreadPoolExecutor#6
>>    Local Variable: org.apache.spark.executor.Executor$TaskRunner#1
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>    Local Variable: java.util.concurrent.ThreadPoolExecutor$Worker#26
>> at java.lang.Thread.run(Thread.java:745)
>>
>>
>>
>> Thanks,
>>
>> Alec
>>
>> On Mon, Nov 13, 2017 at 8:30 PM, Joel D <games2013....@gmail.com> wrote:
>>
>>> Have you tried increasing driver and executor memory (and GC overhead,
>>> if required)?
>>>
>>> Your code snippet and stack trace will be helpful.
>>>
>>> On Mon, Nov 13, 2017 at 7:23 PM Alec Swan <alecs...@gmail.com> wrote:
>>>
>>>> Hello,
>>>>
>>>> I am using the Spark library to convert JSON/Snappy files to ORC/ZLIB
>>>> format. Effectively, my Java service starts up an embedded Spark cluster
>>>> (master=local[*]) and uses Spark SQL to convert JSON to ORC. However, I
>>>> keep getting OOM errors with large (~1GB) files.
>>>>
>>>> I've tried different ways to reduce memory usage, e.g. by partitioning
>>>> data with dataSet.partitionBy("customer").save(filePath), or capping
>>>> memory usage by setting spark.executor.memory=1G, but to no avail.
>>>>
>>>> I am wondering if there is a way to avoid OOM besides splitting the
>>>> source JSON file into multiple smaller ones and processing the small ones
>>>> individually? Does Spark SQL have to read the JSON/Snappy (row-based) file
>>>> in its entirety before converting it to ORC (columnar)? If so, would it
>>>> make sense to create a custom receiver that reads the Snappy file and use
>>>> Spark streaming for ORC conversion?
>>>>
>>>> Thanks,
>>>>
>>>> Alec
>>>>
>>>>
>>>>
>>>>
>>>>
>>
>
