Re: Stable spark streaming app

2014-09-18 Thread Tim Smith
Dibyendu - I am using the Kafka consumer built into Spark streaming.
Pulled the jar from here:
http://search.maven.org/remotecontent?filepath=org/apache/spark/spark-streaming-kafka_2.10/1.0.0/spark-streaming-kafka_2.10-1.0.0.jar

Thanks for the sbt-assembly link, Soumitra.

On Wed, Sep 17, 2014 at 5:50 PM, Dibyendu Bhattacharya
dibyendu.bhattach...@gmail.com wrote:
 Hi Tim

 Just curious to know: which Kafka consumer have you used?

 Dib


Re: Stable spark streaming app

2014-09-17 Thread Soumitra Kumar
Hmm, no response to this thread!

Adding to it, please share your experiences of building an enterprise-grade product 
based on Spark Streaming.

I am exploring Spark Streaming for enterprise software and am cautiously 
optimistic about it. I see huge potential to improve debuggability of Spark.

- Original Message -
From: Tim Smith secs...@gmail.com
To: spark users user@spark.apache.org
Sent: Friday, September 12, 2014 10:09:53 AM
Subject: Stable spark streaming app

Hi,

Anyone have a stable streaming app running in production? Can you
share some overview of the app and setup like number of nodes, events
per second, broad stream processing workflow, config highlights etc?

Thanks,

Tim


Re: Stable spark streaming app

2014-09-17 Thread Tim Smith
I don't have anything in production yet but I now at least have a
stable (running for more than 24 hours) streaming app. Earlier, the
app would crash for all sorts of reasons. Caveats/setup:
- Spark 1.0.0 (I have no input flow control unlike Spark 1.1)
- Yarn for RM
- Input and Output to Kafka
- CDH 5.1
- 11-node cluster with 32 cores and 48G max container size for each
node (Yarn managed)
- 5-partition Kafka topic, both in and out
- Roughly an average of 25k messages per second
- App written in Scala (warning: I am a Scala noob)

A few things I had to add/tweak to get the app to be stable:
- The executor JVMs did not have any GC options set by default. This
might be more of a CDH issue. I noticed that the Yarn container and
other Spark ancillary tasks had GC options set at launch, but the
executors had none. So I played with different GC options and this
worked best:
SPARK_JAVA_OPTS="-XX:MaxPermSize=512m -XX:NewSize=1024m
-XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70
-XX:+AggressiveHeap -XX:MaxHeapFreeRatio=70 -verbosegc
-XX:+PrintGCDetails"

I tried G1GC but for some reason it just didn't work. I am not a Java
programmer or expert, so my conclusion is purely based on trial and error.
The GC logs, with these flags, go to the stdout file in the Yarn
container logs on each node/worker. You can set SPARK_JAVA_OPTS in
spark-env.sh on the driver node and Yarn will respect it. On CDH/CM
specifically, even though you don't run Spark as a service (since you
are using Yarn for RM), you can go to "Spark Client Advanced
Configuration Snippet (Safety Valve) for spark-conf/spark-env.sh" and
set SPARK_JAVA_OPTS there.

- Set these two params: spark.yarn.executor.memoryOverhead and
spark.yarn.driver.memoryOverhead. Earlier, my app would get killed
because the executors running the Kafka receivers would get killed by
Yarn for over-utilization of memory. Now, these are my memory settings
(I will paste the entire app launch params later in the email):
--driver-memory 2G \
--executor-memory 16G \
--spark.yarn.executor.memoryOverhead 4096 \
--spark.yarn.driver.memoryOverhead 1024 \

Your total executor JVM will consume executor-memory minus
spark.yarn.executor.memoryOverhead so you should see each executor
JVM consuming no more than 12G, in this case.

Here is how I launch my app:
run=`date +%m-%d-%YT%T`; \
nohup spark-submit --class myAwesomeApp \
--master yarn myawesomeapp.jar \
--jars 
spark-streaming-kafka_2.10-1.0.0.jar,kafka_2.10-0.8.1.1.jar,zkclient-0.3.jar,metrics-core-2.2.0.jar,json4s-jackson_2.10-3.2.10.jar
\
--driver-memory 2G \
--executor-memory 16G \
--executor-cores 16 \
--num-executors 10 \
--spark.serializer org.apache.spark.serializer.KryoSerializer \
--spark.rdd.compress true \
--spark.io.compression.codec org.apache.spark.io.SnappyCompressionCodec \
--spark.akka.threads 64 \
--spark.akka.frameSize 500 \
--spark.task.maxFailures 64 \
--spark.scheduler.mode FAIR \
--spark.yarn.executor.memoryOverhead 4096 \
--spark.yarn.driver.memoryOverhead 1024 \
--spark.shuffle.consolidateFiles true \
--spark.default.parallelism 528 \
> logs/normRunLog-$run.log \
2> logs/normRunLogError-$run.log & \
echo $! > logs/current-run.pid

Some code optimizations (or goof-ups that I fixed). I did not
scientifically measure the impact of each, but I think they helped:
- Made all my classes and objects serializable and then used Kryo (as
you see above)
- I map one receive task to each Kafka partition (see the sketch after
this list)
- Instead of doing a union on all the incoming streams and then
repartition(), I now repartition() each incoming stream and process
them separately. I believe this reduces shuffle.
- Reduced the number of repartitions. I was doing 128 after a union
on all incoming dStreams. I now repartition each of the five
streams separately (in a loop) to 24.
- For each RDD, I set the storage level to MEMORY_AND_DISK_SER
- Process data per partition instead of per RDD: dataout.foreachRDD(
rdd => rdd.foreachPartition(rec => { myFunc(rec) }) )
- Specific to Kafka: when I create a new Producer, I make sure I close
it, else I got a ton of "too many files open" errors :) (the sketch after
this list shows the create-and-close-per-partition pattern)
- Use immutable objects as far as possible. If I use mutable objects
within a method/class, then I turn them into immutable ones before
passing them on to another class/method.
- For logging, I create a LogService object that I then use in other
object/class declarations (a guess at its shape follows this list). Once
instantiated, I can make logInfo calls from within other
objects/methods/classes and the output goes to the stderr file in the
Yarn container logs. Good for debugging stream processing logic.

Currently, my processing delay is lower than my dStream time window, so
all is good. However, I do get a ton of these errors in my driver logs:
14/09/16 21:17:40 ERROR LiveListenerBus: Listener JobProgressListener
threw an exception

These seem related to: https://issues.apache.org/jira/browse/SPARK-2316

As best I understand and have been told, this does not affect data
integrity but may cause unnecessary recomputes.

Hope this helps,

Tim



Re: Stable spark streaming app

2014-09-17 Thread Soumitra Kumar
Thanks Tim for the detailed email; it will help me a lot.

I have:

. 3 nodes CDH5.1 cluster with 16G memory.

. Majority of code in Scala, some part in Java (using Cascading earlier).

. Inputs are bunch of textFileStream directories.

. Every batch output goes to Parquet files and to HBase.
  Had to set export SPARK_CLASSPATH=/opt/cloudera/parcels/CDH/lib/hbase/hbase-protocol.jar
  to fix an issue during class loading.
  For some reason 'spark.driver.extraClassPath' did not work.

. I started doing union pretty early in the flow, but it did not work because not 
all of my classes are serializable. Each DStream works in isolation, but if I 
union them early, I get into serialization issues (a rough sketch of keeping the 
streams separate follows this list).

. Took a while to find the log directory, /run/spark/work .

. Deploying at --master spark://mymachine:7077

. Modified '/etc/spark/conf.cloudera.spark/log4j.properties' for logging

. Currently I cannot process a > 1G file with this configuration. I tried various 
things but could not succeed yet.
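
For what it's worth, a minimal sketch of keeping each textFileStream
separate (the directory paths, batch interval and per-record work are
placeholders, not taken from this thread):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(new SparkConf().setAppName("myStreamingJob"), Seconds(60))

val dirs = Seq("/data/in/dir1", "/data/in/dir2", "/data/in/dir3") // placeholders

dirs.foreach { dir =>
  val lines = ssc.textFileStream(dir)
  // Process each directory's stream on its own instead of union-ing them early
  lines.foreachRDD { rdd =>
    rdd.foreachPartition { recs =>
      recs.foreach(println) // stand-in for the real per-record work
    }
  }
}

ssc.start()
ssc.awaitTermination()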

RE: Stable spark streaming app

2014-09-17 Thread abraham.jacob
Nice write-up... very helpful!


Re: Stable spark streaming app

2014-09-17 Thread Tim Smith
Thanks :)

On Wed, Sep 17, 2014 at 2:10 PM, Paul Wais pw...@yelp.com wrote:
 Thanks Tim, this is super helpful!

 Question about jars and spark-submit:  why do you provide
 myawesomeapp.jar as the program jar but then include other jars via
 the --jars argument?  Have you tried building one uber jar with all
 dependencies and just sending that to Spark as your app jar?

I guess that is mostly because I am a Scala/sbt noob :) How do I create
the uber jar? My .sbt file says:
name := "My Awesome App"
version := "1.025"
scalaVersion := "2.10.4"
resolvers += "Apache repo" at "https://repository.apache.org/content/repositories/releases"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.0.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.0.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.0.0"
libraryDependencies += "org.apache.kafka" %% "kafka" % "0.8.1.1"

Then I run sbt package to generate myawesomeapp.jar.
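
One way to build the uber jar is the sbt-assembly plugin Soumitra linked. A
minimal sketch (the plugin version and the "provided" scoping are assumptions,
not from this thread):

project/plugins.sbt:
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")

build.sbt (on top of the settings above, with the Spark artifacts marked
"provided" since the cluster already supplies them):
import AssemblyKeys._

assemblySettings

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.0.0" % "provided"

libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.0.0" % "provided"

Then running sbt assembly should produce a single fat jar under
target/scala-2.10/ with the remaining dependencies (spark-streaming-kafka,
kafka, etc.) bundled, so it can be handed to spark-submit without --jars.
Duplicate files under META-INF may need a custom merge strategy.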


 Also, have you ever seen any issues with Spark caching your app jar
 between runs even if it changes?

Not that I can tell, but maybe because I use Yarn I am
shielded from some jar distribution bugs in Spark?


