Thanks :)

On Wed, Sep 17, 2014 at 2:10 PM, Paul Wais <pw...@yelp.com> wrote:
> Thanks Tim, this is super helpful!
>
> Question about jars and spark-submit:  why do you provide
> myawesomeapp.jar as the program jar but then include other jars via
> the --jars argument?  Have you tried building one uber jar with all
> dependencies and just sending that to Spark as your app jar?

I guess that is mostly because I am Scala/sbt noob :) How do I create
the uber jar? My .sbt file says:
name := "My Awesome App"
version := "1.025"
scalaVersion := "2.10.4"
resolvers += "Apache repo" at
"https://repository.apache.org/content/repositories/releases";
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.0.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.0.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.0.0"
libraryDependencies += "org.apache.kafka" %% "kafka" % "0.8.1.1"

Then I run "sbt package" to generate myawesomeapp.jar.

>
> Also, have you ever seen any issues with Spark caching your app jar
> between runs even if it changes?

Not that I can tell but then maybe because I use Yarn, I might be
shielded from some jar distribution bugs in Spark?


>
> On Wed, Sep 17, 2014 at 1:11 PM, Tim Smith <secs...@gmail.com> wrote:
>> I don't have anything in production yet but I now at least have a
>> stable (running for more than 24 hours) streaming app. Earlier, the
>> app would crash for all sorts of reasons. Caveats/setup:
>> - Spark 1.0.0 (I have no input flow control unlike Spark 1.1)
>> - Yarn for RM
>> - Input and Output to Kafka
>> - CDH 5.1
>> - 11 node cluster with 32-cores and 48G max container size for each
>> node (Yarn managed)
>> - 5 partition Kafka topic - both in and out
>> - Roughly, an average of 25k messages per second
>> - App written in Scala (warning: I am a Scala noob)
>>
>> Few things I had to add/tweak to get the app to be stable:
>> - The executor JVMs did not have any GC options set, by default. This
>> might be more of a CDH issue. I noticed that while the Yarn container
>> and other Spark ancillary tasks had GC options set at launch but none
>> for the executors. So I played with different GC options and this
>> worked best:
>> SPARK_JAVA_OPTS="-XX:MaxPermSize=512m -XX:NewSize=1024m
>> -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70
>> -XX:+AggressiveHeap -XX:MaxHeapFreeRatio=70 -verbosegc
>> -XX:+PrintGCDetails"
>>
>> I tried G1GC but for some reason it just didn't work. I am not a Java
>> programmer or expert so my conclusion is purely trial and error based.
>> The GC logs, with these flags, go to the "stdout" file in the Yarn
>> container logs on each node/worker. You can set SPARK_JAVA_OPTS in
>> spark-env.sh on the driver node and Yarn will respect these. On CDH/CM
>> specifically, even though you don't run Spark as a service (since you
>> are using Yarn for RM), you can goto "Spark Client Advanced
>> Configuration Snippet (Safety Valve) for spark-conf/spark-env.sh" and
>> set SPARK_JAVA_OPTS there.
>>
>> - Set these two params - "spark.yarn.executor.memoryOverhead"
>> "spark.yarn.driver.memoryOverhead". Earlier, my app would get killed
>> because the executors running the kafka receivers would get killed by
>> Yarn for over utilization of memory. Now, these are my memory settings
>> (I will paste the entire app launch params later in the email):
>> --driver-memory 2G \
>> --executor-memory 16G \
>> --spark.yarn.executor.memoryOverhead 4096 \
>> --spark.yarn.driver.memoryOverhead 1024 \
>>
>> Your total executor JVM will consume "executor-memory" minus
>> "spark.yarn.executor.memoryOverhead" so you should see each executor
>> JVM consuming no more than 12G, in this case.
>>
>> Here is how I launch my app:
>> run=`date +"%m-%d-%YT%T"`; \
>> nohup spark-submit --class myAwesomeApp \
>> --master yarn myawesomeapp.jar \
>> --jars 
>> spark-streaming-kafka_2.10-1.0.0.jar,kafka_2.10-0.8.1.1.jar,zkclient-0.3.jar,metrics-core-2.2.0.jar,json4s-jackson_2.10-3.2.10.jar
>> \
>> --driver-memory 2G \
>> --executor-memory 16G \
>> --executor-cores 16 \
>> --num-executors 10 \
>> --spark.serializer org.apache.spark.serializer.KryoSerializer \
>> --spark.rdd.compress true \
>> --spark.io.compression.codec org.apache.spark.io.SnappyCompressionCodec \
>> --spark.akka.threads 64 \
>> --spark.akka.frameSize 500 \
>> --spark.task.maxFailures 64 \
>> --spark.scheduler.mode FAIR \
>> --spark.yarn.executor.memoryOverhead 4096 \
>> --spark.yarn.driver.memoryOverhead 1024 \
>> --spark.shuffle.consolidateFiles true \
>> --spark.default.parallelism 528 \
>>>logs/normRunLog-$run.log \
>> 2>logs/normRunLogError-$run.log & \
>> echo $! > logs/current-run.pid
>>
>> Some code optimizations (or, goof ups that I fixed). I did not
>> scientifically measure the impact of each but I think they helped:
>> - Made all my classes and objects serializable and then use Kryo (as
>> you see above)
>> - I map one receive task for each kafka partition
>> - Instead of doing a "union" on all the incoming streams and then
>> repartition() I now repartition() each incoming stream and process
>> them separately. I believe this reduces shuffle.
>> - Reduced number of repartitions. I was doing 128 after doing a
>> "union" on all incoming dStreams. I now repartition each of the five
>> streams separately (in a loop) to 24.
>> - For each RDD, I set storagelevel to "MEMORY_AND_DISK_SER"
>> - Process data per partition instead of per RDD: dataout.foreachRDD(
>> rdd => rdd.foreachPartition(rec => { myFunc(rec) }) )
>> - Specific to kafka: when I create "new Producer", make sure I "close"
>> it else I had a ton of "too many files open" errors :)
>> - Use immutable objects as far as possible. If I use mutable objects
>> within a method/class then I turn them into immutable before passing
>> onto another class/method.
>> - For logging, create a LogService object that I then use for other
>> object/class declarations. Once instantiated, I can make "logInfo"
>> calls from within other Objects/Methods/Classes and output goes to the
>> "stderr" file in the Yarn container logs. Good for debugging stream
>> processing logic.
>>
>> Currently, my processing delay is lower than my dStream time window so
>> all is good. I get a ton of these errors in my driver logs:
>> 14/09/16 21:17:40 ERROR LiveListenerBus: Listener JobProgressListener
>> threw an exception
>>
>> These seem related to: https://issues.apache.org/jira/browse/SPARK-2316
>>
>> Best I understand and have been told, this does not affect data
>> integrity but may cause un-necessary recomputes.
>>
>> Hope this helps,
>>
>> Tim
>>
>>
>> On Wed, Sep 17, 2014 at 8:30 AM, Soumitra Kumar
>> <kumar.soumi...@gmail.com> wrote:
>>> Hmm, no response to this thread!
>>>
>>> Adding to it, please share experiences of building an enterprise grade 
>>> product based on Spark Streaming.
>>>
>>> I am exploring Spark Streaming for enterprise software and am cautiously 
>>> optimistic about it. I see huge potential to improve debuggability of Spark.
>>>
>>> ----- Original Message -----
>>> From: "Tim Smith" <secs...@gmail.com>
>>> To: "spark users" <user@spark.apache.org>
>>> Sent: Friday, September 12, 2014 10:09:53 AM
>>> Subject: Stable spark streaming app
>>>
>>> Hi,
>>>
>>> Anyone have a stable streaming app running in "production"? Can you
>>> share some overview of the app and setup like number of nodes, events
>>> per second, broad stream processing workflow, config highlights etc?
>>>
>>> Thanks,
>>>
>>> Tim
>>>
>>> ---------------------------------------------------------------------
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

Reply via email to