This is an automated email from the ASF dual-hosted git repository.

maxyang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/cloudberry.git

commit 76d75e260a847ed6e93a4ae362ad949814417aad
Author: Tom Lane <[email protected]>
AuthorDate: Tue Jul 26 13:07:03 2022 -0400

    Force immediate commit after CREATE DATABASE etc in extended protocol.
    
    We have a few commands that "can't run in a transaction block",
    meaning that if they complete their processing but then we fail
    to COMMIT, we'll be left with inconsistent on-disk state.
    However, the existing defenses for this are only watertight for
    simple query protocol.  In extended protocol, we didn't commit
    until receiving a Sync message.  Since the client is allowed to
    issue another command instead of Sync, we're in trouble if that
    command fails or is an explicit ROLLBACK.  In any case, sitting
    in an inconsistent state while waiting for a client message
    that might not come seems pretty risky.
    
    This case wasn't reachable via libpq before we introduced pipeline
    mode, but it's always been an intended aspect of extended query
    protocol, and likely there are other clients that could reach it
    before.
    
    To fix, set a flag in PreventInTransactionBlock that tells
    exec_execute_message to force an immediate commit.  This seems
    to be the approach that does least damage to existing working
    cases while still preventing the undesirable outcomes.
    
    While here, add some documentation to protocol.sgml that explicitly
    says how to use pipelining.  That's latent in the existing docs if
    you know what to look for, but it's better to spell it out; and it
    provides a place to document this new behavior.
    
    Per bug #17434 from Yugo Nagata.  It's been wrong for ages,
    so back-patch to all supported branches.
    
    Discussion: https://postgr.es/m/[email protected]
---
 doc/src/sgml/protocol.sgml        | 58 +++++++++++++++++++++++++++++++++++++++
 src/backend/access/transam/xact.c | 15 +++++++++-
 src/backend/tcop/postgres.c       | 52 ++++++++++++++++++-----------------
 src/include/access/xact.h         |  6 ++++
 4 files changed, 105 insertions(+), 26 deletions(-)

diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index d2111d817f..cf1fadcda4 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -1050,6 +1050,64 @@ SELCT 1/0;<!-- this typo is intentional -->
    </note>
   </sect2>
 
+  <sect2 id="protocol-flow-pipelining">
+   <title>Pipelining</title>
+
+   <indexterm zone="protocol-flow-pipelining">
+    <primary>pipelining</primary>
+    <secondary>protocol specification</secondary>
+   </indexterm>
+
+   <para>
+    Use of the extended query protocol
+    allows <firstterm>pipelining</firstterm>, which means sending a series
+    of queries without waiting for earlier ones to complete.  This reduces
+    the number of network round trips needed to complete a given series of
+    operations.  However, the user must carefully consider the required
+    behavior if one of the steps fails, since later queries will already
+    be in flight to the server.
+   </para>
+
+   <para>
+    One way to deal with that is to make the whole query series be a
+    single transaction, that is wrap it in <command>BEGIN</command> ...
+    <command>COMMIT</command>.  However, this does not help if one wishes
+    for some of the commands to commit independently of others.
+   </para>
+
+   <para>
+    The extended query protocol provides another way to manage this
+    concern, which is to omit sending Sync messages between steps that
+    are dependent.  Since, after an error, the backend will skip command
+    messages until it finds Sync, this allows later commands in a pipeline
+    to be skipped automatically when an earlier one fails, without the
+    client having to manage that explicitly with <command>BEGIN</command>
+    and <command>COMMIT</command>.  Independently-committable segments
+    of the pipeline can be separated by Sync messages.
+   </para>
+
+   <para>
+    If the client has not issued an explicit <command>BEGIN</command>,
+    then each Sync ordinarily causes an implicit <command>COMMIT</command>
+    if the preceding step(s) succeeded, or an
+    implicit <command>ROLLBACK</command> if they failed.  However, there
+    are a few DDL commands (such as <command>CREATE DATABASE</command>)
+    that cannot be executed inside a transaction block.  If one of
+    these is executed in a pipeline, it will, upon success, force an
+    immediate commit to preserve database consistency.
+    A Sync immediately following one of these has no effect except to
+    respond with ReadyForQuery.
+   </para>
+
+   <para>
+    When using this method, completion of the pipeline must be determined
+    by counting ReadyForQuery messages and waiting for that to reach the
+    number of Syncs sent.  Counting command completion responses is
+    unreliable, since some of the commands may not be executed and thus not
+    produce a completion message.
+   </para>
+  </sect2>
+
   <sect2>
    <title>Function Call</title>
 
diff --git a/src/backend/access/transam/xact.c 
b/src/backend/access/transam/xact.c
index 04f4e10ff3..ad1b09d6e8 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -4375,6 +4375,9 @@ AbortCurrentTransaction(void)
  *     could issue more commands and possibly cause a failure after the 
statement
  *     completes).  Subtransactions are verboten too.
  *
