Roman,

We already track updates on a per-transaction basis. The only difference
is that instead of doing a single "increment(1)" per transaction we do
"increment(X)", where X is the number of updates in the given
transaction.
On Mon, Dec 3, 2018 at 1:16 PM Roman Kondakov <kondako...@mail.ru.invalid> wrote:
> Igor, Vladimir, Ivan,
>
> Perhaps we are focused too much on update counters. This feature was
> designed for continuous queries and may not be well suited for
> historical rebalance. What if we tracked updates on a per-transaction
> basis instead of a per-update basis? Let's consider two counters:
> low-water mark (LWM) and high-water mark (HWM), which should be added
> to each partition. They have the following properties:
>
> * HWM is a plain atomic counter. When a tx makes its first write on
> the primary node, it does incrementAndGet on this counter and
> remembers the obtained value within its context. This counter can be
> considered a tx id within the current partition - transactions should
> maintain a per-partition map of their HWM ids. The WAL pointer to the
> first record should be remembered in this map. This id should also be
> recorded in WAL data records.
>
> When a tx sends updates to backups, it sends its HWM too. When a
> backup receives this message from the primary node, it takes the HWM
> and does setIfGreater on the local HWM counter.
>
> * LWM is a plain atomic counter. When a tx terminates (either with
> commit or rollback), it updates the local LWM in the same manner as
> update counters do it, using holes tracking. For example, if the
> partition's LWM = 10 now and a tx with id (HWM id) = 12 commits, we do
> not update the partition LWM until the tx with id = 11 is committed.
> When id = 11 is committed, LWM is set to 12. If we have LWM == N, this
> means that all transactions with id <= N have terminated for the
> current partition and all their data is already recorded in the local
> partition.
>
> Brief summary for both counters: HWM means that the partition has
> already seen at least one update of transactions with id <= HWM. LWM
> means that the partition has all updates made by transactions with
> id <= LWM.
>
> LWM is always <= HWM.
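>
> In pseudo-Java, the counter maintenance could look roughly like this
> (all names are made up for illustration; this is a sketch, not actual
> Ignite code):
>
>     import java.util.NavigableSet;
>     import java.util.TreeSet;
>     import java.util.concurrent.atomic.AtomicLong;
>
>     class PartitionTxCounters {
>         private final AtomicLong hwm = new AtomicLong();            // high-water mark
>         private long lwm;                                           // low-water mark
>         private final NavigableSet<Long> pending = new TreeSet<>(); // txs terminated out of order
>
>         /** Primary: the first write of a tx assigns its id within this partition. */
>         long assignTxId() {
>             return hwm.incrementAndGet();
>         }
>
>         /** Backup: merge the HWM received together with tx updates (setIfGreater). */
>         void onPrimaryHwmReceived(long primaryHwm) {
>             hwm.accumulateAndGet(primaryHwm, Math::max);
>         }
>
>         /** Tx with the given id commits or rolls back; advance LWM over the holes. */
>         synchronized void onTxTerminated(long txId) {
>             pending.add(txId);
>             while (pending.remove(lwm + 1)) // merge contiguous ids into LWM
>                 lwm++;
>         }
>
>         /** LWM reported to the supplier in the demand message. */
>         synchronized long lwm() {
>             return lwm;
>         }
>     }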
>
> On checkpoint we should store only these two counters in the
> checkpoint record. As an optimization we can also store the list of
> pending LWMs - ids which haven't been merged into LWM because of
> holes in the sequence.
>
> Historical rebalance:
>
> 1. The demander knows its LWM - all updates before it have been
> applied. The demander sends its LWM to the supplier.
>
> 2. The supplier finds the earliest checkpoint where HWM(supplier) <=
> LWM(demander).
>
> 3. The supplier starts moving forward through the WAL until it finds
> the first data record with HWM id = LWM(demander). From this point
> the WAL can be rebalanced to the demander.
>
> In this approach updates and checkpoints on primary and backup can be
> reordered in any way, but we can always find a proper point to read
> the WAL from.
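>
> The supplier-side lookup might be sketched as follows (invented names
> again; the scan condition below is one possible reading of step 3 -
> take the first record of a transaction the demander may not fully
> have):
>
>     import java.util.Iterator;
>     import java.util.Map;
>     import java.util.NavigableMap;
>     import java.util.TreeMap;
>
>     class SupplierLookup {
>         /** Checkpoints keyed by the HWM stored in each checkpoint record. */
>         private final NavigableMap<Long, WalPointer> checkpointsByHwm = new TreeMap<>();
>
>         /** Step 2: a checkpoint whose HWM <= the demander's LWM. */
>         WalPointer findCheckpoint(long demanderLwm) {
>             Map.Entry<Long, WalPointer> e = checkpointsByHwm.floorEntry(demanderLwm);
>             return e == null ? null : e.getValue(); // null -> fall back to full rebalance
>         }
>
>         /** Step 3: scan forward to the first record of a tx the demander
>          *  is not guaranteed to have (HWM id above its LWM). */
>         WalPointer findStartPointer(Iterator<DataRecord> wal, long demanderLwm) {
>             while (wal.hasNext()) {
>                 DataRecord rec = wal.next();
>                 if (rec.txHwmId() > demanderLwm)
>                     return rec.pointer(); // rebalance the WAL from here
>             }
>             return null; // the demander already has all updates
>         }
>
>         interface WalPointer {}
>
>         interface DataRecord {
>             long txHwmId();
>             WalPointer pointer();
>         }
>     }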
>
> Let's consider a couple of examples. In these examples transaction
> updates are marked as w1(a) - transaction 1 updates key=a, c1 -
> transaction 1 is committed, cp(1,0) - checkpoint with HWM=1 and
> LWM=0. (HWM,LWM) - current counters after the operation.
> (HWM,LWM[hole1, hole2]) - counters with holes in LWM.
>
> 1. Simple case with no reordering:
>
> PRIMARY   --w1(a)--cp(1,0)--w2(b)--w1(c)----------c1-----c2-----cp(2,2)
> (HWM,LWM)   (1,0)           (2,0)  (2,0)          (2,1)  (2,2)
>               |               |      |             |      |
> BACKUP    --w1(a)-----------w2(b)--w1(c)--cp(2,0)-c1-----c2-----cp(2,2)
> (HWM,LWM)   (1,0)           (2,0)  (2,0)          (2,1)  (2,2)
>
> In this case, if the backup fails before c1, it will receive all
> updates from the beginning (HWM=0).
> If it fails between c1 and c2, it will receive the WAL from the
> primary's cp(1,0), because the tx with id=1 is fully processed on the
> backup: HWM(supplier cp(1,0))=1 == LWM(demander)=1.
> If the backup fails after c2, it will receive nothing, because it has
> all updates: HWM(supplier)=2 == LWM(demander)=2.
>
> 2. Case with reordering (the same updates arrive at the backup in a
> different order: w2(b) before w1(a), and c2 before c1):
>
> PRIMARY   --w1(a)--cp(1,0)--w2(b)--cp(2,0)--w1(c)-----c1------c2-----cp(2,2)
> (HWM,LWM)   (1,0)           (2,0)           (2,0)     (2,1)   (2,2)
>
> BACKUP    ---------w2(b)--w1(a)--cp(2,0)--w1(c)-----c2------c1-----cp(2,2)
> (HWM,LWM)          (2,0)  (2,0)           (2,0)     (2,0[2]) (2,2)
>
> Note that here we have a hole on the backup when tx2 commits earlier
> than tx1 and LWM is not changed at that moment.
>
> In the last case, if the backup fails before c1, the entire WAL will
> be supplied, because LWM=0 until this moment.
> If the backup fails after c1, there is nothing to rebalance, because
> HWM(supplier)=2 == LWM(demander)=2.
>
> What do you think?
>
> --
> Kind Regards
> Roman Kondakov
>
> On 30.11.2018 2:01, Seliverstov Igor wrote:
>> Vladimir,
>>
>> Look at my example:
>>
>> One active transaction (Tx1, which does opX ops) while another tx
>> (Tx2, which does opX' ops) finishes with uc4:
>>
>> Node1:
>> ----uc1--op1----op2---uc2--op1'----uc3--uc4---op3--------X--------
>>
>> Node2, Tx1:
>> ----uc1----op1----uc2----op2----uc3--op3----------uc4----cp1----
>>
>> Node2, Tx2:
>> ----uc1-------------uc2-------------uc3--------op1'----uc4----cp1----
>>
>> state on Node2: tx1 -> op3 -> uc2
>> cp1 [current=uc4, backpointer=uc2]
>>
>> Here op2 was acknowledged by op3, and op3 was applied before op1'
>> (linearized by the WAL).
>>
>> All nodes having uc4 must have op1', because uc4 cannot be obtained
>> earlier than the prepare stage, while the prepare stage happens
>> after all updates, so *op1' happens before uc4* regardless of
>> whether Tx2 was committed or rolled back.
>>
>> This means *op2 happens before uc4* (uc4 cannot be earlier than op2
>> on any node, because on Node2 op2 was already finished (acknowledged
>> by op3) when op1' happened).
>>
>> That was my idea, which is easy to prove.
>>
>> You used a different approach, but yes, it has to work.
>>
>> On Thu, Nov 29, 2018 at 10:19 PM Vladimir Ozerov <voze...@gridgain.com> wrote:
>>> "If more recent WAL records will contain *ALL* updates of the transaction"
>>> -> "More recent WAL records will contain *ALL* updates of the transaction"
>>>
>>> On Thu, Nov 29, 2018 at 10:15 PM Vladimir Ozerov <voze...@gridgain.com> wrote:
>>>> Igor,
>>>>
>>>> Yes, I tried to draw different configurations, and it really seems
>>>> to work, despite being very hard to prove due to non-intuitive HB
>>>> edges. So let me try to spell out the algorithm once again to make
>>>> sure that we are on the same page here.
>>>>
>>>> 1) There are two nodes - primary (P) and backup (B)
>>>> 2) There are three types of events: small transactions which
>>>> possibly increment the update counter (ucX), one long active
>>>> transaction which is split into multiple operations (opX), and
>>>> checkpoints (cpX)
>>>> 3) Every node always has a current update counter. When a
>>>> transaction commits, it may or may not shift this counter further,
>>>> depending on whether there are holes behind. But we have a strict
>>>> rule that it always grows. Higher counters synchronize with
>>>> smaller ones. Possible cases:
>>>> ----uc1----uc2----uc3----
>>>> ----uc1--------uc3------- // uc2 missing due to reorder, but it is ok
>>>>
>>>> 4) Operations within a single transaction are always applied
>>>> sequentially, and hence also have an HB edge:
>>>> ----op1----op2----op3----
>>>>
>>>> 5) When a transaction operation happens, we save in memory the
>>>> current update counter available at this moment. I.e. we have a
>>>> map from transaction ID to the update counter which was relevant
>>>> by the time the last *completed* operation *started*. This is a
>>>> very important thing - we remember the counter when an operation
>>>> starts, but update the map only when it finishes. This is needed
>>>> for the situation when the update counter is bumped in the middle
>>>> of a long operation.
>>>> ----uc1----op1----op2----uc2----uc3----op3----
>>>>             |      |                    |
>>>>            uc1    uc1                  uc3
>>>>
>>>> state: tx1 -> op3 -> uc3
>>>>
>>>> 6) Whenever a checkpoint occurs, we save two counters with it:
>>>> "current" and "backpointer". The latter is the smallest update
>>>> counter associated with active transactions. If there are no
>>>> active transactions, the current update counter is used.
>>>>
>>>> Example 1: no active transactions.
>>>> ----uc1----cp1----
>>>>      ^      |
>>>>      --------
>>>>
>>>> state: cp1 [current=uc1, backpointer=uc1]
>>>>
>>>> Example 2: one active transaction:
>>>> ----uc1----op1----uc2----op2----op3----uc3----cp1----
>>>>                    ^                           |
>>>>                    ----------------------------
>>>>
>>>> state: tx1 -> op3 -> uc2
>>>>        cp1 [current=uc3, backpointer=uc2]
>>>>
>>>> 7) Historical rebalance:
>>>> 7.1) The demander finds the latest checkpoint, gets its
>>>> backpointer and sends it to the supplier.
>>>> 7.2) The supplier finds the earliest checkpoint where
>>>> [supplier(current) <= demander(backpointer)]
>>>> 7.3) The supplier reads the checkpoint backpointer and finds the
>>>> associated WAL record. This is where we start.
>>>>
>>>> So in terms of WAL we have: supplier[uc_backpointer <-
>>>> cp(uc_current <= demander_uc_backpointer)] <-
>>>> demander[uc_backpointer <- cp(last)]
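>>>>
>>>> To make p.5 and p.6 concrete, here is a sketch of that bookkeeping
>>>> (every name is invented; an illustration, not a real
>>>> implementation):
>>>>
>>>>     import java.util.Map;
>>>>     import java.util.concurrent.ConcurrentHashMap;
>>>>
>>>>     class BackpointerTracker {
>>>>         /** Current update counter of the partition. */
>>>>         private volatile long currentUc;
>>>>
>>>>         /** Tx ID -> UC that was current when the last *completed* operation *started*. */
>>>>         private final Map<Long, Long> txBackpointers = new ConcurrentHashMap<>();
>>>>
>>>>         void onUpdateCounterBumped(long uc) {
>>>>             currentUc = uc;
>>>>         }
>>>>
>>>>         /** Remember the counter when an operation starts... */
>>>>         long onOperationStart() {
>>>>             return currentUc;
>>>>         }
>>>>
>>>>         /** ...but publish it to the map only when the operation finishes. */
>>>>         void onOperationFinish(long txId, long ucAtStart) {
>>>>             txBackpointers.put(txId, ucAtStart);
>>>>         }
>>>>
>>>>         void onTxFinish(long txId) {
>>>>             txBackpointers.remove(txId);
>>>>         }
>>>>
>>>>         /** p.6: a checkpoint stores "current" and the smallest UC among active txs. */
>>>>         long[] onCheckpoint() {
>>>>             long backpointer = currentUc;
>>>>             for (long uc : txBackpointers.values())
>>>>                 backpointer = Math.min(backpointer, uc);
>>>>             return new long[] {currentUc, backpointer}; // [current, backpointer]
>>>>         }
>>>>     }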
>>>>
>>>> Now the most important - why it works :-)
>>>> 1) Transaction operations are sequential, so at the time of crash
>>>> nodes are *at most one operation ahead* of each other
>>>> 2) The demander goes to the past and finds the update counter
>>>> which was current at the time of the last completed TX operation
>>>> 3) The supplier goes to the closest checkpoint in the past where
>>>> this update counter either doesn't exist or just appeared
>>>> 4) The transaction cannot be committed on the supplier at this
>>>> checkpoint, as that would violate the UC happens-before rule
>>>> 5) The transaction may have not started yet on the supplier at
>>>> this point. If more recent WAL records will contain *ALL* updates
>>>> of the transaction
>>>> 6) The transaction may exist on the supplier at this checkpoint.
>>>> Thanks to p.1 we must skip at most one operation. Jumping back
>>>> through the supplier's checkpoint backpointer is guaranteed to do
>>>> this.
>>>>
>>>> Igor, do we have the same understanding here?
>>>>
>>>> Vladimir.
>>>>
>>>> On Thu, Nov 29, 2018 at 2:47 PM Seliverstov Igor <gvvinbl...@gmail.com> wrote:
>>>>> Ivan,
>>>>>
>>>>> Different transactions may be applied in a different order on
>>>>> backup nodes. That's why we need an active tx set and some
>>>>> sorting by their update times. The idea is to identify a point in
>>>>> time starting from which we may lose some updates.
>>>>> This point:
>>>>> 1) is the last update on the timeline acknowledged by all backups
>>>>> (including a possible further demander);
>>>>> 2) has a specific update counter (aka back-counter) which we are
>>>>> going to start iteration from.
>>>>>
>>>>> After additional thinking on it, I've identified a rule:
>>>>>
>>>>> There are two fences:
>>>>> 1) update counter (UC) - this means that all updates with a
>>>>> smaller UC than the applied one were applied on a node having
>>>>> this UC.
>>>>> 2) update in the scope of a TX - all updates are applied one by
>>>>> one, sequentially; this means that the fact of an update
>>>>> guarantees that the previous update (statement) was finished on
>>>>> all TX participants.
>>>>>
>>>>> Combining them, we can say the following:
>>>>>
>>>>> All updates that were acknowledged at the time the last update of
>>>>> the tx which bumped the UC was applied are guaranteed to be
>>>>> present on a node having such a UC.
>>>>>
>>>>> We can use this rule to find an iterator start pointer.
>>>>>
>>>>> On Wed, Nov 28, 2018 at 8:26 PM Ivan Pavlukhin <vololo...@gmail.com> wrote:
>>>>>> Guys,
>>>>>>
>>>>>> Another idea. We can introduce an additional update counter
>>>>>> which is incremented by MVCC transactions right after executing
>>>>>> an operation (like it is done for classic transactions). And we
>>>>>> can use that counter for searching for the needed WAL records.
>>>>>> Can it do the trick?
>>>>>>
>>>>>> P.S. Mentally I am trying to separate the facilities providing
>>>>>> transactions and durability. And it seems to me that those
>>>>>> facilities are in different dimensions.
>>>>>>
>>>>>> On Wed, Nov 28, 2018 at 4:26 PM Ivan Pavlukhin <vololo...@gmail.com> wrote:
>>>>>>> Sorry, if it was stated that a SINGLE transaction's updates are
>>>>>>> applied in the same order on all replicas, then I have no
>>>>>>> questions so far. I thought about reordering of updates coming
>>>>>>> from different transactions.
>>>>>>>> I have not got why we can assume that reordering is not
>>>>>>>> possible. What have I missed?
>>>>>>>
>>>>>>> On Wed, Nov 28, 2018 at 1:26 PM Ivan Pavlukhin <vololo...@gmail.com> wrote:
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> Regarding Vladimir's new idea.
>>>>>>>>> We assume that a transaction can be represented as a set of
>>>>>>>>> independent operations, which are applied in the same order
>>>>>>>>> on both primary and backup nodes.
>>>>>>>> I have not got why we can assume that reordering is not
>>>>>>>> possible. What have I missed?
>>>>>>>>
>>>>>>>> On Tue, Nov 27, 2018 at 2:42 PM Seliverstov Igor <gvvinbl...@gmail.com> wrote:
>>>>>>>>> Vladimir,
>>>>>>>>>
>>>>>>>>> I think I got your point.
>>>>>>>>>
>>>>>>>>> It should work if we do the following:
>>>>>>>>> introduce two structures: an active list (txs) and a
>>>>>>>>> candidate list (updCntr -> txn pairs).
>>>>>>>>>
>>>>>>>>> Track active txs, mapping them to the actual update counter
>>>>>>>>> at update time.
>>>>>>>>> On each next update, put the update counter associated with
>>>>>>>>> the previous update into the candidate list, possibly
>>>>>>>>> overwriting an existing value (checking the txn).
>>>>>>>>> On tx finish, remove the tx from the active list only if the
>>>>>>>>> appropriate update counter (associated with the finished tx)
>>>>>>>>> is applied.
>>>>>>>>> On update counter update, set the minimal update counter from
>>>>>>>>> the candidate list as a back-counter, clear the candidate
>>>>>>>>> list, and remove the associated tx from the active list if
>>>>>>>>> present.
>>>>>>>>> Use the back-counter instead of the actual update counter in
>>>>>>>>> the demand message.
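>>>>>>>>>
>>>>>>>>> A rough sketch of this bookkeeping (purely illustrative
>>>>>>>>> names; it may not capture every detail of the tx-finish
>>>>>>>>> rule):
>>>>>>>>>
>>>>>>>>>     import java.util.HashMap;
>>>>>>>>>     import java.util.Map;
>>>>>>>>>     import java.util.TreeMap;
>>>>>>>>>
>>>>>>>>>     class BackCounterTracker {
>>>>>>>>>         private final Map<Long, Long> activeTxs = new HashMap<>();      // txId -> UC at last update
>>>>>>>>>         private final TreeMap<Long, Long> candidates = new TreeMap<>(); // previous UC -> txId
>>>>>>>>>         private long backCounter; // what goes into the demand message
>>>>>>>>>
>>>>>>>>>         /** On each update the UC of the tx's previous update becomes a candidate. */
>>>>>>>>>         synchronized void onTxUpdate(long txId, long updCntr) {
>>>>>>>>>             Long prev = activeTxs.put(txId, updCntr);
>>>>>>>>>             if (prev != null)
>>>>>>>>>                 candidates.put(prev, txId); // may overwrite an existing value
>>>>>>>>>         }
>>>>>>>>>
>>>>>>>>>         /** On update counter update the minimal candidate becomes the back-counter. */
>>>>>>>>>         synchronized void onUpdateCounterApplied() {
>>>>>>>>>             if (!candidates.isEmpty()) {
>>>>>>>>>                 Map.Entry<Long, Long> min = candidates.firstEntry();
>>>>>>>>>                 backCounter = min.getKey();
>>>>>>>>>                 activeTxs.remove(min.getValue()); // remove the associated tx if present
>>>>>>>>>                 candidates.clear();
>>>>>>>>>             }
>>>>>>>>>         }
>>>>>>>>>
>>>>>>>>>         synchronized long backCounter() {
>>>>>>>>>             return backCounter;
>>>>>>>>>         }
>>>>>>>>>     }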
>>>>>>>>>
>>>>>>>>> On Tue, Nov 27, 2018 at 12:56 PM Seliverstov Igor <gvvinbl...@gmail.com> wrote:
>>>>>>>>>> Ivan,
>>>>>>>>>>
>>>>>>>>>> 1) The list is saved on each checkpoint, wholly (all
>>>>>>>>>> transactions in the active state at checkpoint begin).
>>>>>>>>>> We need the whole list to get the oldest transaction,
>>>>>>>>>> because after the previous oldest tx finishes, we need to
>>>>>>>>>> get the following one.
>>>>>>>>>> 2) I guess there is a description of how the persistent
>>>>>>>>>> storage works and how it restores [1].
>>>>>>>>>>
>>>>>>>>>> Vladimir,
>>>>>>>>>>
>>>>>>>>>> The whole list of what we are going to store on checkpoint
>>>>>>>>>> (updated):
>>>>>>>>>> 1) Partition counter low watermark (LWM)
>>>>>>>>>> 2) WAL pointer of the earliest active transaction write to
>>>>>>>>>> the partition at the time the checkpoint started
>>>>>>>>>> 3) List of prepared txs with acquired partition counters
>>>>>>>>>> (which were acquired but not applied yet)
>>>>>>>>>>
>>>>>>>>>> This way we don't need any additional info in the demand
>>>>>>>>>> message. The start point can be easily determined using the
>>>>>>>>>> stored WAL "back-pointer".
>>>>>>>>>>
>>>>>>>>>> [1]
>>>>>>>>>> https://cwiki.apache.org/confluence/display/IGNITE/Ignite+Persistent+Store+-+under+the+hood#IgnitePersistentStore-underthehood-LocalRecoveryProcess
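>>>>>>>>>>
>>>>>>>>>> As a structure, this is roughly (field names are made up
>>>>>>>>>> for illustration):
>>>>>>>>>>
>>>>>>>>>>     import java.util.List;
>>>>>>>>>>
>>>>>>>>>>     /** Per-partition tx state stored in the checkpoint record. */
>>>>>>>>>>     class CheckpointTxState {
>>>>>>>>>>         final long lwm;                     // 1) partition counter low watermark
>>>>>>>>>>         final long earliestActiveTxWalPtr;  // 2) WAL back-pointer: first write of the
>>>>>>>>>>                                             //    earliest tx active at checkpoint start
>>>>>>>>>>         final List<PreparedTx> preparedTxs; // 3) prepared txs with acquired counters
>>>>>>>>>>
>>>>>>>>>>         CheckpointTxState(long lwm, long ptr, List<PreparedTx> txs) {
>>>>>>>>>>             this.lwm = lwm;
>>>>>>>>>>             this.earliestActiveTxWalPtr = ptr;
>>>>>>>>>>             this.preparedTxs = txs;
>>>>>>>>>>         }
>>>>>>>>>>
>>>>>>>>>>         static class PreparedTx {
>>>>>>>>>>             final long txId;
>>>>>>>>>>             final long acquiredCntr; // acquired but not applied yet
>>>>>>>>>>
>>>>>>>>>>             PreparedTx(long txId, long acquiredCntr) {
>>>>>>>>>>                 this.txId = txId;
>>>>>>>>>>                 this.acquiredCntr = acquiredCntr;
>>>>>>>>>>             }
>>>>>>>>>>         }
>>>>>>>>>>     }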
>>>>>>>>>>
>>>>>>>>>> On Tue, Nov 27, 2018 at 11:19 AM Vladimir Ozerov <voze...@gridgain.com> wrote:
>>>>>>>>>>> Igor,
>>>>>>>>>>>
>>>>>>>>>>> Could you please elaborate - what is the whole set of
>>>>>>>>>>> information we are going to save at checkpoint time? From
>>>>>>>>>>> what I understand, this should be:
>>>>>>>>>>> 1) List of active transactions with WAL pointers of their
>>>>>>>>>>> first writes
>>>>>>>>>>> 2) List of prepared transactions with their update counters
>>>>>>>>>>> 3) Partition counter low watermark (LWM) - the smallest
>>>>>>>>>>> partition counter before which there are no prepared
>>>>>>>>>>> transactions.
>>>>>>>>>>>
>>>>>>>>>>> And then we send to the supplier node a message: "Give me
>>>>>>>>>>> all updates starting from that LWM, plus data for those
>>>>>>>>>>> transactions which were active when I failed".
>>>>>>>>>>>
>>>>>>>>>>> Am I right?
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Nov 23, 2018 at 11:22 AM Seliverstov Igor <gvvinbl...@gmail.com> wrote:
>>>>>>>>>>>> Hi Igniters,
>>>>>>>>>>>>
>>>>>>>>>>>> Currently I'm working on possible approaches to
>>>>>>>>>>>> implementing historical rebalance (delta rebalance using a
>>>>>>>>>>>> WAL iterator) over MVCC caches.
>>>>>>>>>>>>
>>>>>>>>>>>> The main difficulty is that MVCC writes changes during the
>>>>>>>>>>>> tx active phase, while the partition update version, aka
>>>>>>>>>>>> update counter, is applied on tx finish. This means we
>>>>>>>>>>>> cannot start iteration over the WAL right from the pointer
>>>>>>>>>>>> where the update counter was updated, but should include
>>>>>>>>>>>> the updates which the transaction that updated the counter
>>>>>>>>>>>> did.
>>>>>>>>>>>>
>>>>>>>>>>>> These updates may be much earlier than the point where the
>>>>>>>>>>>> update counter was updated, so we have to be able to
>>>>>>>>>>>> identify the point where the first update happened.
>>>>>>>>>>>>
>>>>>>>>>>>> The proposed approach includes:
>>>>>>>>>>>>
>>>>>>>>>>>> 1) preserve a list of active txs, sorted by the time of
>>>>>>>>>>>> their first update (using the WAL ptr of the first WAL
>>>>>>>>>>>> record in the tx)
>>>>>>>>>>>>
>>>>>>>>>>>> 2) persist this list on each checkpoint (together with
>>>>>>>>>>>> TxLog, for example)
>>>>>>>>>>>>
>>>>>>>>>>>> 3) send the whole active tx list (transactions which were
>>>>>>>>>>>> in the active state at the time the node crashed, an empty
>>>>>>>>>>>> list in case of a graceful node stop) as a part of the
>>>>>>>>>>>> partition demand message
>>>>>>>>>>>>
>>>>>>>>>>>> 4) find a checkpoint where the earliest tx exists in the
>>>>>>>>>>>> persisted txs and use the saved WAL ptr as a start point,
>>>>>>>>>>>> or apply the current approach in case the active tx list
>>>>>>>>>>>> (sent on the previous step) is empty
>>>>>>>>>>>>
>>>>>>>>>>>> 5) start iteration.
>>>>>>>>>>>>
>>>>>>>>>>>> Your thoughts?
>>>>>>>>>>>>
>>>>>>>>>>>> Regards,
>>>>>>>>>>>> Igor
>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Best regards,
>>>>>>>> Ivan Pavlukhin
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Best regards,
>>>>>>> Ivan Pavlukhin
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Best regards,
>>>>>> Ivan Pavlukhin