Let me try to explain my suggestion better.

First, about positions in an ordered stream: consider a simple stream of events where no event carries an identifier for its relative position in the stream, but after every 'n' events there is a ticker event carrying a monotonically increasing sequence id.
For example, if the stream generator inserts a ticker after every 4th event, the stream will look like: a, a, a, a, (a1), a, a, a, a, (a2), a, a, a, a, (a3), a, ... and so on. A reader can establish its position based on these "ticker" events (like freeway mile markers).

Assertion 1: A ticker's position in the stream is deterministic across all copies of the stream, provided all copies have the same event order. This means reading can be resumed across copies of the same stream, since positions are deterministic. For example, if a reader on copy X says "I am at ticker (a2)", then that reader's position is at (a2) in every other copy, Y or Z. The reader can stop reading at (a8) in copy X and resume at (a8) in copy Y, and do so without loss of events.

Second, consider a merge operator that merges 'n' _ordered_ input streams and produces one output stream. The operator can be modeled as being fed by 'n' input readers and emitting one output. There is no buffering: if the operator gets an input, it has to write it to the output before it accepts another input from any of its feeders. This merge operator has two properties:

(1) Input order: the merge operator maintains input order in the output, i.e. if input A had (a-x) preceding (a-y), then the output of the merge operator has (a-x) preceding (a-y).

(2) No output order: different merge operators can produce arbitrary output orders across the same input feeds, i.e. no assumption can be made that, in the output, (a-x) will precede (b-y) [..or (b-z) or (c-y) or (c-z) or ..].

Assertion 2: The merge output can then be represented as an n-tuple of the 'n' individual input positions. Since each input is an ordered stream, the position within that input sub-stream is deterministic, and so the combination of positions across all inputs is deterministic.
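To make Assertion 2 concrete, here is a minimal, self-contained sketch. This is plain Python, not Pulsar code; the `merge` function, the stream dictionaries, and the index-based position representation are all illustrative assumptions (an index into an ordered list stands in for the ticker positions of Assertion 1). It models the unbuffered merge operator as a loop that copies one input event at a time to the output, and shows that a second operator can resume from the first one's n-tuple of input positions without losing events:

```python
import random
from typing import Dict, List, Optional, Tuple

def merge(streams: Dict[str, List[str]],
          start: Dict[str, int],
          rng: random.Random,
          max_events: Optional[int] = None) -> Tuple[List[str], Dict[str, int]]:
    """Unbuffered merge: repeatedly pick any input that still has events
    and copy its next event to the output. Per-input order is preserved
    (property 1); the overall interleaving is arbitrary (property 2).
    Returns the output and the final n-tuple of input positions."""
    pos = dict(start)
    out: List[str] = []
    while max_events is None or len(out) < max_events:
        ready = [name for name, s in streams.items() if pos[name] < len(s)]
        if not ready:
            break
        name = rng.choice(ready)          # feeders progress at arbitrary speeds
        out.append(streams[name][pos[name]])
        pos[name] += 1
    return out, pos

streams = {
    "A": [f"a{i}" for i in range(1, 9)],
    "B": [f"b{i}" for i in range(1, 5)],
    "C": [f"c{i}" for i in range(1, 7)],
}
zero = {"A": 0, "B": 0, "C": 0}

# Merge operator P stops after 7 events; its position n-tuple records
# exactly which prefix of each ordered input has been consumed.
p_out, p_pos = merge(streams, zero, random.Random(1), max_events=7)

# A different merge operator Q resumes from P's input-position n-tuple.
q_out, q_pos = merge(streams, p_pos, random.Random(2))

# No events are lost across the transfer, even though P and Q may
# interleave the inputs differently.
assert sorted(p_out + q_out) == sorted(sum(streams.values(), []))
```

The final assert is the point of the suggestion: the n-tuple of input positions, not the merged output order, is what can be transferred between operators without loss.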
It follows that (1) the set of input positions can be transferred from one merge operator to another, (2) the output will not lose events across such transfers, and (3) output order may change across such a transfer. Note that there is no assumption or assertion about the _output_ of the merge operator; we are only asserting this about the inputs.

Example
--------------
Readers P, Q, R are each reading the output of a different merge operator. They all process the same three event streams, one generated from A, one from B, and one from C. Then P's merge operator can be represented as three input stream readers Pa, Pb, Pc, which feed into the merge operator P. The operator for P may produce a different output than the operator for Q (say, because input readers may progress at different speeds in each operator), but each input stream reader position is deterministic (by Assertion 1). If P has a position at [Pa(a8), Pb(b1), Pc(c3)], Q has an equivalent position [Qa(a8), Qb(b1), Qc(c3)] for its input readers. The reader on Q can set up the input readers Qa to a(8), Qb to b(1) and Qc to c(3), to start feeding into Q.
-----------

These two assertions are the invariants in my suggestion. The rest is about solving two implementation issues:

(1) How to map the input n-tuple of the merge operator into a specific position in the output of the merge operator, e.g. how to map [Pa(x), Pb(y), Pc(z)] ===> P(j)

(2) How to resume a reader across the _outputs_ of two merge operators, without loss of events (and with as little duplication as possible), when they are fed with the same input, but at different rates, e.g. if [Pa(x), Pb(y), Pc(z)] ==> P(j), then find the position Q(r) <==== [Pa(x), Pb(y), Pc(z)]

My thinking is that these two things can be solved similarly to the existing proposal.

On Wed, May 1, 2019 at 4:10 PM Matteo Merli <mme...@apache.org> wrote:
> On Mon, Apr 29, 2019 at 1:57 PM Joe F <joefranc...@gmail.com> wrote:
> >
> > I have suggestions for an alternate solution.
> >
> > If source message-ids were known for replicated messages, a composite
> > cursor can be maintained for replicated subscriptions as an n-tuple.
> > Since messages are ordered from a source, it would be possible to
> > restart from a known cursor n-tuple in any cluster by a combination
> > of cursor positioning _and_ filtering
>
> Knowing the source message id alone is not enough to establish the
> order relationship across all the clusters. I think that would only
> work in the 2 clusters scenario.
>
> In general, for each message being written in region A, both locally
> published or replicated from other regions, we'd have to associate the
> highest (original from the source) message id. While that could be
> easy in the simplest case (broker maintains hashmap of the highest
> source message id from each region), it becomes more difficult to
> consider failure cases in which we have to reconstruct that hashmap
> from the log.
>
> Also, that would require to modify each message before persisting it,
> in the target region.
>
> Finally, the N-tuple of message ids would have to either:
> * Have a mapping in the broker (local-message -> N-tuple)
> * Be pushed to consumers so that they will ack with the complete context
>
> >
> > A simple way to approximate this is for each cluster to insert its own
> > ticker marks into the topic. A ticker carries the message id as the
> > message body. The ticker mark can be inserted every 't' time interval or
> > every 'n' messages as needed.
> >
> > The n-tuple of the tickers from each cluster is a well-known state that
> > can be re-started anywhere by proper positioning and filtering
> >
> > That is a simpler solution for users to understand and trouble-shoot. It
> > would be resilient to cluster failures, and does NOT require all clusters
> > to be up, to determine cursor position. No cross-cluster
> > communication/ordering is needed.
>
> I don't think this approach can remove the requirement of "all the
> clusters to be up" because the information in A won't have any context
> on what was exchanged between B and C.
>
> One quick example:
> * Message c2 was replicated to B but not A. Then C goes offline.
> * A tuple will be something like (a3, b4, c1)
> * In region B, b4 was actually written *after* c2, but c2 doesn't get
>   sent from B to A, since it was already replicated itself.
> * If we do a failover from A to B considering a3 ~= b4 we would be
>   missing the message c2
>