+ *     We must also set XACT_FLAGS_NEEDIMMEDIATECOMMIT in MyXactFlags, to 
ensure
+ *     that postgres.c follows through by committing after the statement is 
done.
+ *
  *     isTopLevel: passed down from ProcessUtility to determine whether we are
  *     inside a function.  (We will always fail if this is false, but it's
  *     convenient to centralize the check here instead of making callers do 
it.)
@@ -4416,7 +4419,9 @@ PreventInTransactionBlock(bool isTopLevel, const char 
*stmtType)
        if (CurrentTransactionState->blockState != TBLOCK_DEFAULT &&
                CurrentTransactionState->blockState != TBLOCK_STARTED)
                elog(FATAL, "cannot prevent transaction chain");
-       /* all okay */
+
+       /* All okay.  Set the flag to make sure the right thing happens later. 
*/
+       MyXactFlags |= XACT_FLAGS_NEEDIMMEDIATECOMMIT;
 }
 
 /*
@@ -4513,6 +4518,13 @@ IsInTransactionBlock(bool isTopLevel)
                CurrentTransactionState->blockState != TBLOCK_STARTED)
                return true;
 
+       /*
+        * If we tell the caller we're not in a transaction block, then inform
+        * postgres.c that it had better commit when the statement is done.
+        * Otherwise our report could be a lie.
+        */
+       MyXactFlags |= XACT_FLAGS_NEEDIMMEDIATECOMMIT;
+
        return false;
 }
 
@@ -4569,6 +4581,7 @@ CallXactCallbacks(XactEvent event)
 
        for (item = Xact_callbacks; item; item = item->next)
                item->callback(event, item->arg);
+
 }
 
 /* Register or deregister callback functions for start/end Xact.  Call only 
once. */
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index 05f0abd401..09671cb2fd 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -1971,6 +1971,13 @@ exec_simple_query(const char *query_string)
                }
                else
                {
+                       /*
+                        * We had better not see XACT_FLAGS_NEEDIMMEDIATECOMMIT 
set if
+                        * we're not calling finish_xact_command().  (The 
implicit
+                        * transaction block should have prevented it from 
getting set.)
+                        */
+                       Assert(!(MyXactFlags & XACT_FLAGS_NEEDIMMEDIATECOMMIT));
+
                        /*
                         * We need a CommandCounterIncrement after every query, 
except
                         * those that start or end a transaction block.
@@ -2868,32 +2875,16 @@ exec_execute_message(const char *portal_name, int64 
max_rows)
 
        /*
         * We must copy the sourceText and prepStmtName into MessageContext in
-        * case the portal is destroyed during finish_xact_command. Can avoid 
the
-        * copy if it's not an xact command, though.
+        * case the portal is destroyed during finish_xact_command.  We do not
+        * make a copy of the portalParams though, preferring to just not print
+        * them in that case.
         */
-       if (is_xact_command)
-       {
-               sourceText = pstrdup(portal->sourceText);
-               if (portal->prepStmtName)
-                       prepStmtName = pstrdup(portal->prepStmtName);
-               else
-                       prepStmtName = "<unnamed>";
-
-               /*
-                * An xact command shouldn't have any parameters, which is a 
good
-                * thing because they wouldn't be around after 
finish_xact_command.
-                */
-               portalParams = NULL;
-       }
+       sourceText = pstrdup(portal->sourceText);
+       if (portal->prepStmtName)
+               prepStmtName = pstrdup(portal->prepStmtName);
        else
-       {
-               sourceText = portal->sourceText;
-               if (portal->prepStmtName)
-                       prepStmtName = portal->prepStmtName;
-               else
-                       prepStmtName = "<unnamed>";
-               portalParams = portal->portalParams;
-       }
+               prepStmtName = "<unnamed>";
+       portalParams = portal->portalParams;
 
        /*
         * Report query to various monitoring facilities.
@@ -2992,13 +2983,24 @@ exec_execute_message(const char *portal_name, int64 
max_rows)
 
        if (completed)
        {
-               if (is_xact_command)
+               if (is_xact_command || (MyXactFlags & 
XACT_FLAGS_NEEDIMMEDIATECOMMIT))
                {
                        /*
                         * If this was a transaction control statement, commit 
it.  We
                         * will start a new xact command for the next command 
(if any).
+                        * Likewise if the statement required immediate commit. 
 Without
+                        * this provision, we wouldn't force commit until Sync 
is
+                        * received, which creates a hazard if the client tries 
to
+                        * pipeline immediate-commit statements.
                         */
                        finish_xact_command();
+
+                       /*
+                        * These commands typically don't have any parameters, 
and even if
+                        * one did we couldn't print them now because the 
storage went
+                        * away during finish_xact_command.  So pretend there 
were none.
+                        */
+                       portalParams = NULL;
                }
                else
                {
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index 5a35d7de88..c38e30ea38 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -116,6 +116,12 @@ extern int MyXactFlags;
  */
 #define XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK (1U << 1)
 
+/*
+ * XACT_FLAGS_NEEDIMMEDIATECOMMIT - records whether the top level statement
+ * is one that requires immediate commit, such as CREATE DATABASE.
+ */
+#define XACT_FLAGS_NEEDIMMEDIATECOMMIT                 (1U << 2)
+
 /*
  *     start- and end-of-transaction callbacks for dynamically loaded modules
  */


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to