Thanks :)

On Wed, Sep 17, 2014 at 2:10 PM, Paul Wais <pw...@yelp.com> wrote:
> Thanks Tim, this is super helpful!
>
> Question about jars and spark-submit: why do you provide
> myawesomeapp.jar as the program jar but then include other jars via
> the --jars argument? Have you tried building one uber jar with all
> dependencies and just sending that to Spark as your app jar?

I guess that is mostly because I am a Scala/sbt noob :) How do I
create the uber jar? My .sbt file says:

name := "My Awesome App"

version := "1.025"

scalaVersion := "2.10.4"

resolvers += "Apache repo" at "https://repository.apache.org/content/repositories/releases"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.0.0"

libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.0.0"

libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.0.0"

libraryDependencies += "org.apache.kafka" %% "kafka" % "0.8.1.1"

Then I run "sbt package" to generate myawesomeapp.jar.
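Actually, partially answering my own question after a bit of digging:
the sbt-assembly plugin looks like the usual route to an uber jar.
A rough sketch of what I plan to try (untested on my end, and the
plugin version is a guess for sbt 0.13-era builds). In
project/plugins.sbt:

    addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")

Then at the top of the .sbt file above:

    import AssemblyKeys._

    assemblySettings

    // Duplicate META-INF entries across the Kafka/Spark jars would
    // otherwise fail the merge, so discard those and take the first
    // copy of anything else that collides
    mergeStrategy in assembly := {
      case PathList("META-INF", xs @ _*) => MergeStrategy.discard
      case _ => MergeStrategy.first
    }

and mark the spark-core and spark-streaming dependencies as
"provided" (the cluster already ships them), e.g.:

    libraryDependencies += "org.apache.spark" %% "spark-core" % "1.0.0" % "provided"

Running "sbt assembly" should then produce a single
target/scala-2.10/*-assembly-1.025.jar to hand to spark-submit, which
would shrink the --jars list in the launch command below.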
> Also, have you ever seen any issues with Spark caching your app jar
> between runs even if it changes?

Not that I can tell, but maybe because I use Yarn I am shielded from
some of Spark's jar distribution bugs?

> On Wed, Sep 17, 2014 at 1:11 PM, Tim Smith <secs...@gmail.com> wrote:
>> I don't have anything in production yet, but I now at least have a
>> stable (running for more than 24 hours) streaming app. Earlier, the
>> app would crash for all sorts of reasons. Caveats/setup:
>> - Spark 1.0.0 (so I have no input flow control, unlike Spark 1.1)
>> - Yarn for RM
>> - Input and output to Kafka
>> - CDH 5.1
>> - 11-node cluster with 32 cores and a 48G max container size per
>>   node (Yarn managed)
>> - 5-partition Kafka topic, both in and out
>> - Roughly an average of 25k messages per second
>> - App written in Scala (warning: I am a Scala noob)
>>
>> A few things I had to add/tweak to get the app to be stable:
>>
>> - The executor JVMs did not have any GC options set by default. This
>> might be more of a CDH issue. I noticed that the Yarn container and
>> other Spark ancillary tasks had GC options set at launch, but the
>> executors had none. So I played with different GC options, and this
>> worked best:
>>
>> SPARK_JAVA_OPTS="-XX:MaxPermSize=512m -XX:NewSize=1024m
>> -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70
>> -XX:+AggressiveHeap -XX:MaxHeapFreeRatio=70 -verbosegc
>> -XX:+PrintGCDetails"
>>
>> I tried G1GC, but for some reason it just didn't work. I am not a
>> Java programmer or expert, so my conclusion is purely based on trial
>> and error. The GC logs, with these flags, go to the "stdout" file in
>> the Yarn container logs on each node/worker. You can set
>> SPARK_JAVA_OPTS in spark-env.sh on the driver node and Yarn will
>> respect it. On CDH/CM specifically, even though you don't run Spark
>> as a service (since you are using Yarn for RM), you can go to "Spark
>> Client Advanced Configuration Snippet (Safety Valve) for
>> spark-conf/spark-env.sh" and set SPARK_JAVA_OPTS there.
>>
>> - Set these two params: "spark.yarn.executor.memoryOverhead" and
>> "spark.yarn.driver.memoryOverhead". Earlier, my app would get killed
>> because the executors running the Kafka receivers were shot down by
>> Yarn for over-utilization of memory. These are my memory settings
>> now (the full launch command is below):
>>
>> --driver-memory 2G \
>> --executor-memory 16G \
>> --conf spark.yarn.executor.memoryOverhead=4096 \
>> --conf spark.yarn.driver.memoryOverhead=1024 \
>>
>> Yarn sizes each executor container at "executor-memory" plus
>> "spark.yarn.executor.memoryOverhead", so each container here
>> reserves 16G + 4G = 20G: a 16G JVM heap plus 4G of headroom for
>> off-heap memory, which is what the receiver executors were getting
>> killed for exceeding.
>>
>> Here is how I launch my app (note that the config properties go
>> before the app jar; otherwise spark-submit treats them as app
>> arguments):
>>
>> run=`date +"%m-%d-%YT%T"`; \
>> nohup spark-submit --class myAwesomeApp \
>> --master yarn \
>> --jars spark-streaming-kafka_2.10-1.0.0.jar,kafka_2.10-0.8.1.1.jar,zkclient-0.3.jar,metrics-core-2.2.0.jar,json4s-jackson_2.10-3.2.10.jar \
>> --driver-memory 2G \
>> --executor-memory 16G \
>> --executor-cores 16 \
>> --num-executors 10 \
>> --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
>> --conf spark.rdd.compress=true \
>> --conf spark.io.compression.codec=org.apache.spark.io.SnappyCompressionCodec \
>> --conf spark.akka.threads=64 \
>> --conf spark.akka.frameSize=500 \
>> --conf spark.task.maxFailures=64 \
>> --conf spark.scheduler.mode=FAIR \
>> --conf spark.yarn.executor.memoryOverhead=4096 \
>> --conf spark.yarn.driver.memoryOverhead=1024 \
>> --conf spark.shuffle.consolidateFiles=true \
>> --conf spark.default.parallelism=528 \
>> myawesomeapp.jar \
>> >logs/normRunLog-$run.log \
>> 2>logs/normRunLogError-$run.log & \
>> echo $! > logs/current-run.pid
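One caveat on the --conf flags in the command above: I believe --conf
only showed up around Spark 1.1, so on 1.0.0 the same properties can
instead be set inside the app before the StreamingContext is created.
A minimal sketch (the app name and batch interval are placeholders,
not my real values):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val conf = new SparkConf()
      .setAppName("myAwesomeApp") // placeholder
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.rdd.compress", "true")
      .set("spark.io.compression.codec", "org.apache.spark.io.SnappyCompressionCodec")
      .set("spark.akka.threads", "64")
      .set("spark.akka.frameSize", "500")
      .set("spark.task.maxFailures", "64")
      .set("spark.scheduler.mode", "FAIR")
      .set("spark.shuffle.consolidateFiles", "true")
      .set("spark.default.parallelism", "528")

    // 10-second batches as a placeholder; match this to your dStream window
    val ssc = new StreamingContext(conf, Seconds(10))

The spark.yarn.driver.memoryOverhead setting has to stay on the
submit side, though, since (at least in yarn-cluster mode) the driver
container is requested before any app code runs.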
>> Some code optimizations (or goof-ups that I fixed). I did not
>> scientifically measure the impact of each, but I think they helped:
>> - Made all my classes and objects serializable and then used Kryo
>> (as you see above)
>> - I map one receiver task to each Kafka partition
>> - Instead of doing a "union" on all the incoming streams and then a
>> repartition(), I now repartition() each incoming stream and process
>> them separately. I believe this reduces shuffle.
>> - Reduced the number of partitions. I was repartitioning to 128
>> after a "union" on all incoming dStreams; I now repartition each of
>> the five streams separately (in a loop) to 24.
>> - For each RDD, I set the storage level to "MEMORY_AND_DISK_SER"
>> - Process data per partition instead of per RDD: dataout.foreachRDD(
>> rdd => rdd.foreachPartition(rec => { myFunc(rec) }) ) (there is a
>> short sketch of this further down)
>> - Specific to Kafka: whenever I create a new Producer, I make sure
>> to close() it; otherwise I got a ton of "too many open files" errors :)
>> - Use immutable objects as far as possible. If I use mutable objects
>> within a method/class, I turn them into immutable ones before
>> passing them on to another class/method.
>> - For logging, I created a LogService object that I use in other
>> object/class declarations. Once it is instantiated, I can make
>> "logInfo" calls from within other objects/methods/classes, and the
>> output goes to the "stderr" file in the Yarn container logs. Good
>> for debugging stream-processing logic.
>>
>> Currently, my processing delay is lower than my dStream time window,
>> so all is good. I do get a ton of these errors in my driver logs:
>>
>> 14/09/16 21:17:40 ERROR LiveListenerBus: Listener JobProgressListener
>> threw an exception
>>
>> These seem related to: https://issues.apache.org/jira/browse/SPARK-2316
>>
>> Best I understand, and have been told, this does not affect data
>> integrity but may cause unnecessary recomputes.
>>
>> Hope this helps,
>>
>> Tim
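To make the per-partition processing and producer cleanup above
concrete, here is roughly what my output loop looks like, simplified
(this assumes dataout is a DStream[String]; the broker list and topic
name are placeholders):

    import java.util.Properties
    import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

    dataout.foreachRDD { rdd =>
      rdd.foreachPartition { recs =>
        // Build the producer inside foreachPartition so it is created
        // on the executor and never has to be serialized from the driver
        val props = new Properties()
        props.put("metadata.broker.list", "broker1:9092,broker2:9092")
        props.put("serializer.class", "kafka.serializer.StringEncoder")
        val producer = new Producer[String, String](new ProducerConfig(props))
        try {
          recs.foreach { rec =>
            producer.send(new KeyedMessage[String, String]("out-topic", rec))
          }
        } finally {
          producer.close() // the close() that cured my "too many open files"
        }
      }
    }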
>> On Wed, Sep 17, 2014 at 8:30 AM, Soumitra Kumar
>> <kumar.soumi...@gmail.com> wrote:
>>> Hmm, no response to this thread!
>>>
>>> Adding to it: please share experiences of building an
>>> enterprise-grade product based on Spark Streaming.
>>>
>>> I am exploring Spark Streaming for enterprise software and am
>>> cautiously optimistic about it. I see huge potential to improve the
>>> debuggability of Spark.
>>>
>>> ----- Original Message -----
>>> From: "Tim Smith" <secs...@gmail.com>
>>> To: "spark users" <user@spark.apache.org>
>>> Sent: Friday, September 12, 2014 10:09:53 AM
>>> Subject: Stable spark streaming app
>>>
>>> Hi,
>>>
>>> Anyone have a stable streaming app running in "production"? Can you
>>> share some overview of the app and setup, like number of nodes,
>>> events per second, broad stream-processing workflow, config
>>> highlights, etc.?
>>>
>>> Thanks,
>>>
>>> Tim

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org