On 8/9/12 4:37 PM, Flavio Junqueira wrote:
The additional features I see as needed are logging and a mechanism to notify
consumption. Consuming means that an event has been processed. One issue here
is the one I raised before: not receiving confirmation that an event has been
consumed doesn't mean it hasn't been; the node might have crashed after
processing it but before sending the message or writing to the journal. In
principle, processing the message and notifying consumption need to be atomic
for exactly-once semantics.
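Here is a minimal sketch of what "atomic" can mean, assuming a single node whose only effect is on its own state. The state and the offset of the last consumed event are written together in one atomic file replace. After a crash the node therefore sees both or neither, and it skips replayed events at or below the committed offset. The class `AtomicConsumer`, the JSON snapshot, and the per-key counts are hypothetical illustrations, not S4 APIs.

```python
import json
import os
import tempfile


class AtomicConsumer:
    """Hypothetical consumer that commits its state and its consumption
    position in one atomic write, so a crash cannot separate the two."""

    def __init__(self, path):
        self.path = path
        self.counts = {}        # application state (hypothetical per-key counts)
        self.last_offset = -1   # offset of the last event reflected in `counts`
        if os.path.exists(path):
            with open(path) as f:
                snap = json.load(f)
            self.counts = snap["counts"]
            self.last_offset = snap["last_offset"]

    def process(self, offset, key):
        # Events at or below the committed offset were already applied
        # before the crash; replaying them must not change the state.
        if offset <= self.last_offset:
            return False
        self.counts[key] = self.counts.get(key, 0) + 1
        self.last_offset = offset
        self._commit()
        return True

    def _commit(self):
        # Write state and offset to a temp file, then rename it over the old
        # snapshot; on POSIX, os.replace is atomic, so a reader sees either
        # the old (state, offset) pair or the new one. A production version
        # would also fsync the directory.
        directory = os.path.dirname(os.path.abspath(self.path))
        fd, tmp = tempfile.mkstemp(dir=directory)
        with os.fdopen(fd, "w") as f:
            json.dump({"counts": self.counts, "last_offset": self.last_offset}, f)
            f.flush()
            os.fsync(f.fileno())
        os.replace(tmp, self.path)
```

Committing after every event is the simplest form and also the slowest. Batching several events per commit keeps the same guarantee, as long as the offset and the state always travel in the same write. Messages sent to other nodes are separate side effects and still need deduplication downstream.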

About minimizing errors, I'm still not entirely sure what it buys us. Of course 
we don't want to lose lots of events if we can avoid it, but to my knowledge we 
are not doing poorly in that sense. For applications that strictly need to 
process every event, losing 2 instead of 4 does not make much difference, so 
I'm still stuck on what we gain with point 1 of your incremental approach.

Using TCP indeed helps. But events may also be lost through load shedding at the application level (by default, events are dropped when queues are full).
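The two queue policies under discussion can be sketched with Python's `queue.Queue` standing in for S4's internal event queues. The `StageQueue` class and its `policy` parameter are hypothetical. With "drop", the queue sheds events when full and counts them, which is the kind of metric that helps identify bottlenecks. With "block", the upstream stage waits, which pushes back-pressure towards the source.

```python
import queue


class StageQueue:
    """Hypothetical bounded event queue between two processing stages."""

    def __init__(self, capacity, policy="drop"):
        if policy not in ("drop", "block"):
            raise ValueError("policy must be 'drop' or 'block'")
        self.q = queue.Queue(maxsize=capacity)
        self.policy = policy
        self.dropped = 0   # metric: events lost to load shedding

    def offer(self, event, timeout=None):
        """Enqueue `event`; returns True if it was accepted."""
        if self.policy == "drop":
            try:
                self.q.put_nowait(event)
                return True
            except queue.Full:
                self.dropped += 1   # shed the event, but count it
                return False
        # "block": the caller (the upstream stage) waits for free space.
        # On timeout the event is not counted as dropped: the caller still
        # holds it and can retry or stall its own input.
        try:
            self.q.put(event, timeout=timeout)
            return True
        except queue.Full:
            return False

    def take(self):
        return self.q.get()
```

Dropping keeps latency bounded at the cost of completeness. Blocking keeps every event, but it only helps if the source itself can be stalled.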

Dynamic load balancing, for instance, is a way to minimize load shedding automatically.

But simply reporting comprehensive metrics helps identify bottlenecks and therefore minimize errors through manual tuning, optimization or provisioning. That's always needed!


Even though exactly-once semantics do not directly rely on those solutions, these look easier to implement and follow S4's original approach of probabilistic thinking about algorithms and systems. But the two are not incompatible, and we'll probably be driven by our own use cases here anyway.


Regards,

Matthieu


-Flavio

On Aug 9, 2012, at 11:22 AM, Matthieu Morel wrote:

Hi,

Is it worth trying to achieve exactly-once or even at-least-once event
processing semantics, which are not useful for applications that process
streams in a statistical manner? It seems there are quite a few users
and applications interested in these semantics, and that can tolerate
the exceptional extra latency due to recovery. And indeed combining
checkpointing with an inbound logging mechanism such as something based
on Hedwig/Bookkeeper or Kafka is a possible approach. But it is not
sufficient, since - implementation-wise - we also have to add some
tracking of the messages, a mechanism to avoid load shedding by holding
upstream processes, and probably some kind of coordination messages or
mechanism.

As Leo pointed out, this can get quite complex, and involvement is
use-case driven: it depends on the use cases that committers/contributors
are facing and directly involved with.

So maybe an incremental approach would be worth following:

1. minimize errors (this implies providing metrics, and some other
things Leo may have in mind)
2. control load shedding by notifying upstream processing and
potentially holding upstream processes (optional of course)
3. integrate with replayable inbound logging systems such as Kafka or
Hedwig/logstage
4. implement a mechanism for coordinating recovery

Each step brings very valuable benefits and we can adapt based on use
cases we are facing, workload, participants, priorities etc...

For now, we have already scheduled adding insightful metrics for the next
version. Steps 2 and 3 look fairly easy to implement.


Regards,

Matthieu






On 8/8/12 11:13 PM, Flavio Junqueira wrote:
Roughly, applications can either afford to lose events or not. If an app can't
afford to lose events, then reducing the number of lost events does not do
much good. One way to achieve fault tolerance is to log events using, e.g.,
BookKeeper, which is both replicated and fast.

Just logging, however, is not sufficient. If you can't stall the source, then
guaranteeing no event loss might be very difficult if recovery can take
arbitrarily long. In such cases, the amount of storage required for logging is
unbounded.

Along those lines, Matthieu has developed a system on top of BookKeeper (Hedwig 
to be more precise) called log stage; it might be useful in this context. What 
do you think, Matthieu?

-Flavio

On Aug 7, 2012, at 8:37 PM, Leo Neumeyer wrote:

Hi all. Some thoughts.

Building a fault-tolerant system would require queuing N events at the
source. The length of the queue would depend on the frequency of
checkpointing. After a failure, the system would need to restore the state of
the node from the latest checkpoint and re-apply all the events emitted since
that checkpoint was taken. To increase reliability, we would also need to
replicate the nodes. One would also need to account for the peak data rate
to make sure there is enough capacity to re-process all the data and comply
with the real-time constraints.
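This sizing argument can be made concrete with a back-of-the-envelope model. It is a sketch under simplifying assumptions: a constant peak rate, a failure just before the next checkpoint, and a source that keeps emitting during recovery. The function names are hypothetical.

```python
import math


def events_to_buffer(rate, checkpoint_interval, recovery_time):
    """Worst-case number of events the source must retain.

    A node may fail just before its next checkpoint, so everything emitted
    since the previous checkpoint must be replayable, plus whatever arrives
    while the node is being restored.
    """
    return rate * (checkpoint_interval + recovery_time)


def catch_up_time(backlog, capacity, rate):
    """Seconds needed to drain `backlog` while new events keep arriving at
    `rate`; infinite if there is no spare capacity."""
    spare = capacity - rate
    if spare <= 0:
        return math.inf
    return backlog / spare
```

Take some hypothetical figures: 10,000 events/s at peak, a checkpoint every 60 s, and 20 s to restore a node. The source then has to retain 800,000 events, and a node that can process 15,000 events/s needs 160 s to catch up. If the recovery time is unbounded, so is the buffer.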

All of this is possible, but I'm not sure this is the best platform for
applications that expect zero errors. It was designed for processing large
amounts of data in applications that can tolerate a small probability of
error. Improving the platform to reduce errors is much simpler than
trying to achieve zero errors.

Perhaps a better approach is to have good error detection so the
application can handle the recovery at a higher level (not in real-time).
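Here is a sketch of such error detection. It assumes every event carries a per-stream sequence number and that delivery within a stream is FIFO, as over a single TCP connection. Under those assumptions, a receiver can record the ranges that never arrived and hand them to an offline reconciliation job. `GapDetector` is a hypothetical name.

```python
class GapDetector:
    """Hypothetical per-stream loss detector based on sequence numbers.

    It repairs nothing: it only records which events never arrived, so the
    application can reconcile them later, outside the real-time path.
    """

    def __init__(self):
        self.next_expected = {}   # stream id -> next sequence number
        self.gaps = []            # (stream, first_missing, last_missing)

    def observe(self, stream, seq):
        expected = self.next_expected.get(stream, 0)
        if seq > expected:
            # Everything between `expected` and `seq - 1` was skipped.
            self.gaps.append((stream, expected, seq - 1))
        if seq >= expected:
            self.next_expected[stream] = seq + 1
        # seq < expected: a duplicate or late event; with FIFO delivery this
        # should not happen, and it is not removed from `gaps` here.
```

For example, observing sequence numbers 0, 1, 4, 5, 7 on one stream records the gaps 2 to 3 and 6 to 6.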

Rather than try to solve ALL the problems, I think that it is better to
focus on problems that involve statistical processing of massive amounts of
noisy and redundant data where a small probability of errors will not
affect the accuracy of the results. (text processing, signal processing,
sensor data, market data, etc.) The advantage of focusing is that we can
solve one problem well and keep the system as simple as possible.

Regarding the reliability at the communication layer, using TCP should work
fine, I think.

Of course sending email is easy, I wish I had more time to put my code
where my mouth is :-)

-leo

On Tue, Aug 7, 2012 at 10:15 AM, Karthik Kambatla <kkamb...@cs.purdue.edu> wrote:

Hi Flavio,

We are in agreement. I was trying to push the discussion further, and get
your inputs to decide on a plausible approach in S4.

Regarding exactly-once semantics, I understand we need to plug multiple
holes in a failing environment. To ensure we can actually support reliable
delivery, should we outline possible failures and how they can be
addressed? I might be able to think through and list the steps (and possible
failures) later today/tomorrow.

On a side note, I remember Kishore and Leo initiating a conversation on
using ZeroMQ in the comm-layer. ZeroMQ, I have heard, supports reliable
delivery, and its developers claim it is faster than TCP.

Thanks
Karthik

On Tue, Aug 7, 2012 at 6:53 AM, Flavio Junqueira <f...@yahoo-inc.com>
wrote:

I didn't mean to suggest a different way, I was trying to understand the
definition of exactly-once. As for the use case Benjamin has posted, he
says that not even a single tag scan can be lost, but can we guarantee that
events are reliably delivered at-least-once with S4?

-Flavio

On Aug 7, 2012, at 7:38 AM, Karthik Kambatla wrote:

Given that nodes crash, it seems essential to build reliability the way it
is built in lower layers (think TCP). One trivial approach could be to use
monotonically increasing sequence numbers to identify events in a stream.

  1. Event ordering: Hold events until all the previous sequence numbers
  have been received.
  2. Exactly-once: If a sequence number is not greater than the last one
  delivered, the event is a duplicate.
  3. Fault-tolerance: Store the latest sequence number along with the
  checkpoint, and replay events from there onwards.

This, of course, comes at a performance overhead and should be optional.
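The three steps above can be sketched for a single stream from a single sender. The receiver class `SequencedReceiver` is hypothetical, and a list stands in for the application's processing. Out-of-order events are held back, anything below the next expected number is dropped as a duplicate, and the checkpoint stores that number so replay can resume from it.

```python
class SequencedReceiver:
    """Hypothetical receiver for one stream whose events carry
    monotonically increasing sequence numbers starting at 0."""

    def __init__(self, next_seq=0):
        self.next_seq = next_seq  # lowest sequence number not yet delivered
        self.pending = {}         # out-of-order events held back: seq -> event
        self.delivered = []       # stands in for the application's processing

    def receive(self, seq, event):
        # Exactly-once: anything below next_seq was already delivered, and
        # anything already in `pending` is a retransmission.
        if seq < self.next_seq or seq in self.pending:
            return
        self.pending[seq] = event
        # Ordering: release events only while there is no gap.
        while self.next_seq in self.pending:
            self.delivered.append(self.pending.pop(self.next_seq))
            self.next_seq += 1

    def checkpoint(self):
        # Fault tolerance: store the position together with the state.
        # Held-back events are not saved; their numbers are >= next_seq,
        # so the sender replays them after a restore.
        return {"next_seq": self.next_seq, "delivered": list(self.delivered)}

    @classmethod
    def restore(cls, snap):
        r = cls(snap["next_seq"])
        r.delivered = list(snap["delivered"])
        return r
```

The hold-back map grows without bound if an event is never retransmitted. This is why the scheme relies on a sender that can replay, as in the TCP analogy, and why its overhead argues for making it optional.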

As I said, this is the first approach that comes to mind. It is indeed an
interesting problem, and I feel we should not need to re-do stuff that is
done at lower layers.

Please suggest improvements/alternatives as you see fit.

Thanks
Karthik

On Mon, Aug 6, 2012 at 10:01 PM, Flavio Junqueira <f...@yahoo-inc.com>
wrote:


On Aug 6, 2012, at 10:11 PM, Karthik Kambatla wrote:

Flavio - it is indeed tricky to offer exactly-once semantics. My
understanding is that the underlying comm-layer could filter out subsequent
duplicate events; however, we need to sacrifice ordering.


I was also thinking that if a node crashes and we recover from checkpoints,
we may end up having messages applied twice.

-Flavio

Thanks
Karthik

On Mon, Aug 6, 2012 at 6:43 AM, "Benjamin Süß" <gothi...@gmx.de>
wrote:

Hi Matthieu,

thank you for your reply. I had a specific use case in mind, indeed:

I am trying to track RFID tags in distributed systems. This means that
not even a single tag scan may get lost. And of course, none may be sent
twice or more, as this would heavily confuse any surveillance
routines I am going to implement.

Regarding your answers, especially point 3, I do not think this can be
done with S4 at the moment, can it?

Regards,
Benjamin

-------- Original Message --------
Date: Tue, 31 Jul 2012 17:30:27 +0200
From: Matthieu Morel <mmo...@apache.org>
To: s4-u...@incubator.apache.org
Subject: Re: Thoughts on adding guaranteed message processing

On 7/31/12 2:54 PM, "Benjamin Süß" wrote:
Hi there,

it is stated in several places that S4 does not include guaranteed
one-time message processing. So my question is: are there currently any
plans to add this to S4? Or is it certain this is not going to happen? If
there are any plans for this, can I find further information somewhere?


There are typically 3 requirements for guaranteeing one-time message
processing:

1. reliable communication channels

2. replayable input stream: you need an upstream component that is able
to store/buffer the whole stream and replay it on demand.

3. tracking of messages, using some sort of piggybacking, possibly
requiring manual input from the user.
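For requirement 3, one known technique is used by Apache Storm's "acker"; it is not something S4 provides. Every event gets a random 64-bit id, and for each source event the tracker keeps the XOR of the ids of all derived events that were created but not yet acknowledged. The ids of newly emitted events piggyback on the ack of their parent. The class `AckTracker` is a hypothetical, single-process illustration of that bookkeeping.

```python
import random


class AckTracker:
    """Hypothetical tracker for the tree of events derived from each
    source event, using XOR of random 64-bit ids."""

    def __init__(self):
        self.pending = {}  # root id -> XOR of outstanding event ids

    @staticmethod
    def new_id():
        return random.getrandbits(64)

    def emit_root(self):
        # A new source event: its own id is the only outstanding one.
        root = self.new_id()
        self.pending[root] = root
        return root

    def ack(self, root, event_id, child_ids=()):
        # Acking an event XORs out its own id and XORs in the ids of the
        # events it produced (carried on the ack message).
        val = self.pending[root] ^ event_id
        for c in child_ids:
            val ^= c
        if val == 0:
            del self.pending[root]   # the whole tree has been processed
            return True
        self.pending[root] = val
        return False
```

When the XOR returns to zero, the whole tree has been processed, barring an id collision of negligible probability. A tree that does not complete within a timeout is replayed from the replayable input of requirement 2. That yields at-least-once rather than exactly-once processing, so the duplicates still have to be filtered downstream.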


In S4 0.5.0, we already address requirement 1 by providing communications
through TCP by default. Requirement 2 is quite straightforward to implement,
by adding some machinery to connect to a component such as Apache Kafka, for
instance. We are considering options for 3.


Do you have a specific use case in mind?

Regards,

Matthieu











--
*Leo Neumeyer*
Software, data, algorithms.
leoneume...@gmail.com
*http://www.linkedin.com/in/leoneu*




