My question is not directly related: about the "exactly-once semantic",
the document (copied below) said spark streaming gives exactly-once
semantic, but actually from my test result, with check-point enabled,
the application always re-process the files in last batch after
gracefully restart.

 

======


Semantics of Received Data


Different input sources provide different guarantees, ranging from
at-least once to exactly once. Read for more details.


With Files


If all of the input data is already present in a fault-tolerant files
system like HDFS, Spark Streaming can always recover from any failure
and process all the data. This gives exactly-once semantics, that all
the data will be processed exactly once no matter what fails.

 

 

________________________________

From: Enno Shioji [mailto:eshi...@gmail.com] 
Sent: Friday, June 19, 2015 5:29 PM
To: Tathagata Das
Cc: prajod.vettiyat...@wipro.com; Cody Koeninger; bit1...@163.com;
Jordan Pilat; Will Briggs; Ashish Soni; ayan guha;
user@spark.apache.org; Sateesh Kavuri; Spark Enthusiast; Sabarish
Sasidharan
Subject: Re: RE: Spark or Storm

 

Fair enough, on second thought, just saying that it should be idempotent
is indeed more confusing.

 

I guess the crux of the confusion comes from the fact that people tend
to assume the work you described (store batch id and skip etc.) is
handled by the framework, perhaps partly because Storm Trident does
handle it (you just need to let Storm know if the output operation has
succeeded or not, and it handles the batch id storing & skipping
business). Whenever I explain people that one needs to do this
additional work you described to get end-to-end exactly-once semantics,
it usually takes a while to convince them. In my limited experience,
they tend to interpret "transactional" in that sentence to mean that you
just have to write to a transactional storage like ACID RDB. Pointing
them to "Semantics of output operations" is usually sufficient though.

 

Maybe others like @Ashish can weigh on this; did you interpret it in
this way?

 

What if we change the statement into:

"end-to-end exactly-once semantics (if your updates to downstream
systems are idempotent or transactional). To learn how to make your
updates idempotent or transactional, see the "Semantics of output
operations" section in this chapter
<https://spark.apache.org/docs/latest/streaming-programming-guide.html#f
ault-tolerance-semantics> "

 

That way, it's clear that it's not sufficient to merely write to a
"transactional storage" like ACID store.

 

 

 

 

 

 

 

On Fri, Jun 19, 2015 at 9:08 AM, Tathagata Das <t...@databricks.com>
wrote:

If the current documentation is confusing, we can definitely improve the
documentation. However, I dont not understand why is the term
"transactional" confusing. If your output operation has to add 5, then
the user has to implement the following mechanism

 

1. If the unique id of the batch of data is already present in the
store, then skip the update

2. Otherwise atomically do both, the update operation as well as store
the unique id of the batch. This is pretty much the definition of a
transaction. The user has to be aware of the transactional semantics of
the data store while implementing this functionality. 

 

You CAN argue that this effective makes the whole updating sort-a
idempotent, as even if you try doing it multiple times, it will update
only once. But that is not what is generally considered as idempotent.
Writing a fixed count, not an increment, is usually what is called
idempotent. And so just mentioning that the output operation must be
idempotent is, in my opinion, more confusing.

 

To take a page out of the Storm / Trident guide, even they call this
exact conditional updating of Trident State as "transactional"
operation. See "transactional spout" in the Trident State guide -
https://storm.apache.org/documentation/Trident-state

 

In the end, I am totally open the suggestions and PRs on how to make the
programming guide easier to understand. :)

 

TD

 

On Thu, Jun 18, 2015 at 11:47 PM, Enno Shioji <eshi...@gmail.com> wrote:

