Let me try to explain my suggestion better.

First, about positions in an ordered stream: consider a simple stream of
events where no event carries an identifier for its relative position in
the stream, but after every 'n' events there is a ticker event carrying a
monotonically increasing sequence id.

For example, say that after every 4th event the stream generator inserts
a ticker into the stream. The stream will then look like:
a, a, a, a, (a1), a, a, a, a, (a2), a, a, a, a, (a3), a, ... and so on.
A reader can establish its position based on these "ticker" events (like
freeway mile markers).
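A quick sketch of such a stream generator (the function name and the n=4
interval are illustrative, not part of any existing API):

```python
from itertools import count

def with_tickers(events, n=4):
    # After every n-th event, insert a ticker event carrying a
    # monotonically increasing sequence id (hypothetical sketch).
    seq = count(1)
    for i, event in enumerate(events, start=1):
        yield ("event", event)
        if i % n == 0:
            yield ("ticker", next(seq))

stream = list(with_tickers(["a"] * 10, n=4))
# the 5th and 10th items in the stream are the tickers (a1) and (a2)
```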

Assertion 1: A ticker's position in the stream is deterministic across all
copies of the stream, provided all copies have the same event order.
This means reading can be resumed across copies of the same stream, since
positions are deterministic. For example, if a reader on copy X says "I am
at ticker (a2)", then the reader's position is at (a2) in every other
copy, Y or Z. The reader can stop reading at (a8) in copy X and resume at
(a8) in copy Y, without loss of events.
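Resuming on another copy is then just a scan for the ticker. A minimal
sketch, assuming both copies carry identical event order and tickers
shaped like ("ticker", id):

```python
def resume_after(stream_copy, ticker_id):
    # Skip forward to the ticker with the given sequence id, then
    # yield everything after it. Nothing is lost, because every copy
    # places the ticker at the same position (Assertion 1).
    it = iter(stream_copy)
    for kind, payload in it:
        if kind == "ticker" and payload == ticker_id:
            break
    yield from it

copy_y = [("event", "e1"), ("ticker", 1), ("event", "e2"),
          ("event", "e3"), ("ticker", 2), ("event", "e4")]
resumed = list(resume_after(copy_y, 1))
# → [("event", "e2"), ("event", "e3"), ("ticker", 2), ("event", "e4")]
```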

Second, consider a merge operator that merges 'n' _ordered_ input
streams and produces one output stream.

The operator can be modeled as being fed by 'n' input readers and
emitting one output. There is no buffering: if the operator accepts an
input event, it has to write it to the output before it accepts another
event from any of its feeders.

This merge operator has two properties:
          (1) Input order: the merge operator maintains input order in
the output,
                           i.e. if input A had (a-x) preceding (a-y), then
the output of the merge operator has (a-x) preceding (a-y).
          (2) No output order: different merge operators can produce
arbitrary output orders over the same input feeds,
                            i.e. no assumption can be made that, in the
output, (a-x) will precede (b-y) [..or (b-z) or (c-y) or (c-z) or ..].

Assertion 2: The merge output can then be represented as an n-tuple of
'n' individual input positions. Since each input is an ordered stream,
the position within that input sub-stream is deterministic, and so the
combination of positions across all inputs is deterministic.
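A toy model of such a merge, tagging every output item with the n-tuple
of last-seen ticker ids. The random interleaving stands in for input
readers progressing at different speeds; all names and data shapes are
illustrative:

```python
import random

def merge(inputs, seed=0):
    # Merge n ordered inputs in an arbitrary interleaving. Per-input
    # order is preserved (property 1); the interleaving itself is
    # arbitrary (property 2). Each output item carries the n-tuple of
    # last-seen ticker ids, one per input (sketch).
    rng = random.Random(seed)
    readers = [iter(s) for s in inputs]
    positions = [None] * len(inputs)   # last ticker seen on each input
    live = list(range(len(inputs)))
    while live:
        i = rng.choice(live)
        try:
            kind, payload = next(readers[i])
        except StopIteration:
            live.remove(i)
            continue
        if kind == "ticker":
            positions[i] = payload
        yield (i, kind, payload, tuple(positions))

a = [("event", "a1"), ("ticker", 1), ("event", "a2")]
b = [("event", "b1"), ("ticker", 1)]
out = list(merge([a, b], seed=42))
# whatever the interleaving, each input's own order is preserved:
assert [(k, p) for i, k, p, _ in out if i == 0] == a
assert [(k, p) for i, k, p, _ in out if i == 1] == b
```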

It follows that (1) the set of input positions can be transferred from one
operator to another, (2) the output will not lose events across such
transfers, and (3) output order may change across such a transfer.

Note that there is no assumption or assertion about the _output_ of the
merge operator. We are only asserting this about the inputs.

Example
--------------
For example: readers P, Q, R are each reading the output of a different
merge operator. They all process the same three event streams, one
generated from A, one from B, and one from C.

Then P's merge operator can be represented as three input stream readers
Pa, Pb, Pc that feed into the merge operator P. The operator for P may
produce a different output than the operator for Q (say, because input
readers may progress at different speeds in each operator), but each input
stream reader's position is deterministic (by Assertion 1).

If P is at position [Pa(a8), Pb(b1), Pc(c3)], then Q has an equivalent
position [Qa(a8), Qb(b1), Qc(c3)] for its input readers.
          A reader on Q can set up input readers, Qa to (a8), Qb to (b1)
and Qc to (c3), to start feeding into Q.
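Transferring the position is then just re-seeking each of Q's input
readers. A sketch, assuming streams of ("event", x) / ("ticker", id)
pairs; None means "start from the beginning":

```python
def seek_input(stream, ticker_id):
    # Position one input reader just after the given ticker;
    # None means start from the beginning (sketch).
    it = iter(stream)
    if ticker_id is not None:
        for kind, payload in it:
            if kind == "ticker" and payload == ticker_id:
                break
    return it

def resume_merge_inputs(streams, position_tuple):
    # Carry P's tuple [Pa(x), Pb(y), ...] over to Q's readers.
    return [seek_input(s, t) for s, t in zip(streams, position_tuple)]

a = [("ticker", 1), ("event", "a2")]
b = [("event", "b1"), ("ticker", 1)]
qa, qb = resume_merge_inputs([a, b], (1, None))
# qa resumes just after ticker 1; qb starts from the beginning
```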

-----------

These two assertions are the invariants in my suggestion. The rest is
about solving two implementation issues:

(1) How to map the input n-tuple of the merge operator to a specific
position in the output of the merge operator,
                        e.g. how to map [Pa(x), Pb(y), Pc(z)] ===> P(j)

(2) How to resume a reader across the _outputs_ of two merge operators,
without loss of events (and with as little duplication as possible), when
they are fed the same inputs but at different rates,
                     e.g. if [Pa(x), Pb(y), Pc(z)] ==> P(j), then find
position Q(r) <==== [Pa(x), Pb(y), Pc(z)]

And my thinking is that these two issues can be solved similarly to the
existing proposal.
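One way to sketch the positioning-plus-filtering idea for issue (2):
assume each event in Q's output is tagged with the last ticker id seen on
its own input at emit time (an assumption of this sketch; ticker ids are
per-input integers, 0 meaning none seen yet). An event from input i is
delivered only once that ticker has reached P's recorded position;
anything earlier was already consumed via P's output.

```python
def resume_on_output(q_output, target):
    # q_output items: (input_index, kind, payload, last_ticker_on_input)
    # target: per-input ticker ids where reader P stopped.
    # Deliver an event from input i only if the last ticker seen on i
    # is >= target[i]; earlier events were already read via P's
    # output. No loss, at ticker granularity (sketch).
    for i, kind, payload, last_ticker in q_output:
        if kind == "event" and last_ticker >= target[i]:
            yield (i, payload)

q_output = [
    (0, "event",  "a1", 0),  # before P's recorded ticker on A: skip
    (0, "ticker", 1,    1),
    (1, "event",  "b1", 0),  # target for B is 0: deliver
    (0, "event",  "a2", 1),  # after ticker 1 on A: deliver
]
resumed = list(resume_on_output(q_output, target=(1, 0)))
# → [(1, "b1"), (0, "a2")]
```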

On Wed, May 1, 2019 at 4:10 PM Matteo Merli <mme...@apache.org> wrote:

> On Mon, Apr 29, 2019 at 1:57 PM Joe F <joefranc...@gmail.com> wrote:
> >
> > I have suggestions for an alternate solution.
> >
> > If source message-ids were known for replicated messages, a composite
> > cursor can be maintained for replicated subscriptions as an n-tuple.
> Since
> > messages are ordered from a source, it would be possible to restart from
> a
> > known cursor n-tuple in any cluster by  a combination of cursor
> > positioning  _and_ filtering
>
> Knowing the source message id alone is not enough to establish the
> order relationship across all the clusters. I think that would only
> work in the 2 clusters scenario.
>
> In general, for each message being written in region A, both locally
> published or replicated from other regions, we'd have to associate the
> highest (original from the source) message id. While that could be
> easy in the simplest case (broker maintains hashmap of the highest
> source message id from each region), it becomes more difficult to
> consider failure cases in which we have to reconstruct that hashmap
> from the log.
>
> Also, that would require to modify each message before persisting it,
> in the target region.
>
> Finally, the N-tuple of message ids would have to either:
>  * Have a mapping in the broker (local-message -> N-tuple)
>  * Be pushed to consumers so that they will ack with the complete context
>
> >
> > A simple way to approximate this is for each cluster to insert its own
> > ticker marks into the topic. A ticker carries the message id as the
> > message body. The ticker mark can be inserted every 't ' time interval or
> > every 'n' messages as needed.
> >
> > The n-tuple of the tickers from each cluster is a well-known state  that
> > can be re-started anywhere by proper positioning and filtering
> >
> > That is a simpler solution for users to understand and trouble-shoot.  It
> > would be resilient to cluster failures, and does NOT require all clusters
> > to be up, to determine cursor position. No cross-cluster
> > communication/ordering is needed.
>
> I don't think this approach can remove the requirement of "all the
> clusters to be up" because the information in A won't have any context
> on what was exchanged between B and C.
>
> One quick example:
>  * Message c2 was replicated to B but not A. Then C goes offline.
>  * A tuple will be something like (a3, b4, c1)
>  * In region B, b4 was actually written *after* c2, but c2 doesn't get
> sent from B to A, since it was already replicated itself.
>  * If we do a failover from A to B considering a3 ~= b4 we would be
> missing the message c2
>
