sujatha <sujatha.sivaku...@mariadb.com> writes:

Hi Sujatha,

> @sql/sql_class.h
>   Moved 'wait_for_prior_commit(THD *thd)' method inside sql_class.cc
>
> @sql/sql_class.cc
>   Added code to check for 'stop_on_error_sub_id' for event groups which get
>   skipped and don't have any preceding group to wait for.

This looks like the wrong fix. The wait_for_commit mechanism is a low-level
API; it should not deal with things like stop_on_error_sub_id.

> 1,2,3 are scheduled in parallel.

> Since the above is true 'skip_event_group=true' is set. Simply call
> 'wait_for_prior_commit' to wakeup all waiters. Group '2' didn't had any
> waitee and its execution is skipped. Hence its wakeup_error=0.It sends a
> positive wakeup signal to '3'. Which commits. This results in a missed
> transaction. i.e 33 is missed.

I think this is the real problem. Group 2 is stopping because of an error, so
it must not call wakeup_subsequent_commits() without propagating the error;
doing so breaks the whole semantics of wait_for_commit.

Maybe it's enough just to also set rgi->worker_error here, when
skip_event_group is set due to an error in an earlier group?

    if (unlikely(entry->stop_on_error_sub_id <= rgi->wait_commit_sub_id))
      skip_event_group= true;

Then wakeup_subsequent_commits() will correctly inform the following
transaction about the error, so it doesn't try to commit on its own.

There is already similar code in the retry_event_group() function:

    if (entry->stop_on_error_sub_id == (uint64) ULONGLONG_MAX ||
        rgi->gtid_sub_id < entry->stop_on_error_sub_id)
      ...
    else
    {
      err= rgi->worker_error= 1;

Then all of the changes to sql_class.h and sql_class.cc can be omitted.

Also, this fix doesn't seem to belong with the other MDEV-18648 changes; it is
unrelated to what the default parallel replication mode is. So please do it in
a separate commit.

Hope this helps,

 - Kristian.
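
To make the error-propagation point above concrete, here is a minimal,
self-contained toy model. It is only a sketch and not server code: ToyEntry,
ToyGroup, check_skip(), finish_and_wakeup() and follower_commits() are all
hypothetical stand-ins, and it assumes that group 3 has already passed its own
skip check and is only waiting for the wakeup from group 2. The
propagate_error flag toggles the suggested extra assignment to worker_error.

```cpp
#include <cassert>
#include <cstdint>
#include <limits>

// Toy stand-ins only; none of these are the real server types.
struct ToyEntry {
  // "No error seen yet" is modelled as the maximum value.
  std::uint64_t stop_on_error_sub_id =
      std::numeric_limits<std::uint64_t>::max();
};

struct ToyGroup {
  std::uint64_t gtid_sub_id = 0;         // position in commit order
  std::uint64_t wait_commit_sub_id = 0;  // sub_id of the group to wait for
  int worker_error = 0;                  // error passed on to followers
  bool skipped = false;
  bool committed = false;
};

// Mirrors the skip check quoted in the mail. With propagate_error set, the
// skipped group also records an error (the suggested change).
void check_skip(const ToyEntry &entry, ToyGroup &g, bool propagate_error) {
  if (entry.stop_on_error_sub_id <= g.wait_commit_sub_id) {
    g.skipped = true;
    if (propagate_error)
      g.worker_error = 1;
  }
}

// Stand-in for finishing a group and waking its followers: returns the
// wakeup error that the next group in commit order will see.
int finish_and_wakeup(ToyGroup &g) {
  if (!g.skipped && g.worker_error == 0)
    g.committed = true;
  return g.worker_error;
}

// Scenario from the mail: group 1 failed, group 2 is skipped, group 3 is
// waiting on group 2. Returns whether group 3 ends up committing.
bool follower_commits(bool propagate_error) {
  ToyEntry entry;
  entry.stop_on_error_sub_id = 1;  // group 1 hit an error

  ToyGroup g2;
  g2.gtid_sub_id = 2;
  g2.wait_commit_sub_id = 1;

  ToyGroup g3;
  g3.gtid_sub_id = 3;
  g3.wait_commit_sub_id = 2;
  assert(g3.wait_commit_sub_id == g2.gtid_sub_id);  // 3 waits on 2

  check_skip(entry, g2, propagate_error);
  int wakeup_error = finish_and_wakeup(g2);

  // Group 3 commits only if it was woken up without an error.
  if (wakeup_error == 0)
    g3.committed = true;
  return g3.committed;
}
```

In this model, follower_commits(false) lets group 3 commit even though group 2
was skipped, which corresponds to the missed transaction described above;
follower_commits(true) keeps group 3 from committing. Whether setting
worker_error at that point is sufficient in the real server still needs to be
checked against the actual code.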

> diff --git a/sql/sql_class.cc b/sql/sql_class.cc
> index 4eab241232b..5ba9c5fe456 100644
> --- a/sql/sql_class.cc
> +++ b/sql/sql_class.cc
> @@ -7365,6 +7365,33 @@ wait_for_commit::register_wait_for_prior_commit(wait_for_commit *waitee)
>  }
>
>
> +int wait_for_commit::wait_for_prior_commit(THD *thd)
> +{
> +  /*
> +    Quick inline check, to avoid function call and locking in the common case
> +    where no wakeup is registered, or a registered wait was already signalled.
> +  */
> +  if (waitee)
> +    return wait_for_prior_commit2(thd);
> +  else
> +  {
> +    if (wakeup_error)
> +      my_error(ER_PRIOR_COMMIT_FAILED, MYF(0));
> +    else
> +    {
> +      rpl_group_info* rgi= thd->rgi_slave;
> +      if (rgi && rgi->is_parallel_exec &&
> +          rgi->parallel_entry->stop_on_error_sub_id < (uint64)ULONGLONG_MAX &&
> +          rgi->gtid_sub_id >= rgi->parallel_entry->stop_on_error_sub_id)
> +      {
> +        my_error(ER_PRIOR_COMMIT_FAILED, MYF(0));
> +        wakeup_error= ER_PRIOR_COMMIT_FAILED;
> +      }
> +    }
> +    return wakeup_error;
> +  }
> +}
> +
>  /*
>    Wait for commit of another transaction to complete, as already registered
>    with register_wait_for_prior_commit(). If the commit already completed,
> @@ -7387,6 +7414,17 @@ wait_for_commit::wait_for_prior_commit2(THD *thd)
>    {
>      if (wakeup_error)
>        my_error(ER_PRIOR_COMMIT_FAILED, MYF(0));
> +    else
> +    {
> +      rpl_group_info* rgi= thd->rgi_slave;
> +      if (rgi && rgi->is_parallel_exec &&
> +          rgi->parallel_entry->stop_on_error_sub_id < (uint64)ULONGLONG_MAX &&
> +          rgi->gtid_sub_id >= rgi->parallel_entry->stop_on_error_sub_id)
> +      {
> +        my_error(ER_PRIOR_COMMIT_FAILED, MYF(0));
> +        wakeup_error= ER_PRIOR_COMMIT_FAILED;
> +      }
> +    }
>      goto end;
>    }
>    /*
> diff --git a/sql/sql_class.h b/sql/sql_class.h
> index 72cb8148895..0a0a1aa9fa1 100644
> --- a/sql/sql_class.h
> +++ b/sql/sql_class.h
> @@ -2062,21 +2062,6 @@ struct wait_for_commit
>    bool commit_started;
>
>    void register_wait_for_prior_commit(wait_for_commit *waitee);
> -  int wait_for_prior_commit(THD *thd)
> -  {
> -    /*
> -      Quick inline check, to avoid function call and locking in the common case
> -      where no wakeup is registered, or a registered wait was already signalled.
> -    */
> -    if (waitee)
> -      return wait_for_prior_commit2(thd);
> -    else
> -    {
> -      if (wakeup_error)
> -        my_error(ER_PRIOR_COMMIT_FAILED, MYF(0));
> -      return wakeup_error;
> -    }
> -  }
>    void wakeup_subsequent_commits(int wakeup_error_arg)
>    {
>      /*
> @@ -2123,6 +2108,7 @@ struct wait_for_commit
>
>    void wakeup(int wakeup_error);
>
> +  int wait_for_prior_commit(THD *thd);
>    int wait_for_prior_commit2(THD *thd);
>    void wakeup_subsequent_commits2(int wakeup_error);
>    void unregister_wait_for_prior_commit2();

_______________________________________________
Mailing list: https://launchpad.net/~maria-developers
Post to     : maria-developers@lists.launchpad.net
Unsubscribe : https://launchpad.net/~maria-developers
More help   : https://help.launchpad.net/ListHelp