I have a suggestion for an alternate solution.

If the source message IDs were known for replicated messages, a composite
cursor could be maintained for replicated subscriptions as an n-tuple. Since
messages from a given source are ordered, it would be possible to restart
from a known cursor n-tuple in any cluster by a combination of cursor
positioning _and_ filtering.

A simple way to approximate this is for each cluster to insert its own
ticker marks into the topic. A ticker carries the current message id as the
message body. Ticker marks can be inserted every 't' time interval or every
'n' messages, as needed.

The n-tuple of the tickers from each cluster is a well-known state from
which the subscription can be restarted in any cluster by proper positioning
and filtering.

That is a simpler solution for users to understand and troubleshoot. It
would be resilient to cluster failures and does NOT require all clusters to
be up to determine the cursor position. No cross-cluster communication or
ordering is needed.

But it would require skipping messages from specific sources as needed, and
storing the n-tuple as part of the cursor state.
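
To make the idea concrete, here is a rough sketch of the kind of state and
filtering involved. It is only an illustration under assumed names (Java,
with message ids reduced to per-source sequence numbers); it is not an
implementation proposal.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the composite cursor: for each source cluster,
// remember the position carried by its last acknowledged ticker mark.
// Message ids are reduced to per-source sequence numbers for illustration.
class CompositeCursorSketch {

    // n-tuple: source cluster -> sequence id carried by its last ticker
    private final Map<String, Long> lastTickerPerCluster = new ConcurrentHashMap<>();

    // Called when a ticker mark from 'cluster' is consumed and acknowledged.
    void onTickerAcked(String cluster, long sourceSequenceId) {
        lastTickerPerCluster.put(cluster, sourceSequenceId);
    }

    // After restarting in another cluster, a replicated message is skipped
    // if its source position is at or before the recorded ticker position.
    boolean shouldSkip(String sourceCluster, long sourceSequenceId) {
        Long acked = lastTickerPerCluster.get(sourceCluster);
        return acked != null && sourceSequenceId <= acked;
    }
}

The map above would be persisted as part of the cursor state; on failover
the consumer positions at the local entries for the tickers and drops
anything shouldSkip() flags as already acknowledged.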

Joe

On Mon, Mar 25, 2019 at 10:24 PM Sijie Guo <guosi...@gmail.com> wrote:

> On Mon, Mar 25, 2019 at 4:14 PM Matteo Merli <mme...@apache.org> wrote:
>
> > On Sun, Mar 24, 2019 at 9:54 PM Sijie Guo <guosi...@gmail.com> wrote:
> > >
> > > Ivan, Matteo, thank you for the writeup!
> > >
> > > I have a few more questions
> > >
> > > - How does this handle individual acks vs cumulative acks? It seems
> > > this is ignored at the moment. But it would be good to have some
> > > discussion around how to extend the approach to support them and how
> > > easy that would be.
> >
> > Yes, it's stated that the current proposal only replicates the
> > mark-delete position.
> > (I will clarify it better in the wiki.)
> >
> > Of course the markers approach works well with cumulative acks (since
> > they just move the mark-delete position), but it will work with
> > individual acks too in most of the scenarios.
> >
> > Keep in mind that in all cases a cluster failover will involve some
> > number of duplicates (configurable with the frequency of the snapshot).
> >
> > With individual acks, if all messages are acked within a short amount of
> > time (for example 1 second, comparable to the snapshot frequency), then
> > there will be no problem and no practical difference from the cumulative
> > ack scenario.
> >
> > Conversely, if some messages can stay unacked for a much longer amount
> > of time while other messages are being acked, that will lead to a larger
> > amount of duplicates during cluster failover.
> >
> > Regarding how to support this case better, I replied below in the
> > "alternative design" answer.
> >
> > > - Do we need to change the dispatcher, and what are the changes?
> >
> > This approach does not require any change in the dispatcher code. The
> > only change in the consumer handling is to filter out the marker messages
> > since they don't need to go back to consumers.
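
As a rough illustration of the filtering step described above (all names
are hypothetical; this is not the actual dispatcher code):

import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical sketch: split a batch of entries into replication markers
// (handled internally) and regular messages (dispatched to consumers).
// 'E' stands in for whatever entry type the broker reads from the ledger.
class MarkerFilter<E> {
    private final Predicate<E> isMarker;     // e.g. a metadata flag check
    private final Consumer<E> markerHandler; // updates snapshot/cursor state

    MarkerFilter(Predicate<E> isMarker, Consumer<E> markerHandler) {
        this.isMarker = isMarker;
        this.markerHandler = markerHandler;
    }

    List<E> filterForDispatch(List<E> entries) {
        List<E> forConsumers = new ArrayList<>(entries.size());
        for (E entry : entries) {
            if (isMarker.test(entry)) {
                markerHandler.accept(entry); // never delivered to consumers
            } else {
                forConsumers.add(entry);
            }
        }
        return forConsumers;
    }
}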
> >
>
>
> How does this
>
>
> >
> > > - If a region goes down, the approach can't take any snapshots. Does it
> > > mean "acknowledge" will be kind of suspended until the region is
> > > brought back? I guess it is related to how the dispatcher is changed to
> > > react to this snapshot. It is unclear to me from the proposal. It would
> > > be good if we have more clarification around it.
> >
> > First off, to clarify, this issue is only relevant when there are 3 or
> > more clusters
> > in the replication set.
> >
> > If one of the clusters is not reachable, the snapshots will not be
> > taken. A consumer will still keep acknowledging locally, but these acks
> > won't be replicated to the other clusters. Therefore, in case of a
> > cluster failover, the subscription will be rolled back to a much earlier
> > position.
>
>
> > This is not a problem with 2 clusters since, if the other cluster is
> > down, we cannot fail over to it anyway.
> >
>
> The question here will be more about how to fail back. If a snapshot is not
> taken, then nothing is *exchanged*
> between the clusters. How does this proposal handle failing back?
>
> In other words, what is the sequence of steps for people to do failover
> and failback?
> It would be good to have an example demonstrating the sequence, so that
> users have a clear picture of how to use this feature.
>
>
> >
> > When we have 3+ clusters, though, we can only sustain 1 cluster failure
> > because, after that, the snapshots will not make progress.
> >
> > Typically, though, the purpose of replicated subscriptions is to have the
> > option to fail out of a failed cluster, which will still work in this
> > case.
> >
> > What won't work is failing over from A to C when cluster B is down. To
> > define "won't work": the consumer will go to C but will find the cursor
> > at an older position. No data loss, but potentially a big number of dups.
> >
> > This is to protect against the case of messages exchanged between
> > clusters B and C before cluster B went down.
> >
> > In practice we can have operational tools to do a manual override and
> > ensure snapshots are taken. It would be interesting to see how this
> > feature gets used and what the operational pain points are, before
> > overthinking these problems upfront and digging too much in a direction
> > that might not be too relevant in practice.
> >
> > > Have you guys considered any other alternatives? If so, it might be
> > > worth listing them out for comparison.
> >
> > (Good point, I will add it to the wiki.)
> >
> > Yes, the main other approach would be to attach the "Original-Message-Id"
> > when replicating a message.
> > That would basically allow keeping an additional range-set based on the
> > Original-Message-Id as well as the local message id.
> >
> > The main drawbacks of this approach (compared to the proposal) are:
> >   * It would only work for individual acks but not for cumulative acks
> >   * We need to make more changes to propagate the OriginalMessageId to
> >     consumers, so that when they ack we don't have to maintain a mapping
> >
> > This requires a bit of change in the rangeset tracker. It shouldn't be
> > terribly hard to do, though, and it's a very "localized" change (easy to
> > create a substantial amount of unit tests around this custom data
> > structure).
> >
> > The idea is that this approach would be a good candidate for extending
> > the current proposal to support individual acks on top of the markers
> > that move the mark-delete position.
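
For what it's worth, a rough sketch of the kind of range tracking this
extension implies, with message ids reduced to per-source sequence numbers
and all names hypothetical (this is not Pulsar's existing rangeset code):

import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: track individual acks per source cluster, keyed by
// the original (source) message position. For simplicity each ack is stored
// as a degenerate [id, id] range; merging adjacent ranges is omitted.
class OriginalIdAckTracker {

    // source cluster -> start of range -> end of range (inclusive)
    private final Map<String, TreeMap<Long, Long>> ackedRanges = new HashMap<>();

    void ackIndividually(String sourceCluster, long originalId) {
        ackedRanges
            .computeIfAbsent(sourceCluster, c -> new TreeMap<>())
            .put(originalId, originalId);
    }

    // True if the message from 'sourceCluster' at 'originalId' was already
    // acknowledged (used to filter after a failover to another cluster).
    boolean isAcked(String sourceCluster, long originalId) {
        TreeMap<Long, Long> ranges = ackedRanges.get(sourceCluster);
        if (ranges == null) {
            return false;
        }
        Map.Entry<Long, Long> floor = ranges.floorEntry(originalId);
        return floor != null && floor.getValue() >= originalId;
    }
}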
> >
> > > A lot of the challenges are introduced because messages are interleaved
> > > during cross-region replication. Interleaving might work for some
> > > cross-region setups and failover strategies. But it also has challenges
> > > in supporting other features. I think it might be a good time to rethink
> > > cross-region replication and allow different types of cross-region
> > > replication mechanisms to exist. So we can introduce a new cross-region
> > > replication mechanism which avoids interleaving (e.g. via having a
> > > per-region partition), so that people can choose which replication
> > > mechanism to use based on their cross-region setup and requirements.
> > >
> > > In a non-interleaved cross-region replication mechanism, we can do
> > > precise entry-to-entry replication at the managed ledger level (via BK's
> > > LedgerHandleAdv) and maintain a 1:1 message id mapping during
> > > replication. Once we can maintain a 1:1 message id mapping, the
> > > replicated subscription can simply be done via replicating cursors. This
> > > approach can also provide a few other benefits, such as allowing a
> > > replicated subscription to selectively consume messages from a given
> > > region only (the local region or a remote region). It also provides more
> > > flexibility for consumers to do region failover.
> >
> > I think there are a few challenges in the per-region approach:
> >
> >  1. This would remove a fundamental guarantee that Pulsar replicated
> >      topics currently provide: consistency within a single region.
> >      If we have N logs instead of 1 single log, there's no way to replay
> >      the log and reach the same state, or to have 2 consumers in the same
> >      region be consistent.
> >      This would break several applications currently running on Pulsar.
> >
>
> Currently Pulsar only guarantees per-region and per-partition ordering,
> so technically it doesn't really change any behaviors. However, if people
> need an aggregated order of messages from multiple regions, then
> interleaved replication is required. That's why I didn't propose changing
> the current geo-replication mechanism; instead I propose adding a new
> replication mechanism.
>
>
>
> >  2. The topic metadata size will get multiplied by N, with N being the
> >      number of clusters.
> >      With 8 clusters and a lot of topics, that would break some existing
> >      deployments.
> >
>
> Correct. For most deployments, N is bounded at 2~3 clusters. So adding a
> new replication mechanism can probably address the problems for most
> deployments.
>
>
> >  3. Even if we specify the same entryId when republishing, we would have
> >      to ensure we use a perfectly mirrored ledger in all the regions.
> >      What would happen if a ledger is fenced in one of the regions?
> >
> > This would be on top of a very substantial amount of changes in core
> > parts of the code that would have to be tested at scale, and lastly we
> > would have to figure out a compatible live migration path.
> >
>
> Sure. That's why I propose adding a new replication mechanism, not
> changing the original one.
>
>
> >
> > >  so that people can choose which replication
> > > mechanism to use based on their cross-region setup and requirements.
> >
> > Applications can already choose to keep the topics separated, if they
> > wish. You just need to make sure to write to a different topic in each
> > region and subscribe to all of them.
> >
>
> Sure. Applications can do so by themselves.
>
>
> >
> >
> > I believe the current proposal is a much less risky approach that can be
> > easily implemented on top of the existing replication infrastructure,
> > providing a solution that will benefit a lot of use cases from day 1,
> > with, of course, room for improvement down the road.
> >
>
> Sure. I didn't mean to propose another alternative.
>
