Dibyendu - I am using the Kafka consumer built into Spark streaming.
Pulled the jar from here:
http://search.maven.org/remotecontent?filepath=org/apache/spark/spark-streaming-kafka_2.10/1.0.0/spark-streaming-kafka_2.10-1.0.0.jar

Thanks for the sbt-assembly link, Soumitra.
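
For anyone else following along, the sbt-assembly setup is roughly the
following (an untested sketch; the plugin version is a guess). Add the
plugin in project/plugins.sbt, pull in its settings at the top of build.sbt,
and mark the Spark artifacts as "provided" so they stay out of the fat jar
(Yarn/Spark already supply them at runtime):

project/plugins.sbt:

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")

build.sbt (changes on top of what Tim posted below):

import AssemblyKeys._

assemblySettings

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.0.0" % "provided"

libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.0.0" % "provided"

Running "sbt assembly" then produces a single uber jar that spark-submit can
take without the long --jars list.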

On Wed, Sep 17, 2014 at 5:50 PM, Dibyendu Bhattacharya
<dibyendu.bhattach...@gmail.com> wrote:
> Hi Tim
>
> Just curious to know: which Kafka consumer did you use?
>
> Dib
>
> On Sep 18, 2014 4:40 AM, "Tim Smith" <secs...@gmail.com> wrote:
>>
>> Thanks :)
>>
>> On Wed, Sep 17, 2014 at 2:10 PM, Paul Wais <pw...@yelp.com> wrote:
>> > Thanks Tim, this is super helpful!
>> >
>> > Question about jars and spark-submit:  why do you provide
>> > myawesomeapp.jar as the program jar but then include other jars via
>> > the --jars argument?  Have you tried building one uber jar with all
>> > dependencies and just sending that to Spark as your app jar?
>>
>> I guess that is mostly because I am a Scala/sbt noob :) How do I create
>> the uber jar? My .sbt file says:
>> name := "My Awesome App"
>> version := "1.025"
>> scalaVersion := "2.10.4"
>> resolvers += "Apache repo" at "https://repository.apache.org/content/repositories/releases"
>> libraryDependencies += "org.apache.spark" %% "spark-core" % "1.0.0"
>> libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.0.0"
>> libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.0.0"
>> libraryDependencies += "org.apache.kafka" %% "kafka" % "0.8.1.1"
>>
>> Then I run "sbt package" to generate myawesomeapp.jar.
>>
>> >
>> > Also, have you ever seen any issues with Spark caching your app jar
>> > between runs even if it changes?
>>
>> Not that I can tell, but then maybe because I use Yarn I am shielded
>> from some jar distribution bugs in Spark?
>>
>>
>> >
>> > On Wed, Sep 17, 2014 at 1:11 PM, Tim Smith <secs...@gmail.com> wrote:
>> >> I don't have anything in production yet but I now at least have a
>> >> stable (running for more than 24 hours) streaming app. Earlier, the
>> >> app would crash for all sorts of reasons. Caveats/setup:
>> >> - Spark 1.0.0 (so I have no input flow control, unlike Spark 1.1)
>> >> - Yarn for RM
>> >> - Input and Output to Kafka
>> >> - CDH 5.1
>> >> - 11-node cluster with 32 cores and a 48G max container size per node
>> >> (Yarn managed)
>> >> - 5-partition Kafka topics - both in and out
>> >> - Roughly, an average of 25k messages per second
>> >> - App written in Scala (warning: I am a Scala noob)
>> >>
>> >> Few things I had to add/tweak to get the app to be stable:
>> >> - The executor JVMs did not have any GC options set by default. This
>> >> might be more of a CDH issue. I noticed that the Yarn container and
>> >> other Spark ancillary tasks had GC options set at launch, but the
>> >> executors had none. So I played with different GC options and this
>> >> worked best:
>> >> SPARK_JAVA_OPTS="-XX:MaxPermSize=512m -XX:NewSize=1024m
>> >> -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70
>> >> -XX:+AggressiveHeap -XX:MaxHeapFreeRatio=70 -verbosegc
>> >> -XX:+PrintGCDetails"
>> >>
>> >> I tried G1GC but for some reason it just didn't work. I am not a Java
>> >> programmer or expert, so my conclusions are based purely on trial and error.
>> >> The GC logs, with these flags, go to the "stdout" file in the Yarn
>> >> container logs on each node/worker. You can set SPARK_JAVA_OPTS in
>> >> spark-env.sh on the driver node and Yarn will respect these. On CDH/CM
>> >> specifically, even though you don't run Spark as a service (since you
>> >> are using Yarn for RM), you can go to "Spark Client Advanced
>> >> Configuration Snippet (Safety Valve) for spark-conf/spark-env.sh" and
>> >> set SPARK_JAVA_OPTS there.
>> >>
>> >> - Set these two params - "spark.yarn.executor.memoryOverhead" and
>> >> "spark.yarn.driver.memoryOverhead". Earlier, my app would die because
>> >> Yarn killed the executors running the Kafka receivers for exceeding
>> >> their memory allocation. Now, these are my memory settings (I will
>> >> paste the entire app launch params later in the email):
>> >> --driver-memory 2G \
>> >> --executor-memory 16G \
>> >> --spark.yarn.executor.memoryOverhead 4096 \
>> >> --spark.yarn.driver.memoryOverhead 1024 \
>> >>
>> >> Yarn allocates "executor-memory" plus "spark.yarn.executor.memoryOverhead"
>> >> per container (16G + 4G = 20G here), while the executor JVM heap itself is
>> >> capped at "executor-memory", so each executor JVM should consume no more
>> >> than 16G in this case; the overhead is headroom for off-heap usage.
>> >>
>> >> Here is how I launch my app:
>> >> run=`date +"%m-%d-%YT%T"`; \
>> >> nohup spark-submit --class myAwesomeApp \
>> >> --master yarn myawesomeapp.jar \
>> >> --jars spark-streaming-kafka_2.10-1.0.0.jar,kafka_2.10-0.8.1.1.jar,zkclient-0.3.jar,metrics-core-2.2.0.jar,json4s-jackson_2.10-3.2.10.jar \
>> >> --driver-memory 2G \
>> >> --executor-memory 16G \
>> >> --executor-cores 16 \
>> >> --num-executors 10 \
>> >> --spark.serializer org.apache.spark.serializer.KryoSerializer \
>> >> --spark.rdd.compress true \
>> >> --spark.io.compression.codec org.apache.spark.io.SnappyCompressionCodec \
>> >> --spark.akka.threads 64 \
>> >> --spark.akka.frameSize 500 \
>> >> --spark.task.maxFailures 64 \
>> >> --spark.scheduler.mode FAIR \
>> >> --spark.yarn.executor.memoryOverhead 4096 \
>> >> --spark.yarn.driver.memoryOverhead 1024 \
>> >> --spark.shuffle.consolidateFiles true \
>> >> --spark.default.parallelism 528 \
>> >>>logs/normRunLog-$run.log \
>> >> 2>logs/normRunLogError-$run.log & \
>> >> echo $! > logs/current-run.pid
>> >>
>> >> Some code optimizations (or rather, goof-ups that I fixed). I did not
>> >> scientifically measure the impact of each, but I think they helped:
>> >> - Made all my classes and objects serializable and then used Kryo (as
>> >> you see above)
>> >> - I map one receiver task to each Kafka partition (see the sketch
>> >> after this list for how these pieces fit together)
>> >> - Instead of doing a union() on all the incoming streams and then
>> >> repartition(), I now repartition() each incoming stream and process
>> >> them separately. I believe this reduces shuffling.
>> >> - Reduced the number of partitions. I was repartitioning to 128 after
>> >> doing a union() on all incoming DStreams. I now repartition each of the
>> >> five streams separately (in a loop) to 24.
>> >> - For each RDD, I set the storage level to "MEMORY_AND_DISK_SER"
>> >> - Process data per partition instead of per RDD: dataout.foreachRDD(
>> >> rdd => rdd.foreachPartition(rec => { myFunc(rec) }) )
>> >> - Specific to Kafka: whenever I create a new Producer, I make sure to
>> >> close() it, else I got a ton of "too many open files" errors :)
>> >> - Use immutable objects as far as possible. If I use mutable objects
>> >> within a method/class, I convert them to immutable ones before passing
>> >> them on to another class/method.
>> >> - For logging, I created a LogService object that I then use in other
>> >> object/class declarations (rough sketch after this list). Once
>> >> instantiated, I can make "logInfo" calls from within other
>> >> objects/methods/classes and the output goes to the "stderr" file in the
>> >> Yarn container logs. Good for debugging stream processing logic.
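>> >>
>> >> To make a few of these points concrete, here is a rough, simplified
>> >> sketch of the shape of the code (not my actual app: transform, the topic
>> >> names and the ZooKeeper/broker settings are placeholders, and I set the
>> >> Kryo serializer in SparkConf here only for illustration since I actually
>> >> pass it at launch time):
>> >>
>> >> import java.util.Properties
>> >> import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
>> >> import org.apache.spark.SparkConf
>> >> import org.apache.spark.storage.StorageLevel
>> >> import org.apache.spark.streaming.{Seconds, StreamingContext}
>> >> import org.apache.spark.streaming.kafka.KafkaUtils
>> >>
>> >> object MyAwesomeApp {
>> >>   def transform(msg: String): String = msg  // placeholder for the real logic
>> >>
>> >>   def main(args: Array[String]) {
>> >>     val conf = new SparkConf().setAppName("myAwesomeApp")
>> >>       .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>> >>     val ssc = new StreamingContext(conf, Seconds(10))
>> >>
>> >>     // One receiver per Kafka partition: five separate input streams
>> >>     // instead of a single receiver for the whole topic.
>> >>     val streams = (1 to 5).map { _ =>
>> >>       KafkaUtils.createStream(ssc, "zkhost:2181", "mygroup",
>> >>         Map("in-topic" -> 1), StorageLevel.MEMORY_AND_DISK_SER)
>> >>     }
>> >>
>> >>     // Repartition and process each stream separately instead of
>> >>     // union()-ing them all and repartitioning the combined stream.
>> >>     streams.foreach { stream =>
>> >>       val out = stream.repartition(24).map { case (_, msg) => transform(msg) }
>> >>       out.foreachRDD { rdd =>
>> >>         rdd.foreachPartition { recs =>
>> >>           // Create the producer inside the partition and close it when
>> >>           // done, otherwise file handles pile up ("too many open files").
>> >>           val props = new Properties()
>> >>           props.put("metadata.broker.list", "kafkahost:9092")
>> >>           props.put("serializer.class", "kafka.serializer.StringEncoder")
>> >>           val producer = new Producer[String, String](new ProducerConfig(props))
>> >>           recs.foreach(r => producer.send(new KeyedMessage[String, String]("out-topic", r)))
>> >>           producer.close()
>> >>         }
>> >>       }
>> >>     }
>> >>
>> >>     ssc.start()
>> >>     ssc.awaitTermination()
>> >>   }
>> >> }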
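>> >>
>> >> And the LogService object is just a thin wrapper, along these lines (a
>> >> sketch using plain log4j, which ships with Spark; not my exact code):
>> >>
>> >> import org.apache.log4j.Logger
>> >>
>> >> // Singleton logger; referenced from other objects/classes so that their
>> >> // output lands in the "stderr" file under the Yarn container logs.
>> >> object LogService extends Serializable {
>> >>   @transient lazy val log = Logger.getLogger("MyAwesomeApp")
>> >>   def logInfo(msg: String) { log.info(msg) }
>> >>   def logError(msg: String) { log.error(msg) }
>> >> }
>> >>
>> >> // usage from anywhere: LogService.logInfo("processed batch")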
>> >>
>> >> Currently, my processing delay is lower than my dStream time window, so
>> >> all is good. That said, I do get a ton of these errors in my driver logs:
>> >> 14/09/16 21:17:40 ERROR LiveListenerBus: Listener JobProgressListener
>> >> threw an exception
>> >>
>> >> These seem related to: https://issues.apache.org/jira/browse/SPARK-2316
>> >>
>> >> From what I understand and have been told, this does not affect data
>> >> integrity but may cause unnecessary recomputes.
>> >>
>> >> Hope this helps,
>> >>
>> >> Tim
>> >>
>> >>
>> >> On Wed, Sep 17, 2014 at 8:30 AM, Soumitra Kumar
>> >> <kumar.soumi...@gmail.com> wrote:
>> >>> Hmm, no response to this thread!
>> >>>
>> >>> Adding to it, please share your experiences of building an
>> >>> enterprise-grade product based on Spark Streaming.
>> >>>
>> >>> I am exploring Spark Streaming for enterprise software and am
>> >>> cautiously optimistic about it. I see huge potential to improve
>> >>> debuggability of Spark.
>> >>>
>> >>> ----- Original Message -----
>> >>> From: "Tim Smith" <secs...@gmail.com>
>> >>> To: "spark users" <user@spark.apache.org>
>> >>> Sent: Friday, September 12, 2014 10:09:53 AM
>> >>> Subject: Stable spark streaming app
>> >>>
>> >>> Hi,
>> >>>
>> >>> Anyone have a stable streaming app running in "production"? Can you
>> >>> share some overview of the app and setup like number of nodes, events
>> >>> per second, broad stream processing workflow, config highlights etc?
>> >>>
>> >>> Thanks,
>> >>>
>> >>> Tim
>> >>>
>> >>
>>
>

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
