Ah, I didn't realize we were limiting the discussion to master --> slave. But if we're talking about master-slave replication, and under the conditions I outlined earlier (src and dest match in #partitions, no foreign writes to dest) it "just works", it seems to me the only thing you're really missing is not an explicit desired-offset param on each and every request, but just the ability to "reset" the starting offset on the dest cluster at topic creation.
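Concretely, the kind of management call I have in mind would be something like this (a minimal sketch - the interface and method names are made up, nothing like this exists in Kafka today):

    // hypothetical management API, NOT part of Kafka today - just to pin down
    // what "reset the starting offset at topic creation" would mean:
    // for a partition that is still empty on the dest cluster, choose the offset
    // that will be assigned to the first message appended to it.
    interface DestClusterAdmin {
        void resetStartingOffset(String topic, int partition, long startingOffset);
    }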
Let me try and run through a more detailed scenario:

1. Suppose I set up the original cluster (src). No remote cluster yet. Let's say
   that over some period of time I produce 1 million msgs to topic X on this src
   cluster.
2. The company grows, a 2nd site is opened, the dest cluster is created, and
   topic X is created on the (brand new) dest cluster.
3. Offsets are manually set on every partition of X on the dest cluster to match
   either the oldest retained or the current offset of the matching partition of
   X in src. In pseudocode:

   for (partI in numPartitions) {
       long partIOffset;
       if (replicateAllRetainedHistory) {
           partIOffset = src.getOldestRetained(partI);
       } else {
           partIOffset = src.getCurrent(partI); // will not copy over history
       }
       dest.resetStartingOffset(partI, partIOffset); // <---- new mgmt API
   }

4. Now you are free to start replicating. Under the master --> slave assumptions,
   offsets will match from this point forward.

Seems to me something like this could be made part of the replicator component
(mirror maker, or whatever else you want to use): if topic X does not exist in the
destination, create it, reset its initial offsets to match the source, and start
replication (rough sketch of that flow at the bottom of this mail).

On Thu, Dec 29, 2016 at 12:41 PM, Andrey L. Neporada <anepor...@yandex-team.ru> wrote:

>
> > On 29 Dec 2016, at 20:43, radai <radai.rosenbl...@gmail.com> wrote:
> >
> > so, if i follow your suggested logic correctly, there would be some sort of:
> >
> > produce(partition, msg, requestedOffset)
> >
> > which would fail if requestedOffset is already taken (by another previous
> > such explicit call or by another regular call that just happened to get
> > assigned that offset by the partition leader on the target cluster).
> >
>
> Yes. More formally, my proposal is to extend ProduceRequest by adding
> MessageSetStartOffset:
>
> ProduceRequest => RequiredAcks Timeout [TopicName [Partition
>                   MessageSetStartOffset MessageSetSize MessageSet]]
>   RequiredAcks => int16
>   Timeout => int32
>   Partition => int32
>   MessageSetSize => int32
>   MessageSetStartOffset => int64
>
> If MessageSetStartOffset is -1, ProduceRequest should work exactly as
> before - i.e. assign next available offset to given MessageSet.
>
> > how would you meaningfully handle this failure?
> >
> > suppose this happens to some cross-cluster replicator (like mirror maker).
> > there is no use in retrying. the options would be:
> >
> > 1. get the next available offset - which would violate what youre trying
> >    to achieve
> > 2. skip msgs - so replication is incomplete, any offset "already taken" on
> >    the destination is not replicated from source
> > 3. stop replication for this partition completely - because starting from
> >    now _ALL_ offsets will be taken - 1 foreign msg ruins everything for the
> >    entire partition.
> >
> > none of these options look good to me.
> >
>
> Since we are discussing master-slave replication, the only client writing
> to slave cluster is the replicator itself.
> In this case ProduceRequest failure is some kind of replication logic
> error - for example when two replication instances are somehow launched
> for single partition.
> The best option here is just to stop replication process.
>
> So the answer to your question is (3), but this scenario should never
> happen.
>
>
> > On Thu, Dec 29, 2016 at 3:22 AM, Andrey L. Neporada <
> > anepor...@yandex-team.ru> wrote:
> >
> >> Hi!
> >>
> >>> On 27 Dec 2016, at 19:35, radai <radai.rosenbl...@gmail.com> wrote:
> >>>
> >>> IIUC if you replicate from a single source cluster to a single target
> >>> cluster, the topic has the same number of partitions on both, and no one
> >>> writes directly to the target cluster (so master --> slave) the offsets
> >>> would be preserved.
> >>>
> >>
> >> Yes, exactly. When you
> >> 1) create topic with the same number of partitions on both master and
> >>    slave clusters
> >> 2) write only to master
> >> 3) replicate partition to partition from master to slave
> >> - in this case the offsets will be preserved.
> >>
> >> However, you usually already have cluster that works and want to
> >> replicate some topics to another one.
> >> IMHO, in this scenario there should be a way to make message offsets
> >> equal on both clusters.
> >>
> >>> but in the general case - how would you handle the case where multiple
> >>> producers "claim" the same offset ?
> >>
> >> The same way as Kafka handles concurrent produce requests for the same
> >> partition - produce requests for partition are serialized.
> >> If the next produce request "overlaps" with previous one, it fails.
> >>
> >>> On Mon, Dec 26, 2016 at 4:52 AM, Andrey L. Neporada <
> >>> anepor...@yandex-team.ru> wrote:
> >>>
> >>>> Hi all!
> >>>>
> >>>> Suppose you have two Kafka clusters and want to replicate topics from
> >>>> primary cluster to secondary one.
> >>>> It would be very convenient for readers if the message offsets for
> >>>> replicated topics would be the same as for primary topics.
> >>>>
> >>>> As far as I know, currently there is no way to achieve this.
> >>>> I wonder is it possible/reasonable to add message offset to
> >>>> ProduceRequest?
> >>>>
> >>>> —
> >>>> Andrey Neporada
> >>>>
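P.S. here is the rough sketch I referred to above, to make the "bootstrap then replicate" idea concrete. It is Java-flavored and everything in it is hypothetical - none of these admin calls (partitionCount, oldestRetainedOffset, logEndOffset, createTopic, resetStartingOffset) exist in Kafka today:

    // hypothetical source-cluster queries - made-up names, not real Kafka APIs
    interface SrcClusterAdmin {
        int partitionCount(String topic);
        long oldestRetainedOffset(String topic, int partition); // oldest offset still retained
        long logEndOffset(String topic, int partition);         // next offset to be assigned
    }

    // hypothetical destination-cluster management calls (expanding on the
    // DestClusterAdmin sketch near the top of this mail) - also made up
    interface DestClusterAdmin {
        boolean topicExists(String topic);
        void createTopic(String topic, int numPartitions);
        // the proposed new mgmt API: for a still-empty partition, pick the offset
        // that will be assigned to the first message appended to it
        void resetStartingOffset(String topic, int partition, long startingOffset);
    }

    class ReplicatorBootstrap {
        /**
         * If the topic does not exist on dest yet, create it with the same number
         * of partitions as on src and align each partition's starting offset with
         * src. After that, a plain partition-to-partition copy (mirror maker
         * style) keeps offsets identical, as long as nothing else writes to the
         * dest topic.
         */
        static void bootstrapTopic(SrcClusterAdmin src, DestClusterAdmin dest,
                                   String topic, boolean replicateAllRetainedHistory) {
            if (dest.topicExists(topic)) {
                return; // already bootstrapped - just (re)start the copy loop
            }
            int numPartitions = src.partitionCount(topic);
            dest.createTopic(topic, numPartitions); // same #partitions as source
            for (int p = 0; p < numPartitions; p++) {
                long startingOffset = replicateAllRetainedHistory
                        ? src.oldestRetainedOffset(topic, p) // copy retained history too
                        : src.logEndOffset(topic, p);        // start from "now", skip history
                dest.resetStartingOffset(topic, p, startingOffset);
            }
            // ...then consume partition p from src and produce to partition p on dest
        }
    }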