This is an automated email from the ASF dual-hosted git repository.

avamingli pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/cloudberry.git

commit 5f53e448b92ce182b345820bacc4ce27c4c4933f
Author: Huansong Fu <[email protected]>
AuthorDate: Thu Jan 11 20:45:05 2024 -0800

    Request syncrep for the forget commit in the remote_apply mode
    
    Currently we lazily flush the forget commit record of a dtx and sync it to a
    mirror. This has been fine since dtx recovery can take care of losing the
    record and regenerate it. Also, allowing syncrep could have negative 
performance
    impact so it shouldbe limited.
    
    However, in the case of enabling hot standby with 
synchronous_mode=remote_apply,
    it is absolutely required that the forget commit is applied on the standby 
as
    soon as the query is completed on primary. Because otherwise, the standby 
might
    not see the forget commit which makes it think the dtx still in-progress. 
So it
    would view the changes made by the dtx invisible. This would be an incorrect
    behavior of remote_apply.
    
    So now request it before supporting hot standby in the subsequent commits.
---
 src/backend/access/transam/xact.c | 10 ++++++++--
 src/backend/cdb/cdbtm.c           | 18 ++++++++++++++----
 src/include/access/xact.h         |  2 +-
 3 files changed, 23 insertions(+), 7 deletions(-)

diff --git a/src/backend/access/transam/xact.c 
b/src/backend/access/transam/xact.c
index dba50f9759..12355b4749 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -1825,17 +1825,23 @@ cleanup:
 /*
  *     RecordDistributedForgetCommitted
  */
-void
+XLogRecPtr
 RecordDistributedForgetCommitted(DistributedTransactionId gxid)
 {
        xl_xact_distributed_forget xlrec;
+       XLogRecPtr recptr;
 
        xlrec.gxid = gxid;
 
        XLogBeginInsert();
        XLogRegisterData((char *) &xlrec, sizeof(xl_xact_distributed_forget));
 
-       XLogInsert(RM_XACT_ID, XLOG_XACT_DISTRIBUTED_FORGET);
+       recptr = XLogInsert(RM_XACT_ID, XLOG_XACT_DISTRIBUTED_FORGET);
+       /* only flush immediately if we want to wait for remote_apply */
+       if (synchronous_commit >= SYNCHRONOUS_COMMIT_REMOTE_APPLY)
+               XLogFlush(recptr);
+
+       return recptr;
 }
 
 /*
diff --git a/src/backend/cdb/cdbtm.c b/src/backend/cdb/cdbtm.c
index d11814dd39..ea40ed9944 100644
--- a/src/backend/cdb/cdbtm.c
+++ b/src/backend/cdb/cdbtm.c
@@ -25,6 +25,7 @@
 #include "libpq/libpq-be.h"
 #include "libpq/pqformat.h"
 #include "miscadmin.h"
+#include "replication/syncrep.h"
 #include "storage/lmgr.h"
 #include "storage/pmsignal.h"
 #include "storage/s_lock.h"
@@ -121,7 +122,7 @@ int gp_gxid_prefetch_num;
  * FUNCTIONS PROTOTYPES
  */
 static void doPrepareTransaction(void);
-static void doInsertForgetCommitted(void);
+static XLogRecPtr doInsertForgetCommitted(void);
 static void doNotifyingOnePhaseCommit(void);
 static void doNotifyingCommitPrepared(void);
 static void doNotifyingAbort(void);
@@ -515,17 +516,21 @@ doPrepareTransaction(void)
 /*
  * Insert FORGET COMMITTED into the xlog.
  */
-static void
+static XLogRecPtr
 doInsertForgetCommitted(void)
 {
+       XLogRecPtr recptr;
+
        elog(DTM_DEBUG5, "doInsertForgetCommitted entering in state = %s", 
DtxStateToString(MyTmGxactLocal->state));
 
        setCurrentDtxState(DTX_STATE_INSERTING_FORGET_COMMITTED);
 
-       RecordDistributedForgetCommitted(getDistributedTransactionId());
+       recptr = 
RecordDistributedForgetCommitted(getDistributedTransactionId());
 
        setCurrentDtxState(DTX_STATE_INSERTED_FORGET_COMMITTED);
        MyTmGxact->includeInCkpt = false;
+
+       return recptr;
 }
 
 static void
@@ -561,6 +566,7 @@ doNotifyingCommitPrepared(void)
        MemoryContext oldcontext = CurrentMemoryContext;;
        time_t          retry_time_start;
        bool            retry_timedout;
+       XLogRecPtr      recptr;
 
        elog(DTM_DEBUG5, "doNotifyingCommitPrepared entering in state = %s", 
DtxStateToString(MyTmGxactLocal->state));
 
@@ -665,7 +671,7 @@ doNotifyingCommitPrepared(void)
 
        SIMPLE_FAULT_INJECTOR("dtm_before_insert_forget_comitted");
 
-       doInsertForgetCommitted();
+       recptr = doInsertForgetCommitted();
 
        /*
         * We release the TwophaseCommitLock only after writing our distributed
@@ -673,6 +679,10 @@ doNotifyingCommitPrepared(void)
         * their commit prepared records.
         */
        LWLockRelease(TwophaseCommitLock);
+
+       /* wait for sync'ing the FORGET commit to hot standby, if remote_apply 
or higher is requested. */
+       if (synchronous_commit >= SYNCHRONOUS_COMMIT_REMOTE_APPLY)
+               SyncRepWaitForLSN(recptr, true);
 }
 
 static void
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index c38e30ea38..bc1df57ba2 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -508,7 +508,7 @@ extern void UnregisterXactCallbackOnce(XactCallback 
callback, void *arg);
 extern void RegisterSubXactCallback(SubXactCallback callback, void *arg);
 extern void UnregisterSubXactCallback(SubXactCallback callback, void *arg);
 
-extern void RecordDistributedForgetCommitted(DistributedTransactionId gxid);
+extern XLogRecPtr RecordDistributedForgetCommitted(DistributedTransactionId 
gxid);
 extern bool IsSubTransactionAssignmentPending(void);
 extern void MarkSubTransactionAssigned(void);
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to