Tbh I find the doc around this a bit confusing. If it says "end-to-end
exactly-once semantics (if your updates to downstream systems are
idempotent or transactional)", I think most people will interpret it
that as long as you use a storage which has atomicity (like
MySQL/Postgres etc.), a successful output operation for a given batch
(let's say "+ 5") is going to be issued exactly-once against the
storage.

 

However, as I understand it that's not what this statement means. What
it is saying is, it will always issue "+5" and never, say "+6", because
it makes sure a message is processed exactly-once internally. However,
it *may* issue "+5" more than once for a given batch, and it is up to
the developer to deal with this by either making the output operation
idempotent (e.g. "set 5"), or "transactional" (e.g. keep track of batch
IDs and skip already applied batches etc.).

 

I wonder if it makes more sense to drop "or transactional" from the
statement, because if you think about it, ultimately what you are asked
to do is to make the writes idempotent even with the "transactional"
approach, & "transactional" is a bit loaded and would be prone to lead
to misunderstandings (even though in fairness, if you read the fault
tolerance chapter it explicitly explains it).

 

 

 

On Fri, Jun 19, 2015 at 2:56 AM, <prajod.vettiyat...@wipro.com> wrote:

More details on the Direct API of Spark 1.3 is at the databricks blog:
https://databricks.com/blog/2015/03/30/improvements-to-kafka-integration
-of-spark-streaming.html

 

Note the use of checkpoints to persist the Kafka offsets in Spark
Streaming itself, and not in zookeeper. 

 

Also this statement:".. This allows one to build a Spark Streaming +
Kafka pipelines with end-to-end exactly-once semantics (if your updates
to downstream systems are idempotent or transactional)."

 

 

From: Cody Koeninger [mailto:c...@koeninger.org] 
Sent: 18 June 2015 19:38
To: bit1...@163.com
Cc: Prajod S Vettiyattil (WT01 - BAS); jrpi...@gmail.com;
eshi...@gmail.com; wrbri...@gmail.com; asoni.le...@gmail.com; ayan guha;
user; sateesh.kav...@gmail.com; sparkenthusi...@yahoo.in;
sabarish.sasidha...@manthan.com
Subject: Re: RE: Spark or Storm

 

That general description is accurate, but not really a specific issue of
the direct steam.  It applies to anything consuming from kafka (or, as
Matei already said, any streaming system really).  You can't have
exactly once semantics, unless you know something more about how you're
storing results.

 

For "some unique id", topicpartition and offset is usually the obvious
choice, which is why it's important that the direct stream gives you
access to the offsets.

 

See https://github.com/koeninger/kafka-exactly-once for more info

 

 

 

On Thu, Jun 18, 2015 at 6:47 AM, bit1...@163.com <bit1...@163.com>
wrote:

        I am wondering how direct stream api ensures end-to-end exactly
once semantics

         

        I think there are two things involved:

        1. From the spark streaming end, the driver will replay the
Offset range when it's down and restarted,which means that the new tasks
will process some already processed data.

        2. From the user end, since tasks may process already processed
data, user end should detect that some data has already been
processed,eg,

        use some unique ID.

         

        Not sure if I have understood correctly.

         

         

        
________________________________


        bit1...@163.com

                 

                From: prajod.vettiyat...@wipro.com

                Date: 2015-06-18 16:56

                To: jrpi...@gmail.com; eshi...@gmail.com

                CC: wrbri...@gmail.com; asoni.le...@gmail.com;
guha.a...@gmail.com; user@spark.apache.org; sateesh.kav...@gmail.com;
sparkenthusi...@yahoo.in; sabarish.sasidha...@manthan.com

                Subject: RE: Spark or Storm

                >>not being able to read from Kafka using multiple nodes

                 

                > Kafka is plenty capable of doing this..

                 

                I faced the same issue before Spark 1.3 was released.

                 

                The issue was not with Kafka, but with Spark Streaming's
Kafka connector. Before Spark 1.3.0 release one Spark worker would get
all the streamed messages. We had to re-partition to distribute the
processing.

                 

                From Spark 1.3.0 release the Spark Direct API for Kafka
supported parallel reads from Kafka streamed to Spark workers. See the
"Approach 2: Direct Approach" in this page:
http://spark.apache.org/docs/1.3.0/streaming-kafka-integration.html.
Note that is also mentions zero data loss and exactly once semantics for
kafka integration.

                 

                 

                Prajod

                 

                From: Jordan Pilat [mailto:jrpi...@gmail.com] 
                Sent: 18 June 2015 03:57
                To: Enno Shioji
                Cc: Will Briggs; asoni.le...@gmail.com; ayan guha; user;
Sateesh Kavuri; Spark Enthusiast; Sabarish Sasidharan
                Subject: Re: Spark or Storm

                 

                >not being able to read from Kafka using multiple nodes

                Kafka is plenty capable of doing this,  by clustering
together multiple consumer instances into a consumer group.
                If your topic is sufficiently partitioned, the consumer
group can consume the topic in a parallelized fashion.
                If it isn't, you still have the fault tolerance
associated with clustering the consumers.

                OK
                JRP

                On Jun 17, 2015 1:27 AM, "Enno Shioji"
<eshi...@gmail.com> wrote:

                        We've evaluated Spark Streaming vs. Storm and
ended up sticking with Storm.

                         

                        Some of the important draw backs are:

                        Spark has no back pressure (receiver rate limit
can alleviate this to a certain point, but it's far from ideal)

                        There is also no exactly-once semantics.
(updateStateByKey can achieve this semantics, but is not practical if
you have any significant amount of state because it does so by dumping
the entire state on every checkpointing)

                         

                        There are also some minor drawbacks that I'm
sure will be fixed quickly, like no task timeout, not being able to read
from Kafka using multiple nodes, data loss hazard with Kafka.

                         

                        It's also not possible to attain very low
latency in Spark, if that's what you need.

                         

                        The pos for Spark is the concise and IMO more
intuitive syntax, especially if you compare it with Storm's Java API.

                         

                        I admit I might be a bit biased towards Storm
tho as I'm more familiar with it.

                         

                        Also, you can do some processing with Kinesis.
If all you need to do is straight forward transformation and you are
reading from Kinesis to begin with, it might be an easier option to just
do the transformation in Kinesis.

                         

                         

                         

                         

                         

                        On Wed, Jun 17, 2015 at 7:15 AM, Sabarish
Sasidharan <sabarish.sasidha...@manthan.com> wrote:

                                Whatever you write in bolts would be the
logic you want to apply on your events. In Spark, that logic would be
coded in map() or similar such  transformations and/or actions. Spark
doesn't enforce a structure for capturing your processing logic like
Storm does.

                                Regards
                                Sab

                                Probably overloading the question a bit.

                                In Storm, Bolts have the functionality
of getting triggered on events. Is that kind of functionality possible
with Spark streaming? During each phase of the data processing, the
transformed data is stored to the database and this transformed data
should then be sent to a new pipeline for further processing

                                How can this be achieved using Spark?

                                 

                                On Wed, Jun 17, 2015 at 10:10 AM, Spark
Enthusiast <sparkenthusi...@yahoo.in> wrote:

                                I have a use-case where a stream of
Incoming events have to be aggregated and joined to create Complex
events. The aggregation will have to happen at an interval of 1 minute
(or less).

                                 

                                The pipeline is :

                                                                  send
events                                          enrich event

                                Upstream services ------------------->
KAFKA ---------> event Stream Processor ------------> Complex Event
Processor ------------> Elastic Search.

                                 

                                From what I understand, Storm will make
a very good ESP and Spark Streaming will make a good CEP.

                                 

                                But, we are also evaluating Storm with
Trident.

                                 

                                How does Spark Streaming compare with
Storm with Trident?

                                 

                                Sridhar Chellappa

                                 

                                 

                                 

                                 

                                 

                                 

                                On Wednesday, 17 June 2015 10:02 AM,
ayan guha <guha.a...@gmail.com> wrote:

                                 

                                I have a similar scenario where we need
to bring data from kinesis to hbase. Data volecity is 20k per 10 mins.
Little manipulation of data will be required but that's regardless of
the tool so we will be writing that piece in Java pojo. 

                                All env is on aws. Hbase is on a long
running EMR and kinesis on a separate cluster.

                                TIA.
                                Best
                                Ayan

                                On 17 Jun 2015 12:13, "Will Briggs"
<wrbri...@gmail.com> wrote:

                                The programming models for the two
frameworks are conceptually rather different; I haven't worked with
Storm for quite some time, but based on my old experience with it, I
would equate Spark Streaming more with Storm's Trident API, rather than
with the raw Bolt API. Even then, there are significant differences, but
it's a bit closer.
                                
                                If you can share your use case, we might
be able to provide better guidance.
                                
                                Regards,
                                Will
                                
                                On June 16, 2015, at 9:46 PM,
asoni.le...@gmail.com wrote:
                                
                                Hi All,
                                
                                I am evaluating spark VS storm ( spark
streaming  ) and i am not able to see what is equivalent of Bolt in
storm inside spark.
                                
                                Any help will be appreciated on this ?
                                
                                Thanks ,
                                Ashish
        
---------------------------------------------------------------------
                                To unsubscribe, e-mail:
user-unsubscr...@spark.apache.org
                                For additional commands, e-mail:
user-h...@spark.apache.org
                                
                                
        
---------------------------------------------------------------------
                                To unsubscribe, e-mail:
user-unsubscr...@spark.apache.org
                                For additional commands, e-mail:
user-h...@spark.apache.org

                                 

                                 

                         

                The information contained in this electronic message and
any attachments to this message are intended for the exclusive use of
the addressee(s) and may contain proprietary, confidential or privileged
information. If you are not the intended recipient, you should not
disseminate, distribute or copy this e-mail. Please notify the sender
immediately and destroy all copies of this message and any attachments.
WARNING: Computer viruses can be transmitted via email. The recipient
should check this email and any attachments for the presence of viruses.
The company accepts no liability for any damage caused by any virus
transmitted by this email. www.wipro.com 

 

The information contained in this electronic message and any attachments
to this message are intended for the exclusive use of the addressee(s)
and may contain proprietary, confidential or privileged information. If
you are not the intended recipient, you should not disseminate,
distribute or copy this e-mail. Please notify the sender immediately and
destroy all copies of this message and any attachments. WARNING:
Computer viruses can be transmitted via email. The recipient should
check this email and any attachments for the presence of viruses. The
company accepts no liability for any damage caused by any virus
transmitted by this email. www.wipro.com 

 

 

 

Reply via email to