I'd be interested to understand this mechanism as well, but that is the error-recovery part of the equation. Consuming from Kafka has two aspects - parallelism and error recovery - and I am not sure how either works. For error recovery, I would like to understand:

- How a failed receiver gets re-spawned. In 1.0.0, despite setting the failed-tasks threshold to 64, my job aborts after 4 receiver task failures.
- How data lost due to a failed receiver task/executor is recovered.
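A minimal sketch of setting that threshold, assuming it corresponds to spark.task.maxFailures (default 4); the app name and batch interval below are illustrative, not taken from the actual job:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    // Assumption: the "failed tasks threshold" above maps to spark.task.maxFailures,
    // whose default of 4 would match the observed abort if the setting is not
    // being honoured for receiver tasks.
    val conf = new SparkConf()
      .setAppName("kafka-streaming-job")        // illustrative name
      .set("spark.task.maxFailures", "64")
    val ssc = new StreamingContext(conf, Seconds(2))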
For parallelism, I would expect a single createStream() to intelligently map a receiver thread somewhere, one for each Kafka partition, but in different JVMs. Also, repartition() does not seem to work as advertised. A repartition(512) should get nodes other than the receiver nodes to get some RDDs to process. No?

On Sat, Aug 30, 2014 at 7:14 PM, Roger Hoover <roger.hoo...@gmail.com> wrote:

> I have this same question. Isn't there somewhere that the Kafka range metadata can be saved? From my naive perspective, it seems like it should be very similar to HDFS lineage. The original HDFS blocks are kept somewhere (in the driver?) so that if an RDD partition is lost, it can be recomputed. In this case, all we need is the Kafka topic, partition, and offset range.
>
> Can someone enlighten us on why two copies of the RDD are needed (or some other mechanism like a WAL) for fault tolerance when using Kafka but not when reading from, say, HDFS?
>
> On Fri, Aug 29, 2014 at 8:58 AM, Jonathan Hodges <hodg...@gmail.com> wrote:
>
>> 'this 2-node replication is mainly for failover in case the receiver dies while data is in flight. there's still chance for data loss as there's no write ahead log on the hot path, but this is being addressed.'
>>
>> Can you comment a little on how this will be addressed - will there be a durable WAL? Is there a JIRA for tracking this effort?
>>
>> I am curious whether, without a WAL, you can avoid this data loss with explicit management of Kafka offsets, e.g. don't commit an offset unless the data is replicated to multiple nodes, or maybe not until it is processed. The incoming data will always be durably stored to disk in Kafka, so it can be replayed in failure scenarios to avoid data loss if the offsets are managed properly.
>>
>> On Thu, Aug 28, 2014 at 12:02 PM, Chris Fregly <ch...@fregly.com> wrote:
>>
>>> @bharat-
>>>
>>> overall, i've noticed a lot of confusion about how Spark Streaming scales - as well as how it handles failover and checkpointing, but we can discuss that separately.
>>>
>>> there's actually 2 dimensions to scaling here: receiving and processing.
>>>
>>> *Receiving*
>>> receiving can be scaled out by submitting new DStreams/Receivers to the cluster as i've done in the Kinesis example. in fact, i purposely chose to submit multiple receivers in my Kinesis example because i feel it should be the norm and not the exception - particularly for partitioned and checkpoint-capable streaming systems like Kafka and Kinesis. it's the only way to scale.
>>>
>>> a side note here is that each receiver running in the cluster immediately replicates to 1 other node for fault-tolerance of that specific receiver. this is where the confusion lies. this 2-node replication is mainly for failover in case the receiver dies while data is in flight. there's still a chance of data loss as there's no write ahead log on the hot path, but this is being addressed.
>>>
>>> this is mentioned in the docs here:
>>> https://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving
>>>
>>> *Processing*
>>> once data is received, tasks are scheduled across the Spark cluster just like any other non-streaming task where you can specify the number of partitions for reduces, etc. this is the part of scaling that is sometimes overlooked - probably because it "works just like regular Spark" - but it is worth highlighting.
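A minimal sketch of the receiving/processing split described above, assuming the Spark 1.x Scala API (the topic name, ZooKeeper quorum, receiver count, and partition numbers are illustrative, not taken from the thread): each createStream() call adds one receiver, union() merges the per-receiver streams, and repartition() is what the docs suggest for spreading the resulting partitions to nodes beyond the receivers (the behaviour questioned above).

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    val conf = new SparkConf().setAppName("kafka-scaling-sketch")   // illustrative
    val ssc = new StreamingContext(conf, Seconds(2))

    // Receiving: one receiver (with its 2-node replication) per createStream() call;
    // roughly one stream per Kafka partition is the pattern described above.
    val numReceivers = 4
    val kafkaStreams = (1 to numReceivers).map { _ =>
      KafkaUtils.createStream(ssc, "zk-host:2181", "consumer-group", Map("events" -> 1))
    }

    // Processing: union the per-receiver streams, then repartition so executors
    // other than the receiver nodes also get RDD partitions to work on.
    val unified = ssc.union(kafkaStreams).repartition(512)
    unified.count().print()

    ssc.start()
    ssc.awaitTermination()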
>>> Here's a blurb in the docs:
>>> https://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-processing
>>>
>>> the other thing that's confusing with Spark Streaming is that in Scala, you need to explicitly
>>>
>>> import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions
>>>
>>> in order to pick up the implicits that allow DStream.reduceByKey and such (versus DStream.transform(rddBatch => rddBatch.reduceByKey())).
>>>
>>> in other words, DStreams appear to be relatively featureless until you discover this implicit. otherwise, you need to operate on the underlying RDDs explicitly, which is not ideal.
>>>
>>> the Kinesis example referenced earlier in the thread uses the DStream implicits.
>>>
>>> side note to all of this - i've recently convinced my publisher for my upcoming book, Spark In Action, to let me jump ahead and write the Spark Streaming chapter ahead of other more well-understood libraries. early release is in a month or so. sign up @ http://sparkinaction.com if you wanna get notified.
>>>
>>> shameless plug that i wouldn't otherwise do, but i really think it will help clear a lot of confusion in this area as i hear these questions asked a lot in my talks and such. and i think a clear, crisp story on scaling and fault-tolerance will help Spark Streaming's adoption.
>>>
>>> hope that helps!
>>>
>>> -chris
>>>
>>> On Wed, Aug 27, 2014 at 6:32 PM, Dibyendu Bhattacharya <dibyendu.bhattach...@gmail.com> wrote:
>>>
>>>> I agree. This issue should be fixed in Spark rather than relying on replay of Kafka messages.
>>>>
>>>> Dib
>>>>
>>>> On Aug 28, 2014 6:45 AM, "RodrigoB" <rodrigo.boav...@aspect.com> wrote:
>>>>
>>>>> Dibyendu,
>>>>>
>>>>> Thanks for getting back.
>>>>>
>>>>> I believe you are absolutely right. We were under the assumption that the raw data was being computed again, and that's not happening after further tests. This applies to Kafka as well.
>>>>>
>>>>> The issue is of major priority, fortunately.
>>>>>
>>>>> Regarding your suggestion, I would maybe prefer to have the problem resolved within Spark's internals, since once the data is replicated we should be able to access it once more and not have to pull it back again from Kafka or any other stream that is affected by this issue. If, for example, there is a large number of batches to be recomputed, I would rather have them done in a distributed way than overload the batch interval with a huge amount of Kafka messages.
>>>>>
>>>>> I do not yet have enough know-how about where the issue lies, or about the internal Spark code, so I can't really say how difficult the implementation will be.
>>>>>
>>>>> tnks,
>>>>> Rod
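A minimal sketch of the implicit-import point Chris makes above, assuming the Spark 1.x Scala API (the socket source and all names are illustrative): with toPairDStreamFunctions in scope, reduceByKey is available directly on a pair DStream; without it, the same operation has to drop down to the underlying RDDs via transform().

    import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext._                                      // pair-RDD implicits (pre-1.3)
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions   // pair-DStream implicits

    val conf = new SparkConf().setAppName("dstream-implicits-sketch")   // illustrative
    val ssc = new StreamingContext(conf, Seconds(2))

    // Any DStream[String] works here; a socket stream keeps the sketch self-contained.
    val lines = ssc.socketTextStream("localhost", 9999)
    val pairs = lines.flatMap(_.split(" ")).map(word => (word, 1L))

    // With the implicit in scope, reduceByKey works on the DStream directly:
    val viaImplicit = pairs.reduceByKey(_ + _)

    // Without it, the same thing requires operating on the underlying RDDs:
    val viaTransform = pairs.transform(rddBatch => rddBatch.reduceByKey(_ + _))

    viaImplicit.print()
    viaTransform.print()

    ssc.start()
    ssc.awaitTermination()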