auto.offset.reset only applies when there are no starting offsets (either from a checkpoint, or from you providing them explicitly).
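The precedence in the reply above can be modeled in a few lines. This is a hypothetical plain-Python sketch, not Spark's implementation. The function name and parameters are made up for illustration. Putting a checkpoint ahead of explicitly supplied offsets is an assumption: it reflects an application that recovers with StreamingContext.getOrCreate, which rebuilds the context from checkpoint data instead of calling the setup code that would pass explicit offsets. "smallest" and "largest" are the reset values discussed in this thread.

```python
from typing import Optional


def starting_offset(checkpointed: Optional[int],
                    explicit: Optional[int],
                    auto_offset_reset: str,
                    earliest_available: int,
                    latest_available: int) -> int:
    """Hypothetical model of where a direct stream starts reading one topic-partition."""
    if checkpointed is not None:
        # Recovery from a checkpoint: the stored offsets are used, so the batch
        # that was in flight is re-read no matter what auto.offset.reset says.
        return checkpointed
    if explicit is not None:
        # Offsets supplied by the application when creating the stream.
        return explicit
    # Only now does the reset policy matter.
    if auto_offset_reset == "smallest":
        return earliest_available  # oldest message still retained by Kafka
    if auto_offset_reset == "largest":
        return latest_available    # jump to the end: only new messages are read
    raise ValueError(f"unsupported auto.offset.reset value: {auto_offset_reset!r}")


print(starting_offset(None, None, "largest", 100, 250))  # 250: fresh start, nothing to resume from
print(starting_offset(120, None, "largest", 100, 250))   # 120: the checkpoint wins over the policy
```

Under this model, the "largest means at-most-once" reasoning in the question below only applies to a fresh start with neither a checkpoint nor explicit offsets.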
On Fri, Jun 19, 2015 at 6:10 AM, bit1...@163.com <bit1...@163.com> wrote:
>
> I think your observation is correct: you have to take care of the replayed data at your end, e.g., each message has a unique id or something else.
>
> I say "I think" in the sentence above because I am not sure, and I also have a related question:
> I am wondering how direct stream + Kafka is implemented when the driver is down and restarted. Will it always first replay the checkpointed failed batch, or will it honor Kafka's offset reset policy (auto.offset.reset)? If it honors the reset policy and it is set to "smallest", then it is at-least-once semantics; if it is set to "largest", then it will be at-most-once semantics?
>
> ------------------------------
> bit1...@163.com
>
> *From:* Haopu Wang <hw...@qilinsoft.com>
> *Date:* 2015-06-19 18:47
> *To:* Enno Shioji <eshi...@gmail.com>; Tathagata Das <t...@databricks.com>
> *CC:* prajod.vettiyat...@wipro.com; Cody Koeninger <c...@koeninger.org>; bit1...@163.com; Jordan Pilat <jrpi...@gmail.com>; Will Briggs <wrbri...@gmail.com>; Ashish Soni <asoni.le...@gmail.com>; ayan guha <guha.a...@gmail.com>; user@spark.apache.org; Sateesh Kavuri <sateesh.kav...@gmail.com>; Spark Enthusiast <sparkenthusi...@yahoo.in>; Sabarish Sasidharan <sabarish.sasidha...@manthan.com>
> *Subject:* RE: RE: Spark or Storm
>
> My question is not directly related: regarding "exactly-once semantics", the document (copied below) says Spark Streaming gives exactly-once semantics, but from my test results, with checkpointing enabled, the application always re-processes the files in the last batch after a graceful restart.
>
> ======
> *Semantics of Received Data*
>
> Different input sources provide different guarantees, ranging from *at-least once* to *exactly once*. Read for more details.
> *With Files*
>
> If all of the input data is already present in a fault-tolerant file system like HDFS, Spark Streaming can always recover from any failure and process all the data. This gives *exactly-once* semantics, that all the data will be processed exactly once no matter what fails.
>
> ------------------------------
>
> *From:* Enno Shioji [mailto:eshi...@gmail.com]
> *Sent:* Friday, June 19, 2015 5:29 PM
> *To:* Tathagata Das
> *Cc:* prajod.vettiyat...@wipro.com; Cody Koeninger; bit1...@163.com; Jordan Pilat; Will Briggs; Ashish Soni; ayan guha; user@spark.apache.org; Sateesh Kavuri; Spark Enthusiast; Sabarish Sasidharan
> *Subject:* Re: RE: Spark or Storm
>
> Fair enough; on second thought, just saying that it should be idempotent is indeed more confusing.
>
> I guess the crux of the confusion comes from the fact that people tend to assume the work you described (storing the batch id and skipping, etc.) is handled by the framework, perhaps partly because Storm Trident does handle it (you just need to let Storm know whether the output operation has succeeded, and it handles the batch id storing and skipping business). Whenever I explain to people that one needs to do this additional work you described to get end-to-end exactly-once semantics, it usually takes a while to convince them. In my limited experience, they tend to interpret "transactional" in that sentence to mean that you just have to write to transactional storage like an ACID RDB. Pointing them to "Semantics of output operations" is usually sufficient, though.
>
> Maybe others like @Ashish can weigh in on this; did you interpret it this way?
>
> What if we change the statement to:
>
> "end-to-end exactly-once semantics (if your updates to downstream systems are idempotent or transactional).
> To learn how to make your updates idempotent or transactional, see the "Semantics of output operations" section in this chapter <https://spark.apache.org/docs/latest/streaming-programming-guide.html#fault-tolerance-semantics>"
>
> That way, it's clear that it's not sufficient to merely write to a "transactional storage" like an ACID store.
>
> On Fri, Jun 19, 2015 at 9:08 AM, Tathagata Das <t...@databricks.com> wrote:
>
> If the current documentation is confusing, we can definitely improve the documentation. However, I don't understand why the term "transactional" is confusing. If your output operation has to add 5, then the user has to implement the following mechanism:
>
> 1. If the unique id of the batch of data is already present in the store, then skip the update.
>
> 2. Otherwise, atomically do both: the update operation and storing the unique id of the batch. This is pretty much the definition of a transaction. The user has to be aware of the transactional semantics of the data store while implementing this functionality.
>
> You CAN argue that this effectively makes the whole update sort of idempotent, as even if you try doing it multiple times, it will update only once. But that is not what is generally considered idempotent. Writing a fixed count, not an increment, is usually what is called idempotent. And so just mentioning that the output operation must be idempotent is, in my opinion, more confusing.
>
> To take a page out of the Storm / Trident guide, even they call this exact conditional updating of Trident State a "transactional" operation. See "transactional spout" in the Trident State guide: https://storm.apache.org/documentation/Trident-state
>
> In the end, I am totally open to suggestions and PRs on how to make the programming guide easier to understand.
> :)
>
> TD
>
> On Thu, Jun 18, 2015 at 11:47 PM, Enno Shioji <eshi...@gmail.com> wrote:
>
> Tbh I find the doc around this a bit confusing. If it says "end-to-end exactly-once semantics (if your updates to downstream systems are idempotent or transactional)", I think most people will interpret it to mean that as long as you use storage which has atomicity (like MySQL/Postgres etc.), a successful output operation for a given batch (let's say "+5") is going to be issued exactly once against the storage.
>
> However, as I understand it, that's not what this statement means. What it is saying is that it will always issue "+5" and never, say, "+6", because it makes sure a message is processed exactly once internally. However, it *may* issue "+5" more than once for a given batch, and it is up to the developer to deal with this by making the output operation either idempotent (e.g. "set 5") or "transactional" (e.g. keep track of batch IDs and skip already applied batches, etc.).
>
> I wonder if it makes more sense to drop "or transactional" from the statement, because if you think about it, ultimately what you are asked to do is to make the writes idempotent even with the "transactional" approach, and "transactional" is a bit loaded and prone to lead to misunderstandings (even though, in fairness, the fault tolerance chapter explains it explicitly).
>
> On Fri, Jun 19, 2015 at 2:56 AM, <prajod.vettiyat...@wipro.com> wrote:
>
> More details on the Direct API of Spark 1.3 are on the Databricks blog: https://databricks.com/blog/2015/03/30/improvements-to-kafka-integration-of-spark-streaming.html
>
> Note the use of checkpoints to persist the Kafka offsets in Spark Streaming itself, and not in ZooKeeper.
>
> Also this statement: "..
> This allows one to build Spark Streaming + Kafka pipelines with end-to-end exactly-once semantics (if your updates to downstream systems are idempotent or transactional)."
>
> *From:* Cody Koeninger [mailto:c...@koeninger.org]
> *Sent:* 18 June 2015 19:38
> *To:* bit1...@163.com
> *Cc:* Prajod S Vettiyattil (WT01 - BAS); jrpi...@gmail.com; eshi...@gmail.com; wrbri...@gmail.com; asoni.le...@gmail.com; ayan guha; user; sateesh.kav...@gmail.com; sparkenthusi...@yahoo.in; sabarish.sasidha...@manthan.com
> *Subject:* Re: RE: Spark or Storm
>
> That general description is accurate, but it's not really an issue specific to the direct stream. It applies to anything consuming from Kafka (or, as Matei already said, any streaming system really). You can't have exactly-once semantics unless you know something more about how you're storing results.
>
> For "some unique id", topicpartition and offset is usually the obvious choice, which is why it's important that the direct stream gives you access to the offsets.
>
> See https://github.com/koeninger/kafka-exactly-once for more info.
>
> On Thu, Jun 18, 2015 at 6:47 AM, bit1...@163.com <bit1...@163.com> wrote:
>
> I am wondering how the direct stream API ensures end-to-end exactly-once semantics.
>
> I think there are two things involved:
>
> 1. From the Spark Streaming end, the driver will replay the offset range when it's down and restarted, which means that the new tasks will process some already processed data.
>
> 2. From the user end, since tasks may process already processed data, the user end should detect that some data has already been processed, e.g., by using some unique ID.
>
> Not sure if I have understood correctly.
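Point 2 above, combined with the suggestion earlier in the thread to use topic, partition, and offset as the unique id, can be sketched as follows. This is plain Python with hypothetical names, not a Spark or Kafka API: records are assumed to arrive as (topic, partition, offset, value) tuples, a list stands in for the output store, and an in-memory set stands in for wherever applied ids are kept.

```python
from typing import Iterable, List, Set, Tuple

Record = Tuple[str, int, int, str]  # (topic, partition, offset, value), assumed layout
RecordId = Tuple[str, int, int]     # (topic, partition, offset)


def apply_once(records: Iterable[Record],
               applied: Set[RecordId],
               sink: List[str]) -> int:
    """Write each record's value to sink unless its (topic, partition, offset)
    id was already applied; return how many records were newly written."""
    written = 0
    for topic, partition, offset, value in records:
        record_id = (topic, partition, offset)
        if record_id in applied:
            continue  # replayed after a restart: already in the sink
        sink.append(value)
        applied.add(record_id)
        written += 1
    return written


batch = [("events", 0, 41, "a"), ("events", 0, 42, "b")]
applied: Set[RecordId] = set()
sink: List[str] = []
print(apply_once(batch, applied, sink))  # 2
print(apply_once(batch, applied, sink))  # 0: the replayed batch is skipped
print(sink)                              # ['a', 'b']
```

In a real sink, the id bookkeeping and the write have to succeed or fail together (same store, same transaction); a separate in-memory set like this one would be lost on a driver restart. If the records of a partition are applied in offset order, a store could also keep just the highest applied offset per topic-partition instead of every id.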
>
> ------------------------------
> bit1...@163.com
>
> *From:* prajod.vettiyat...@wipro.com
> *Date:* 2015-06-18 16:56
> *To:* jrpi...@gmail.com; eshi...@gmail.com
> *CC:* wrbri...@gmail.com; asoni.le...@gmail.com; guha.a...@gmail.com; user@spark.apache.org; sateesh.kav...@gmail.com; sparkenthusi...@yahoo.in; sabarish.sasidha...@manthan.com
> *Subject:* RE: Spark or Storm
>
> >>not being able to read from Kafka using multiple nodes
>
> > Kafka is plenty capable of doing this..
>
> I faced the same issue before Spark 1.3 was released.
>
> The issue was not with Kafka, but with Spark Streaming's Kafka connector. Before the Spark 1.3.0 release, one Spark worker would get all the streamed messages. We had to re-partition to distribute the processing.
>
> From the Spark 1.3.0 release onward, the Spark Direct API for Kafka supports parallel reads from Kafka streamed to Spark workers. See "Approach 2: Direct Approach" on this page: http://spark.apache.org/docs/1.3.0/streaming-kafka-integration.html. Note that it also mentions zero data loss and exactly-once semantics for Kafka integration.
>
> Prajod
>
> *From:* Jordan Pilat [mailto:jrpi...@gmail.com]
> *Sent:* 18 June 2015 03:57
> *To:* Enno Shioji
> *Cc:* Will Briggs; asoni.le...@gmail.com; ayan guha; user; Sateesh Kavuri; Spark Enthusiast; Sabarish Sasidharan
> *Subject:* Re: Spark or Storm
>
> >not being able to read from Kafka using multiple nodes
>
> Kafka is plenty capable of doing this, by clustering together multiple consumer instances into a consumer group.
> If your topic is sufficiently partitioned, the consumer group can consume the topic in a parallelized fashion.
> If it isn't, you still have the fault tolerance associated with clustering the consumers.
>
> OK
> JRP
>
> On Jun 17, 2015 1:27 AM, "Enno Shioji" <eshi...@gmail.com> wrote:
>
> We've evaluated Spark Streaming vs. Storm and ended up sticking with Storm.
>
> Some of the important drawbacks are:
>
> Spark has no back pressure (the receiver rate limit can alleviate this to a certain point, but it's far from ideal).
>
> There are also no exactly-once semantics. (updateStateByKey can achieve these semantics, but is not practical if you have any significant amount of state, because it does so by dumping the entire state on every checkpoint.)
>
> There are also some minor drawbacks that I'm sure will be fixed quickly, like no task timeout, not being able to read from Kafka using multiple nodes, and a data loss hazard with Kafka.
>
> It's also not possible to attain very low latency in Spark, if that's what you need.
>
> The pro for Spark is the concise and IMO more intuitive syntax, especially if you compare it with Storm's Java API.
>
> I admit I might be a bit biased towards Storm, though, as I'm more familiar with it.
>
> Also, you can do some processing with Kinesis. If all you need to do is straightforward transformation and you are reading from Kinesis to begin with, it might be an easier option to just do the transformation in Kinesis.
>
> On Wed, Jun 17, 2015 at 7:15 AM, Sabarish Sasidharan <sabarish.sasidha...@manthan.com> wrote:
>
> Whatever you write in bolts would be the logic you want to apply to your events. In Spark, that logic would be coded in map() or similar transformations and/or actions. Spark doesn't enforce a structure for capturing your processing logic like Storm does.
>
> Regards
> Sab
>
> Probably overloading the question a bit.
>
> In Storm, Bolts have the functionality of getting triggered on events. Is that kind of functionality possible with Spark Streaming? During each phase of the data processing, the transformed data is stored to the database, and this transformed data should then be sent to a new pipeline for further processing.
>
> How can this be achieved using Spark?
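As a rough, framework-free illustration of Sab's answer, the sketch below treats each micro-batch as a Python list and each processing phase as a function applied to the whole batch, the role map() and similar transformations play in Spark. Each phase's output is saved to a dict standing in for the database and then handed to the next phase. All names here are hypothetical and this is not Spark's API; in Spark itself, the write to an external store would typically happen in an output operation such as foreachRDD.

```python
from typing import Any, Callable, Dict, List

Event = Dict[str, Any]
Stage = Callable[[List[Event]], List[Event]]


def run_pipeline(batch: List[Event],
                 stages: Dict[str, Stage],
                 db: Dict[str, List[Event]]) -> List[Event]:
    """Push one micro-batch through the stages in insertion order,
    saving each stage's output under the stage's name."""
    for name, stage in stages.items():
        batch = stage(batch)                   # transform the whole micro-batch
        db.setdefault(name, []).extend(batch)  # persist this phase's output
    return batch                               # what the next pipeline would receive


# Hypothetical phases: tag every event, then keep only the large ones.
stages: Dict[str, Stage] = {
    "enriched": lambda b: [{**e, "enriched": True} for e in b],
    "large": lambda b: [e for e in b if e["size"] > 10],
}

db: Dict[str, List[Event]] = {}
out = run_pipeline([{"size": 5}, {"size": 20}], stages, db)
print(out)  # [{'size': 20, 'enriched': True}]
```

The point of the design is that there is no per-event trigger as with a bolt: logic runs once per micro-batch, and "store, then pass on" becomes a write followed by returning the transformed batch.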
>
> On Wed, Jun 17, 2015 at 10:10 AM, Spark Enthusiast <sparkenthusi...@yahoo.in> wrote:
>
> I have a use case where a stream of incoming events has to be aggregated and joined to create complex events. The aggregation will have to happen at an interval of 1 minute (or less).
>
> The pipeline is:
>
> send events
> enrich event
>
> Upstream services -------------------> KAFKA ---------> event Stream Processor ------------> Complex Event Processor ------------> Elastic Search.
>
> From what I understand, Storm will make a very good ESP and Spark Streaming will make a good CEP.
>
> But we are also evaluating Storm with Trident.
>
> How does Spark Streaming compare with Storm with Trident?
>
> Sridhar Chellappa
>
> On Wednesday, 17 June 2015 10:02 AM, ayan guha <guha.a...@gmail.com> wrote:
>
> I have a similar scenario where we need to bring data from Kinesis to HBase. Data velocity is 20k per 10 mins. Little manipulation of data will be required, but that's regardless of the tool, so we will be writing that piece in Java POJOs.
>
> All env is on AWS. HBase is on a long-running EMR cluster and Kinesis on a separate cluster.
>
> TIA.
> Best
> Ayan
>
> On 17 Jun 2015 12:13, "Will Briggs" <wrbri...@gmail.com> wrote:
>
> The programming models for the two frameworks are conceptually rather different; I haven't worked with Storm for quite some time, but based on my old experience with it, I would equate Spark Streaming more with Storm's Trident API than with the raw Bolt API. Even then, there are significant differences, but it's a bit closer.
>
> If you can share your use case, we might be able to provide better guidance.
>
> Regards,
> Will
>
> On June 16, 2015, at 9:46 PM, asoni.le...@gmail.com wrote:
>
> Hi All,
>
> I am evaluating Spark vs. Storm (Spark Streaming), and I am not able to see what the equivalent of a Storm Bolt is in Spark.
>
> Any help on this will be appreciated.
>
> Thanks,
> Ashish
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
> The information contained in this electronic message and any attachments to this message are intended for the exclusive use of the addressee(s) and may contain proprietary, confidential or privileged information. If you are not the intended recipient, you should not disseminate, distribute or copy this e-mail. Please notify the sender immediately and destroy all copies of this message and any attachments. WARNING: Computer viruses can be transmitted via email. The recipient should check this email and any attachments for the presence of viruses. The company accepts no liability for any damage caused by any virus transmitted by this email.
> www.wipro.com
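The two-step mechanism Tathagata Das describes in the thread (skip the update if the batch's unique id is already in the store; otherwise apply the update and store the id atomically) can be sketched with SQLite from the Python standard library standing in for the transactional store. Table names, the single counter, and the batch id format are hypothetical. A batch id could be, for example, a string built from the batch's Kafka topic, partition, and offset range, as suggested in the thread. The idempotent alternative Enno Shioji mentions ("set 5" instead of "+5") is included for contrast.

```python
import sqlite3


def open_store(path: str = ":memory:") -> sqlite3.Connection:
    """Create the hypothetical tables: one counter and the set of applied batch ids."""
    conn = sqlite3.connect(path)
    conn.execute("CREATE TABLE IF NOT EXISTS counter (name TEXT PRIMARY KEY, value INTEGER NOT NULL)")
    conn.execute("CREATE TABLE IF NOT EXISTS applied_batches (batch_id TEXT PRIMARY KEY)")
    conn.execute("INSERT OR IGNORE INTO counter (name, value) VALUES ('total', 0)")
    conn.commit()
    return conn


def add_transactionally(conn: sqlite3.Connection, batch_id: str, delta: int) -> bool:
    """Apply "+delta" at most once per batch_id; return False if the batch was already applied."""
    try:
        with conn:  # commits if the block succeeds, rolls back if it raises
            # Storing the id first lets the PRIMARY KEY reject a batch that was already applied.
            conn.execute("INSERT INTO applied_batches (batch_id) VALUES (?)", (batch_id,))
            conn.execute("UPDATE counter SET value = value + ? WHERE name = 'total'", (delta,))
        return True
    except sqlite3.IntegrityError:
        return False  # id already present: skip the update


def set_idempotently(conn: sqlite3.Connection, value: int) -> None:
    """Idempotent alternative: write a fixed value, so repeating the write changes nothing."""
    with conn:
        conn.execute("UPDATE counter SET value = ? WHERE name = 'total'", (value,))


def total(conn: sqlite3.Connection) -> int:
    return conn.execute("SELECT value FROM counter WHERE name = 'total'").fetchone()[0]


if __name__ == "__main__":
    conn = open_store()
    add_transactionally(conn, "events-0-100-200", 5)
    add_transactionally(conn, "events-0-100-200", 5)  # replayed batch is skipped
    print(total(conn))  # 5
```

Using the primary key as the "already present?" check keeps the check and the update inside one transaction, so a crash between them cannot leave the id stored without the increment, or the increment applied without the id.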