This is an automated email from the ASF dual-hosted git repository.

chenjinbao1989 pushed a commit to branch cbdb-postgres-merge
in repository https://gitbox.apache.org/repos/asf/cloudberry.git


The following commit(s) were added to refs/heads/cbdb-postgres-merge by this 
push:
     new 58168417285 Fix conflicts for replication
58168417285 is described below

commit 58168417285caa2dc48bdbd85b0f366a52ac32e2
Author: Jinbao Chen <[email protected]>
AuthorDate: Sat Jul 26 14:42:26 2025 +0800

    Fix conflicts for replication
---
 .../libpqwalreceiver/libpqwalreceiver.c            |   4 -
 src/backend/replication/logical/decode.c           |   9 +-
 src/backend/replication/logical/launcher.c         |   6 -
 src/backend/replication/logical/origin.c           |  11 +-
 src/backend/replication/logical/relation.c         |  37 -----
 src/backend/replication/logical/reorderbuffer.c    |  12 --
 src/backend/replication/logical/tablesync.c        |  11 +-
 src/backend/replication/logical/worker.c           | 163 +--------------------
 src/backend/replication/pgoutput/pgoutput.c        | 100 -------------
 src/backend/replication/repl_gram.y                |  18 ---
 src/backend/replication/repl_scanner.l             |  10 --
 src/backend/replication/slot.c                     |  28 ----
 src/backend/replication/slotfuncs.c                |   3 -
 src/backend/replication/syncrep.c                  |  72 +--------
 src/backend/replication/walreceiver.c              |  39 -----
 src/backend/replication/walsender.c                |  34 +----
 src/include/storage/buffile.h                      |   2 +-
 17 files changed, 15 insertions(+), 544 deletions(-)

diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c 
b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index ec9d352c440..560e432be2c 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -36,7 +36,6 @@
 #include "utils/pg_lsn.h"
 #include "utils/tuplestore.h"
 
-<<<<<<< HEAD
 /*
  * In PostgreSQL, this is a dynamically loaded module, because PostgreSQL
  * doesn't want to link libpq statically into the backend.  In GPDB, we have
@@ -44,9 +43,6 @@
  * compiled and linked directly as part of the postgres binary, like any
  * other backend .c file.
  */
-=======
-PG_MODULE_MAGIC;
->>>>>>> REL_16_9
 
 struct WalReceiverConn
 {
diff --git a/src/backend/replication/logical/decode.c 
b/src/backend/replication/logical/decode.c
index 227911d70e8..0004786e359 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -92,11 +92,7 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, 
XLogReaderState *recor
 {
        XLogRecordBuffer buf;
        TransactionId txid;
-<<<<<<< HEAD
-       RmgrData rmgr;
-=======
        RmgrData        rmgr;
->>>>>>> REL_16_9
 
        buf.origptr = ctx->reader->ReadRecPtr;
        buf.endptr = ctx->reader->EndRecPtr;
@@ -194,14 +190,11 @@ xlog_decode(LogicalDecodingContext *ctx, XLogRecordBuffer 
*buf)
                case XLOG_FPW_CHANGE:
                case XLOG_FPI_FOR_HINT:
                case XLOG_FPI:
-<<<<<<< HEAD
+               case XLOG_OVERWRITE_CONTRECORD:
                /* GPDB_14_MERGE_FIXME: see pg_control.h, Compatible, Figure 
out whether 0xC0 already used? */
                case XLOG_NEXTRELFILENODE:
                case XLOG_OVERWRITE_CONTRECORD:
                case XLOG_ENCRYPTION_LSN:
-=======
-               case XLOG_OVERWRITE_CONTRECORD:
->>>>>>> REL_16_9
                        break;
                default:
                        elog(ERROR, "unexpected RM_XLOG_ID record type: %u", 
info);
diff --git a/src/backend/replication/logical/launcher.c 
b/src/backend/replication/logical/launcher.c
index 6c43c28a731..8395ae7b23c 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -389,15 +389,9 @@ retry:
        }
 
        /*
-<<<<<<< HEAD
-        * We don't allow to invoke more sync workers once we have reached the 
sync
-        * worker limit per subscription. So, just return silently as we might 
get
-        * here because of an otherwise harmless race condition.
-=======
         * We don't allow to invoke more sync workers once we have reached the
         * sync worker limit per subscription. So, just return silently as we
         * might get here because of an otherwise harmless race condition.
->>>>>>> REL_16_9
         */
        if (OidIsValid(relid) && nsyncworkers >= 
max_sync_workers_per_subscription)
        {
diff --git a/src/backend/replication/logical/origin.c 
b/src/backend/replication/logical/origin.c
index c6c445a8ca1..7d1d88a4d92 100644
--- a/src/backend/replication/logical/origin.c
+++ b/src/backend/replication/logical/origin.c
@@ -1272,26 +1272,17 @@ pg_replication_origin_create(PG_FUNCTION_ARGS)
 
        name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
 
-<<<<<<< HEAD
-       /* Replication origins "pg_xxx|gp_xxx" are reserved for internal use */
-       if (IsReservedName(name) || IsReservedGpName(name))
-=======
        /*
         * Replication origins "any and "none" are reserved for system options.
         * The origins "pg_xxx" are reserved for internal use.
         */
-       if (IsReservedName(name) || IsReservedOriginName(name))
->>>>>>> REL_16_9
+       if (IsReservedName(name) || IsReservedOriginName(name) || 
IsReservedGpName(name))
                ereport(ERROR,
                                (errcode(ERRCODE_RESERVED_NAME),
                                 errmsg("replication origin name \"%s\" is 
reserved",
                                                name),
-<<<<<<< HEAD
-                                errdetail("Origin names starting with \"%s\" 
are reserved.", GetReservedPrefix(name))));
-=======
                                 errdetail("Origin names \"%s\", \"%s\", and 
names starting with \"pg_\" are reserved.",
                                                   LOGICALREP_ORIGIN_ANY, 
LOGICALREP_ORIGIN_NONE)));
->>>>>>> REL_16_9
 
        /*
         * If built with appropriate switch, whine when regression-testing
diff --git a/src/backend/replication/logical/relation.c 
b/src/backend/replication/logical/relation.c
index 39156d6afc3..2e0cad1d8a2 100644
--- a/src/backend/replication/logical/relation.c
+++ b/src/backend/replication/logical/relation.c
@@ -17,10 +17,7 @@
 
 #include "postgres.h"
 
-<<<<<<< HEAD
-=======
 #include "access/genam.h"
->>>>>>> REL_16_9
 #include "access/table.h"
 #include "catalog/namespace.h"
 #include "catalog/pg_am_d.h"
@@ -385,11 +382,7 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE 
lockmode)
                /* Release the no-longer-useful attrmap, if any. */
                if (entry->attrmap)
                {
-<<<<<<< HEAD
                        pfree(entry->attrmap);
-=======
-                       free_attrmap(entry->attrmap);
->>>>>>> REL_16_9
                        entry->attrmap = NULL;
                }
 
@@ -528,8 +521,6 @@ logicalrep_partmap_invalidate_cb(Datum arg, Oid reloid)
 
                while ((entry = (LogicalRepPartMapEntry *) 
hash_seq_search(&status)) != NULL)
                        entry->relmapentry.localrelvalid = false;
-<<<<<<< HEAD
-=======
        }
 }
 
@@ -564,7 +555,6 @@ logicalrep_partmap_reset_relmap(LogicalRepRelation 
*remoterel)
                logicalrep_relmap_free_entry(entry);
 
                memset(entry, 0, sizeof(LogicalRepRelMapEntry));
->>>>>>> REL_16_9
        }
 }
 
@@ -628,25 +618,8 @@ logicalrep_partition_open(LogicalRepRelMapEntry *root,
 
        entry = &part_entry->relmapentry;
 
-<<<<<<< HEAD
        if (found && entry->localrelvalid)
                return entry;
-=======
-       /*
-        * We must always overwrite entry->localrel with the latest partition
-        * Relation pointer, because the Relation pointed to by the old value 
may
-        * have been cleared after the caller would have closed the partition
-        * relation after the last use of this entry.  Note that localrelvalid 
is
-        * only updated by the relcache invalidation callback, so it may still 
be
-        * true irrespective of whether the Relation pointed to by localrel has
-        * been cleared or not.
-        */
-       if (found && entry->localrelvalid)
-       {
-               entry->localrel = partrel;
-               return entry;
-       }
->>>>>>> REL_16_9
 
        /* Switch to longer-lived context. */
        oldctx = MemoryContextSwitchTo(LogicalRepPartMapContext);
@@ -657,16 +630,6 @@ logicalrep_partition_open(LogicalRepRelMapEntry *root,
                part_entry->partoid = partOid;
        }
 
-<<<<<<< HEAD
-=======
-       /* Release the no-longer-useful attrmap, if any. */
-       if (entry->attrmap)
-       {
-               free_attrmap(entry->attrmap);
-               entry->attrmap = NULL;
-       }
-
->>>>>>> REL_16_9
        if (!entry->remoterel.remoteid)
        {
                int                     i;
diff --git a/src/backend/replication/logical/reorderbuffer.c 
b/src/backend/replication/logical/reorderbuffer.c
index 45134cd202d..fa04e829cc9 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -655,13 +655,8 @@ ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId 
xid, bool create,
        }
 
        /*
-<<<<<<< HEAD
-        * If the cache wasn't hit or it yielded a "does-not-exist" and we want
-        * to create an entry.
-=======
         * If the cache wasn't hit or it yielded a "does-not-exist" and we want 
to
         * create an entry.
->>>>>>> REL_16_9
         */
 
        /* search the lookup table */
@@ -3208,14 +3203,7 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
         * Update the total size in top level as well. This is later used to
         * compute the decoding stats.
         */
-<<<<<<< HEAD
-       if (txn->toptxn != NULL)
-               toptxn = txn->toptxn;
-       else
-               toptxn = txn;
-=======
        toptxn = rbtxn_get_toptxn(txn);
->>>>>>> REL_16_9
 
        if (addition)
        {
diff --git a/src/backend/replication/logical/tablesync.c 
b/src/backend/replication/logical/tablesync.c
index 131fca4f1cd..013e1235132 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -784,14 +784,9 @@ fetch_remote_table_info(char *nspname, char *relname,
        WalRcvExecResult *res;
        StringInfoData cmd;
        TupleTableSlot *slot;
-<<<<<<< HEAD
-       Oid                     tableRow[3] = {OIDOID, CHAROID, CHAROID};
-       Oid                     attrRow[4] = {TEXTOID, OIDOID, INT4OID, 
BOOLOID};
-=======
        Oid                     tableRow[] = {OIDOID, CHAROID, CHAROID};
        Oid                     attrRow[] = {INT2OID, TEXTOID, OIDOID, BOOLOID};
        Oid                     qualRow[] = {TEXTOID};
->>>>>>> REL_16_9
        bool            isnull;
        char            relkind;
        int                     natt;
@@ -1242,11 +1237,7 @@ copy_table(Relation rel)
                                                                                
 NULL, false, false);
 
        attnamelist = make_copy_attnamelist(relmapentry);
-<<<<<<< HEAD
-       cstate = BeginCopyFrom(pstate, rel, NULL, NULL, false, copy_read_data, 
NULL, attnamelist, NIL);
-=======
-       cstate = BeginCopyFrom(pstate, rel, NULL, NULL, false, copy_read_data, 
attnamelist, options);
->>>>>>> REL_16_9
+       cstate = BeginCopyFrom(pstate, rel, NULL, NULL, false, copy_read_data, 
NULL, attnamelist, options);
 
        /* Do the copy */
        (void) CopyFrom(cstate);
diff --git a/src/backend/replication/logical/worker.c 
b/src/backend/replication/logical/worker.c
index f132ac56fda..564177a5e40 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -221,15 +221,6 @@ typedef struct FlushPosition
 
 static dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping);
 
-<<<<<<< HEAD
-typedef struct SlotErrCallbackArg
-{
-       LogicalRepRelMapEntry *rel;
-       int                     remote_attnum;
-} SlotErrCallbackArg;
-
-=======
->>>>>>> REL_16_9
 typedef struct ApplyExecutionData
 {
        EState     *estate;                     /* executor state, used to 
track resources */
@@ -407,14 +398,6 @@ static void send_feedback(XLogRecPtr recvpos, bool force, 
bool requestReply);
 
 static void DisableSubscriptionAndExit(void);
 
-<<<<<<< HEAD
-static void maybe_reread_subscription(void);
-
-/* prototype needed because of stream_commit */
-static void apply_dispatch(StringInfo s);
-
-=======
->>>>>>> REL_16_9
 static void apply_handle_commit_internal(LogicalRepCommitData *commit_data);
 static void apply_handle_insert_internal(ApplyExecutionData *edata,
                                                                                
 ResultRelInfo *relinfo,
@@ -819,29 +802,6 @@ slot_fill_defaults(LogicalRepRelMapEntry *rel, EState 
*estate,
 }
 
 /*
-<<<<<<< HEAD
- * Error callback to give more context info about data conversion failures
- * while reading data from the remote server.
- */
-static void
-slot_store_error_callback(void *arg)
-{
-       SlotErrCallbackArg *errarg = (SlotErrCallbackArg *) arg;
-       LogicalRepRelMapEntry *rel;
-
-       /* Nothing to do if remote attribute number is not set */
-       if (errarg->remote_attnum < 0)
-               return;
-
-       rel = errarg->rel;
-       errcontext("processing remote data for replication target relation 
\"%s.%s\" column \"%s\"",
-                          rel->remoterel.nspname, rel->remoterel.relname,
-                          rel->remoterel.attnames[errarg->remote_attnum]);
-}
-
-/*
-=======
->>>>>>> REL_16_9
  * Store tuple data into slot.
  *
  * Incoming data can be either text or binary format.
@@ -855,17 +815,6 @@ slot_store_data(TupleTableSlot *slot, 
LogicalRepRelMapEntry *rel,
 
        ExecClearTuple(slot);
 
-<<<<<<< HEAD
-       /* Push callback + info on the error context stack */
-       errarg.rel = rel;
-       errarg.remote_attnum = -1;
-       errcallback.callback = slot_store_error_callback;
-       errcallback.arg = (void *) &errarg;
-       errcallback.previous = error_context_stack;
-       error_context_stack = &errcallback;
-
-=======
->>>>>>> REL_16_9
        /* Call the "in" function for each non-dropped, non-null attribute */
        Assert(natts == rel->attrmap->maplen);
        for (i = 0; i < natts; i++)
@@ -879,12 +828,8 @@ slot_store_data(TupleTableSlot *slot, 
LogicalRepRelMapEntry *rel,
 
                        Assert(remoteattnum < tupleData->ncols);
 
-<<<<<<< HEAD
-                       errarg.remote_attnum = remoteattnum;
-=======
                        /* Set attnum for error callback */
                        apply_error_callback_arg.remote_attnum = remoteattnum;
->>>>>>> REL_16_9
 
                        if (tupleData->colstatus[remoteattnum] == 
LOGICALREP_COLUMN_TEXT)
                        {
@@ -932,12 +877,8 @@ slot_store_data(TupleTableSlot *slot, 
LogicalRepRelMapEntry *rel,
                                slot->tts_isnull[i] = true;
                        }
 
-<<<<<<< HEAD
-                       errarg.remote_attnum = -1;
-=======
                        /* Reset attnum for error callback */
                        apply_error_callback_arg.remote_attnum = -1;
->>>>>>> REL_16_9
                }
                else
                {
@@ -986,7 +927,6 @@ slot_modify_data(TupleTableSlot *slot, TupleTableSlot 
*srcslot,
        memcpy(slot->tts_values, srcslot->tts_values, natts * sizeof(Datum));
        memcpy(slot->tts_isnull, srcslot->tts_isnull, natts * sizeof(bool));
 
-<<<<<<< HEAD
        /* For error reporting, push callback + info on the error context stack 
*/
        errarg.rel = rel;
        errarg.remote_attnum = -1;
@@ -995,8 +935,6 @@ slot_modify_data(TupleTableSlot *slot, TupleTableSlot 
*srcslot,
        errcallback.previous = error_context_stack;
        error_context_stack = &errcallback;
 
-=======
->>>>>>> REL_16_9
        /* Call the "in" function for each replaced attribute */
        Assert(natts == rel->attrmap->maplen);
        for (i = 0; i < natts; i++)
@@ -1013,12 +951,8 @@ slot_modify_data(TupleTableSlot *slot, TupleTableSlot 
*srcslot,
                {
                        StringInfo      colvalue = 
&tupleData->colvalues[remoteattnum];
 
-<<<<<<< HEAD
-                       errarg.remote_attnum = remoteattnum;
-=======
                        /* Set attnum for error callback */
                        apply_error_callback_arg.remote_attnum = remoteattnum;
->>>>>>> REL_16_9
 
                        if (tupleData->colstatus[remoteattnum] == 
LOGICALREP_COLUMN_TEXT)
                        {
@@ -1062,12 +996,8 @@ slot_modify_data(TupleTableSlot *slot, TupleTableSlot 
*srcslot,
                                slot->tts_isnull[i] = true;
                        }
 
-<<<<<<< HEAD
-                       errarg.remote_attnum = -1;
-=======
                        /* Reset attnum for error callback */
                        apply_error_callback_arg.remote_attnum = -1;
->>>>>>> REL_16_9
                }
        }
 
@@ -2214,12 +2144,8 @@ apply_spooled_messages(FileSet *stream_fileset, 
TransactionId xid,
        elog(DEBUG1, "replayed %d (all) changes from file \"%s\"",
                 nchanges, path);
 
-<<<<<<< HEAD
-       apply_handle_commit_internal(&commit_data);
-=======
        return;
 }
->>>>>>> REL_16_9
 
 /*
  * Handle STREAM COMMIT message.
@@ -3344,11 +3270,8 @@ apply_handle_truncate(StringInfo s)
                                                relids_logged,
                                                DROP_RESTRICT,
                                                restart_seqs,
-<<<<<<< HEAD
+                                               !MySubscription->runasowner,
                                                NULL);
-=======
-                                               !MySubscription->runasowner);
->>>>>>> REL_16_9
        foreach(lc, remote_rels)
        {
                LogicalRepRelMapEntry *rel = lfirst(lc);
@@ -4093,6 +4016,7 @@ subxact_info_write(Oid subid, TransactionId xid)
        char            path[MAXPGPATH];
        Size            len;
        BufFile    *fd;
+       workfile_set *work_set;
 
        Assert(TransactionIdIsValid(xid));
 
@@ -4112,33 +4036,14 @@ subxact_info_write(Oid subid, TransactionId xid)
         * Create the subxact file if it not already created, otherwise open the
         * existing file.
         */
-<<<<<<< HEAD
-       if (ent->subxact_fileset == NULL)
+       fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path, 
O_RDWR,
+                                                       true);
+       if (fd == NULL)
        {
-               MemoryContext oldctx;
-               workfile_set *work_set;
-
-               /*
-                * We need to maintain shared fileset across multiple stream
-                * start/stop calls.  So, need to allocate it in a persistent 
context.
-                */
-               oldctx = MemoryContextSwitchTo(ApplyContext);
-               ent->subxact_fileset = palloc(sizeof(SharedFileSet));
-               SharedFileSetInit(ent->subxact_fileset, NULL);
-               MemoryContextSwitchTo(oldctx);
-
                work_set = workfile_mgr_create_set("Subxact", path, false /* 
hold pin */);
+               fd = BufFileCreateFileSet(MyLogicalRepWorker->stream_fileset, 
path, work_set);
 
-               fd = BufFileCreateShared(ent->subxact_fileset, path, work_set);
        }
-       else
-               fd = BufFileOpenShared(ent->subxact_fileset, path, O_RDWR);
-=======
-       fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path, 
O_RDWR,
-                                                       true);
-       if (fd == NULL)
-               fd = BufFileCreateFileSet(MyLogicalRepWorker->stream_fileset, 
path);
->>>>>>> REL_16_9
 
        len = sizeof(SubXactInfo) * subxact_data.nsubxacts;
 
@@ -4358,39 +4263,13 @@ stream_open_file(Oid subid, TransactionId xid, bool 
first_segment)
         * Otherwise, just open the file for writing, in append mode.
         */
        if (first_segment)
-<<<<<<< HEAD
        {
-               MemoryContext savectx;
-               SharedFileSet *fileset;
                workfile_set *work_set;
 
-               if (found)
-                       ereport(ERROR,
-                                       (errcode(ERRCODE_PROTOCOL_VIOLATION),
-                                        errmsg_internal("incorrect 
first-segment flag for streamed replication transaction")));
-
-               /*
-                * We need to maintain shared fileset across multiple stream
-                * start/stop calls. So, need to allocate it in a persistent 
context.
-                */
-               savectx = MemoryContextSwitchTo(ApplyContext);
-               fileset = palloc(sizeof(SharedFileSet));
-
-               SharedFileSetInit(fileset, NULL);
-               MemoryContextSwitchTo(savectx);
-
                work_set = workfile_mgr_create_set("LogicalStreaming", path, 
false /* hold pin */);
-               stream_fd = BufFileCreateShared(fileset, path, work_set);
-
-               /* Remember the fileset for the next stream of the same 
transaction */
-               ent->xid = xid;
-               ent->stream_fileset = fileset;
-               ent->subxact_fileset = NULL;
-       }
-=======
                stream_fd = 
BufFileCreateFileSet(MyLogicalRepWorker->stream_fileset,
-                                                                               
 path);
->>>>>>> REL_16_9
+                                                                               
 path, work_set);
+       }
        else
        {
                /*
@@ -4609,30 +4488,6 @@ void
 InitializeApplyWorker(void)
 {
        MemoryContext oldctx;
-<<<<<<< HEAD
-       char            originname[NAMEDATALEN];
-       XLogRecPtr      origin_startpos;
-       char       *myslotname;
-       WalRcvStreamOptions options;
-
-       /* Attach to slot */
-       logicalrep_worker_attach(worker_slot);
-
-       /* Setup signal handling */
-       pqsignal(SIGHUP, SignalHandlerForConfigReload);
-       pqsignal(SIGTERM, die);
-       BackgroundWorkerUnblockSignals();
-
-       /*
-        * We don't currently need any ResourceOwner in a walreceiver process, 
but
-        * if we did, we could call CreateAuxProcessResourceOwner here.
-        */
-
-       /* Initialise stats to a sanish value */
-       MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time 
=
-               MyLogicalRepWorker->reply_time = GetCurrentTimestamp();
-
-       /* Load the libpq-specific functions */
        /*
         * In GPDB, we build libpqwalreceiver functions, as well as a copy of
         * libpq into the backend itself, to support QD-QE communication. See
@@ -4640,8 +4495,6 @@ InitializeApplyWorker(void)
         */
        if (!WalReceiverFunctions)
                libpqwalreceiver_PG_init();
-=======
->>>>>>> REL_16_9
 
        /* Run as replica session replication role. */
        SetConfigOption("session_replication_role", "replica",
diff --git a/src/backend/replication/pgoutput/pgoutput.c 
b/src/backend/replication/pgoutput/pgoutput.c
index df4ac9a123a..32b74bb4752 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -94,10 +94,6 @@ static List *LoadPublications(List *pubnames);
 static void publication_invalidation_cb(Datum arg, int cacheid,
                                                                                
uint32 hashvalue);
 static void send_relation_and_attrs(Relation relation, TransactionId xid,
-<<<<<<< HEAD
-                                                                       
LogicalDecodingContext *ctx);
-static void update_replication_progress(LogicalDecodingContext *ctx);
-=======
                                                                        
LogicalDecodingContext *ctx,
                                                                        
Bitmapset *columns);
 static void send_repl_origin(LogicalDecodingContext *ctx,
@@ -116,7 +112,6 @@ enum RowFilterPubAction
 };
 
 #define NUM_ROWFILTER_PUBACTIONS (PUBACTION_DELETE+1)
->>>>>>> REL_16_9
 
 /*
  * Entry in the map used to remember which relation schemas we sent.
@@ -611,9 +606,6 @@ static void
 pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
                                        XLogRecPtr commit_lsn)
 {
-<<<<<<< HEAD
-       update_replication_progress(ctx);
-=======
        PGOutputTxnData *txndata = (PGOutputTxnData *) 
txn->output_plugin_private;
        bool            sent_begin_txn;
 
@@ -633,7 +625,6 @@ pgoutput_commit_txn(LogicalDecodingContext *ctx, 
ReorderBufferTXN *txn,
                elog(DEBUG1, "skipped replication of an empty transaction with 
XID: %u", txn->xid);
                return;
        }
->>>>>>> REL_16_9
 
        OutputPluginPrepareWrite(ctx, true);
        logicalrep_write_commit(ctx->out, txn, commit_lsn);
@@ -1444,8 +1435,6 @@ pgoutput_change(LogicalDecodingContext *ctx, 
ReorderBufferTXN *txn,
        TupleTableSlot *old_slot = NULL;
        TupleTableSlot *new_slot = NULL;
 
-       update_replication_progress(ctx);
-
        if (!is_publishable_relation(relation))
                return;
 
@@ -1613,8 +1602,6 @@ pgoutput_truncate(LogicalDecodingContext *ctx, 
ReorderBufferTXN *txn,
        Oid                *relids;
        TransactionId xid = InvalidTransactionId;
 
-       update_replication_progress(ctx);
-
        /* Remember the xid for the change in streaming mode. See 
pgoutput_change. */
        if (in_streaming)
                xid = change->txn->xid;
@@ -1678,8 +1665,6 @@ pgoutput_message(LogicalDecodingContext *ctx, 
ReorderBufferTXN *txn,
        PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
        TransactionId xid = InvalidTransactionId;
 
-       update_replication_progress(ctx);
-
        if (!data->messages)
                return;
 
@@ -1885,11 +1870,7 @@ pgoutput_stream_commit(struct LogicalDecodingContext 
*ctx,
        Assert(!in_streaming);
        Assert(rbtxn_is_streamed(txn));
 
-<<<<<<< HEAD
-       update_replication_progress(ctx);
-=======
        OutputPluginUpdateProgress(ctx, false);
->>>>>>> REL_16_9
 
        OutputPluginPrepareWrite(ctx, true);
        logicalrep_write_stream_commit(ctx->out, txn, commit_lsn);
@@ -2058,10 +2039,7 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
                int                     publish_ancestor_level = 0;
                bool            am_partition = get_rel_relispartition(relid);
                char            relkind = get_rel_relkind(relid);
-<<<<<<< HEAD
-=======
                List       *rel_publications = NIL;
->>>>>>> REL_16_9
 
                /* Reload publications if needed before use. */
                if (!publications_valid)
@@ -2154,25 +2132,15 @@ get_rel_sync_entry(PGOutputData *data, Relation 
relation)
 
                        /*
                         * Under what relid should we publish changes in this 
publication?
-<<<<<<< HEAD
-                        * We'll use the top-most relid across all 
publications. Also track
-                        * the ancestor level for this publication.
-=======
                         * We'll use the top-most relid across all 
publications. Also
                         * track the ancestor level for this publication.
->>>>>>> REL_16_9
                         */
                        Oid                     pub_relid = relid;
                        int                     ancestor_level = 0;
 
                        /*
-<<<<<<< HEAD
-                        * If this is a FOR ALL TABLES publication, pick the 
partition root
-                        * and set the ancestor level accordingly.
-=======
                         * If this is a FOR ALL TABLES publication, pick the 
partition
                         * root and set the ancestor level accordingly.
->>>>>>> REL_16_9
                         */
                        if (pub->alltables)
                        {
@@ -2201,11 +2169,6 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
                                        Oid                     ancestor;
                                        int                     level;
                                        List       *ancestors = 
get_partition_ancestors(relid);
-<<<<<<< HEAD
-                                       ListCell   *lc2;
-                                       int                     level = 0;
-=======
->>>>>>> REL_16_9
 
                                        ancestor = 
GetTopMostAncestorInPublication(pub->oid,
                                                                                
                                           ancestors,
@@ -2213,27 +2176,11 @@ get_rel_sync_entry(PGOutputData *data, Relation 
relation)
 
                                        if (ancestor != InvalidOid)
                                        {
-<<<<<<< HEAD
-                                               Oid                     
ancestor = lfirst_oid(lc2);
-
-                                               level++;
-
-                                               if 
(list_member_oid(GetRelationPublications(ancestor),
-                                                                               
        pub->oid))
-                                               {
-                                                       ancestor_published = 
true;
-                                                       if (pub->pubviaroot)
-                                                       {
-                                                               pub_relid = 
ancestor;
-                                                               ancestor_level 
= level;
-                                                       }
-=======
                                                ancestor_published = true;
                                                if (pub->pubviaroot)
                                                {
                                                        pub_relid = ancestor;
                                                        ancestor_level = level;
->>>>>>> REL_16_9
                                                }
                                        }
                                }
@@ -2262,27 +2209,14 @@ get_rel_sync_entry(PGOutputData *data, Relation 
relation)
 
                                /*
                                 * We want to publish the changes as the 
top-most ancestor
-<<<<<<< HEAD
-                                * across all publications. So we need to check 
if the
-                                * already calculated level is higher than the 
new one. If
-                                * yes, we can ignore the new value (as it's a 
child).
-                                * Otherwise the new value is an ancestor, so 
we keep it.
-=======
                                 * across all publications. So we need to check 
if the already
                                 * calculated level is higher than the new one. 
If yes, we can
                                 * ignore the new value (as it's a child). 
Otherwise the new
                                 * value is an ancestor, so we keep it.
->>>>>>> REL_16_9
                                 */
                                if (publish_ancestor_level > ancestor_level)
                                        continue;
 
-<<<<<<< HEAD
-                               /* The new value is an ancestor, so let's keep 
it. */
-                               publish_as_relid = pub_relid;
-                               publish_ancestor_level = ancestor_level;
-                       }
-=======
                                /*
                                 * If we found an ancestor higher up in the 
tree, discard the
                                 * list of publications through which we 
replicate it, and use
@@ -2324,7 +2258,6 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 
                        /* Initialize the column list */
                        pgoutput_column_list_init(data, rel_publications, 
entry);
->>>>>>> REL_16_9
                }
 
                list_free(pubids);
@@ -2489,36 +2422,3 @@ send_repl_origin(LogicalDecodingContext *ctx, 
RepOriginId origin_id,
                }
        }
 }
-
-/*
- * Try to update progress and send a keepalive message if too many changes were
- * processed.
- *
- * For a large transaction, if we don't send any change to the downstream for a
- * long time (exceeds the wal_receiver_timeout of standby) then it can timeout.
- * This can happen when all or most of the changes are not published.
- */
-static void
-update_replication_progress(LogicalDecodingContext *ctx)
-{
-       static int      changes_count = 0;
-
-       /*
-        * We don't want to try sending a keepalive message after processing 
each
-        * change as that can have overhead. Tests revealed that there is no
-        * noticeable overhead in doing it after continuously processing 100 or 
so
-        * changes.
-        */
-#define CHANGES_THRESHOLD 100
-
-       /*
-        * If we are at the end of transaction LSN, update progress tracking.
-        * Otherwise, after continuously processing CHANGES_THRESHOLD changes, 
we
-        * try to send a keepalive message if required.
-        */
-       if (ctx->end_xact || ++changes_count >= CHANGES_THRESHOLD)
-       {
-               OutputPluginUpdateProgress(ctx);
-               changes_count = 0;
-       }
-}
diff --git a/src/backend/replication/repl_gram.y 
b/src/backend/replication/repl_gram.y
index af609f5922f..dc826764007 100644
--- a/src/backend/replication/repl_gram.y
+++ b/src/backend/replication/repl_gram.y
@@ -66,15 +66,12 @@ Node *replication_parse_result;
 %token K_DROP_REPLICATION_SLOT
 %token K_TIMELINE_HISTORY
 %token K_WAIT
-<<<<<<< HEAD
 %token K_NOWAIT
 %token K_EXCLUDE
 %token K_MAX_RATE
 %token K_WAL
 %token K_TABLESPACE_MAP
 %token K_NOVERIFY_CHECKSUMS
-=======
->>>>>>> REL_16_9
 %token K_TIMELINE
 %token K_PHYSICAL
 %token K_LOGICAL
@@ -89,15 +86,9 @@ Node *replication_parse_result;
 %type <node>   command
 %type <node>   base_backup start_replication start_logical_replication
                                create_replication_slot drop_replication_slot 
identify_system
-<<<<<<< HEAD
-                               timeline_history show
-%type <list>   base_backup_opt_list
-%type <defelt> base_backup_opt
-=======
                                read_replication_slot timeline_history show
 %type <list>   generic_option_list
 %type <defelt> generic_option
->>>>>>> REL_16_9
 %type <uintval>        opt_timeline
 %type <list>   plugin_options plugin_opt_list
 %type <defelt> plugin_opt_elem
@@ -186,7 +177,6 @@ base_backup:
                                }
                        ;
 
-<<<<<<< HEAD
 base_backup_opt_list:
                        base_backup_opt_list base_backup_opt
                                { $$ = lappend($1, $2); }
@@ -252,8 +242,6 @@ base_backup_opt:
                                }
                        ;
 
-=======
->>>>>>> REL_16_9
 create_replication_slot:
                        /* CREATE_REPLICATION_SLOT slot [TEMPORARY] PHYSICAL 
[options] */
                        K_CREATE_REPLICATION_SLOT IDENT opt_temporary 
K_PHYSICAL create_slot_options
@@ -448,11 +436,6 @@ plugin_opt_arg:
                        | /* EMPTY */                                   { $$ = 
NULL; }
                ;
 
-<<<<<<< HEAD
-%%
-
-#include "repl_scanner.c"
-=======
 generic_option_list:
                        generic_option_list ',' generic_option
                                { $$ = lappend($1, $3); }
@@ -502,4 +485,3 @@ ident_or_keyword:
                ;
 
 %%
->>>>>>> REL_16_9
diff --git a/src/backend/replication/repl_scanner.l 
b/src/backend/replication/repl_scanner.l
index 73c22fa2d67..4e2edaba095 100644
--- a/src/backend/replication/repl_scanner.l
+++ b/src/backend/replication/repl_scanner.l
@@ -121,7 +121,6 @@ BASE_BACKUP                 { return K_BASE_BACKUP; }
 IDENTIFY_SYSTEM                { return K_IDENTIFY_SYSTEM; }
 READ_REPLICATION_SLOT  { return K_READ_REPLICATION_SLOT; }
 SHOW           { return K_SHOW; }
-<<<<<<< HEAD
 LABEL                  { return K_LABEL; }
 NOWAIT                 { return K_NOWAIT; }
 EXCLUDE                        { return K_EXCLUDE; }
@@ -130,8 +129,6 @@ MAX_RATE            { return K_MAX_RATE; }
 WAL                    { return K_WAL; }
 TABLESPACE_MAP                 { return K_TABLESPACE_MAP; }
 NOVERIFY_CHECKSUMS     { return K_NOVERIFY_CHECKSUMS; }
-=======
->>>>>>> REL_16_9
 TIMELINE                       { return K_TIMELINE; }
 START_REPLICATION      { return K_START_REPLICATION; }
 CREATE_REPLICATION_SLOT                { return K_CREATE_REPLICATION_SLOT; }
@@ -216,11 +213,7 @@ WAIT                               { return K_WAIT; }
                                        return yytext[0];
                                }
 
-<<<<<<< HEAD
-<xq,xd><<EOF>> { yyerror("unterminated quoted string"); }
-=======
 <xq,xd><<EOF>> { replication_yyerror("unterminated quoted string"); }
->>>>>>> REL_16_9
 
 
 <<EOF>>                        {
@@ -316,10 +309,7 @@ replication_scanner_is_replication_command(void)
                case K_START_REPLICATION:
                case K_CREATE_REPLICATION_SLOT:
                case K_DROP_REPLICATION_SLOT:
-<<<<<<< HEAD
-=======
                case K_READ_REPLICATION_SLOT:
->>>>>>> REL_16_9
                case K_TIMELINE_HISTORY:
                case K_SHOW:
                        /* Yes; push back the first token so we can parse 
later. */
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 601c944b731..66659343be8 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -873,12 +873,7 @@ ReplicationSlotsComputeRequiredXmin(bool already_locked)
                SpinLockAcquire(&s->mutex);
                effective_xmin = s->effective_xmin;
                effective_catalog_xmin = s->effective_catalog_xmin;
-<<<<<<< HEAD
-               invalidated = (!XLogRecPtrIsInvalid(s->data.invalidated_at) &&
-                                          
XLogRecPtrIsInvalid(s->data.restart_lsn));
-=======
                invalidated = s->data.invalidated != RS_INVAL_NONE;
->>>>>>> REL_16_9
                SpinLockRelease(&s->mutex);
 
                /* invalidated slots need not apply */
@@ -1335,14 +1330,10 @@ ReportSlotInvalidation(ReplicationSlotInvalidationCause 
cause,
  * for syscalls, so caller must restart if we return true.
  */
 static bool
-<<<<<<< HEAD
-InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
-=======
 InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
                                                           ReplicationSlot *s,
                                                           XLogRecPtr oldestLSN,
                                                           Oid dboid, 
TransactionId snapshotConflictHorizon,
->>>>>>> REL_16_9
                                                           bool *invalidated)
 {
        int                     last_signaled_pid = 0;
@@ -1458,10 +1449,6 @@ 
InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
                {
                        MyReplicationSlot = s;
                        s->active_pid = MyProcPid;
-<<<<<<< HEAD
-                       s->data.invalidated_at = restart_lsn;
-                       s->data.restart_lsn = InvalidXLogRecPtr;
-=======
                        s->data.invalidated = conflict;
 
                        /*
@@ -1470,7 +1457,6 @@ 
InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
                         */
                        if (conflict == RS_INVAL_WAL_REMOVED)
                                s->data.restart_lsn = InvalidXLogRecPtr;
->>>>>>> REL_16_9
 
                        /* Let caller know */
                        *invalidated = true;
@@ -1582,12 +1568,6 @@ 
InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
  * NB - this runs as part of checkpoint, so avoid raising errors if possible.
  */
 bool
-<<<<<<< HEAD
-InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno)
-{
-       XLogRecPtr      oldestLSN;
-       bool            invalidated = false;
-=======
 InvalidateObsoleteReplicationSlots(ReplicationSlotInvalidationCause cause,
                                                                   XLogSegNo 
oldestSegno, Oid dboid,
                                                                   
TransactionId snapshotConflictHorizon)
@@ -1601,7 +1581,6 @@ 
InvalidateObsoleteReplicationSlots(ReplicationSlotInvalidationCause cause,
 
        if (max_replication_slots == 0)
                return invalidated;
->>>>>>> REL_16_9
 
        XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size, oldestLSN);
 
@@ -1614,13 +1593,9 @@ restart:
                if (!s->in_use)
                        continue;
 
-<<<<<<< HEAD
-               if (InvalidatePossiblyObsoleteSlot(s, oldestLSN, &invalidated))
-=======
                if (InvalidatePossiblyObsoleteSlot(cause, s, oldestLSN, dboid,
                                                                                
   snapshotConflictHorizon,
                                                                                
   &invalidated))
->>>>>>> REL_16_9
                {
                        /* if the lock was released, start from scratch */
                        goto restart;
@@ -1633,12 +1608,9 @@ restart:
         */
        if (invalidated)
        {
-<<<<<<< HEAD
                /* GPDB: Set WalSndCtl state to indicate persistent sync error 
state */
                WalSndCtl->error = WALSNDERROR_WALREAD;
 
-=======
->>>>>>> REL_16_9
                ReplicationSlotsComputeRequiredXmin(false);
                ReplicationSlotsComputeRequiredLSN();
        }
diff --git a/src/backend/replication/slotfuncs.c 
b/src/backend/replication/slotfuncs.c
index 576a2eeadee..e01adb00795 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -26,7 +26,6 @@
 #include "utils/pg_lsn.h"
 #include "utils/resowner.h"
 
-<<<<<<< HEAD
 static void
 check_permissions(void)
 {
@@ -43,8 +42,6 @@ warn_slot_only_created_on_segment(const char *name) {
                         errhint("Creating replication slots on a single 
segment is not advised.  Replication slots are automatically created by 
management tools.")));
 }
 
-=======
->>>>>>> REL_16_9
 /*
  * Helper function for creating a new physical replication slot with
  * given arguments. Note that this function doesn't release the created
diff --git a/src/backend/replication/syncrep.c 
b/src/backend/replication/syncrep.c
index 51a7e17a2c3..4b0660e54ce 100644
--- a/src/backend/replication/syncrep.c
+++ b/src/backend/replication/syncrep.c
@@ -222,12 +222,7 @@ SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
         * don't touch the queue.
         */
        if (!SyncRepRequested() ||
-<<<<<<< HEAD
                (!IS_QUERY_DISPATCHER() && !((volatile WalSndCtlData *) 
WalSndCtl)->sync_standbys_defined))
-=======
-               ((((volatile WalSndCtlData *) WalSndCtl)->sync_standbys_status) 
&
-                (SYNC_STANDBY_INIT | SYNC_STANDBY_DEFINED)) == 
SYNC_STANDBY_INIT)
->>>>>>> REL_16_9
                return;
 
        /* Cap the level for anything other than commit to remote flush only. */
@@ -243,7 +238,6 @@ SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
        Assert(MyProc->syncRepState == SYNC_REP_NOT_WAITING);
 
        /*
-<<<<<<< HEAD
         * GPDB special behavior: if the master/coordinator doesn't configure a 
standby,
         * or the standby is down, or the connection between the 
master/coordinator and standby
         * is broken, the xlog will not be synchronized to the standby before 
the key
@@ -330,12 +324,8 @@ SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
        }
 
        /*
-        * We don't wait for sync rep if WalSndCtl->sync_standbys_defined is not
-        * set.  See SyncRepUpdateSyncStandbysDefined.
-=======
         * We don't wait for sync rep if SYNC_STANDBY_DEFINED is not set.  See
         * SyncRepUpdateSyncStandbysDefined().
->>>>>>> REL_16_9
         *
         * Also check that the standby hasn't already replied. Unlikely race
         * condition but we'll be fetching that cache line anyway so it's likely
@@ -345,7 +335,6 @@ SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
         * (SYNC_STANDBY_INIT is not set), fall back to a check based on the 
LSN,
         * then do a direct GUC check.
         */
-<<<<<<< HEAD
        if (((!IS_QUERY_DISPATCHER()) && !WalSndCtl->sync_standbys_defined) ||
                lsn <= WalSndCtl->lsn[mode])
        {
@@ -356,43 +345,7 @@ SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
                           (uint32) (WalSndCtl->lsn[mode] >> 32), (uint32) 
WalSndCtl->lsn[mode],
                           (uint32) (lsn >> 32), (uint32) lsn);
 
-=======
-       if (WalSndCtl->sync_standbys_status & SYNC_STANDBY_INIT)
-       {
-               if ((WalSndCtl->sync_standbys_status & SYNC_STANDBY_DEFINED) == 
0 ||
-                       lsn <= WalSndCtl->lsn[mode])
-               {
-                       LWLockRelease(SyncRepLock);
-                       return;
-               }
-       }
-       else if (lsn <= WalSndCtl->lsn[mode])
-       {
-               /*
-                * The LSN is older than what we need to wait for.  The sync 
standby
-                * data has not been initialized yet, but we are OK to not wait
-                * because we know that there is no point in doing so based on 
the
-                * LSN.
-                */
-               LWLockRelease(SyncRepLock);
-               return;
-       }
-       else if (!SyncStandbysDefined())
-       {
-               /*
-                * If we are here, the sync standby data has not been 
initialized yet,
-                * and the LSN is newer than what need to wait for, so we have 
fallen
-                * back to the best thing we could do in this case: a check on
-                * SyncStandbysDefined() to see if the GUC is set or not.
-                *
-                * When the GUC has a value, we wait until the checkpointer 
updates
-                * the status data because we cannot be sure yet if we should 
wait or
-                * not. Here, the GUC has *no* value, we are sure that there is 
no
-                * point to wait; this matters for example when initializing a
-                * cluster, where we should never wait, and no sync standbys is 
the
-                * default behavior.
-                */
->>>>>>> REL_16_9
+
                LWLockRelease(SyncRepLock);
                return;
        }
@@ -415,22 +368,8 @@ SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
        {
                char            buffer[32];
 
-<<<<<<< HEAD
-               old_status = get_real_act_ps_display(&len);
-               /*
-                * The 32 represents the bytes in the string " waiting for 
%X/%X", as
-                * in upstream.  The 12 represents GPDB specific " replication" 
suffix.
-                */
-               new_status = (char *) palloc(len + 32 + 12 + 1);
-               memcpy(new_status, old_status, len);
-               sprintf(new_status + len, " waiting for %X/%X",
-                               LSN_FORMAT_ARGS(lsn));
-               set_ps_display(new_status);
-               new_status[len] = '\0'; /* truncate off " waiting ..." */
-=======
                sprintf(buffer, "waiting for %X/%X", LSN_FORMAT_ARGS(lsn));
                set_ps_display_suffix(buffer);
->>>>>>> REL_16_9
        }
 
        /* Report the wait */
@@ -551,20 +490,11 @@ SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
        MyProc->syncRepState = SYNC_REP_NOT_WAITING;
        MyProc->waitLSN = 0;
 
-<<<<<<< HEAD
        pgstat_report_wait_end();
 
-       if (new_status)
-       {
-               /* Reset ps display */
-               set_ps_display(new_status);
-               pfree(new_status);
-       }
-=======
        /* reset ps display to remove the suffix */
        if (update_process_title)
                set_ps_display_remove_suffix();
->>>>>>> REL_16_9
 }
 
 /*
diff --git a/src/backend/replication/walreceiver.c 
b/src/backend/replication/walreceiver.c
index 2444d79cd7a..317b0312caa 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -140,19 +140,12 @@ static StringInfoData incoming_message;
 static void WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last);
 static void WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID 
*startpointTLI);
 static void WalRcvDie(int code, Datum arg);
-<<<<<<< HEAD
-static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len);
-static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr);
-static void XLogWalRcvFlush(bool dying);
-static void XLogWalRcvClose(XLogRecPtr recptr);
-=======
 static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len,
                                                                 TimeLineID 
tli);
 static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr,
                                                        TimeLineID tli);
 static void XLogWalRcvFlush(bool dying, TimeLineID tli);
 static void XLogWalRcvClose(XLogRecPtr recptr, TimeLineID tli);
->>>>>>> REL_16_9
 static void XLogWalRcvSendReply(bool force, bool requestReply);
 static void XLogWalRcvSendHSFeedback(bool immed);
 static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime);
@@ -929,18 +922,6 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr, 
TimeLineID tli)
 
                /* Close the current segment if it's completed */
                if (recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, 
wal_segment_size))
-<<<<<<< HEAD
-                       XLogWalRcvClose(recptr);
-
-               if (recvFile < 0)
-               {
-                       bool            use_existent = true;
-
-                       /* Create/use new log file */
-                       XLByteToSeg(recptr, recvSegNo, wal_segment_size);
-                       recvFile = XLogFileInit(recvSegNo, &use_existent, true);
-                       recvFileTLI = ThisTimeLineID;
-=======
                        XLogWalRcvClose(recptr, tli);
 
                if (recvFile < 0)
@@ -949,7 +930,6 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr, 
TimeLineID tli)
                        XLByteToSeg(recptr, recvSegNo, wal_segment_size);
                        recvFile = XLogFileInit(recvSegNo, tli);
                        recvFileTLI = tli;
->>>>>>> REL_16_9
                }
 
                /* Calculate the start offset of the received logs */
@@ -1011,11 +991,7 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr 
recptr, TimeLineID tli)
         * segment is received and written.
         */
        if (recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size))
-<<<<<<< HEAD
-               XLogWalRcvClose(recptr);
-=======
                XLogWalRcvClose(recptr, tli);
->>>>>>> REL_16_9
 }
 
 /*
@@ -1086,29 +1062,18 @@ XLogWalRcvFlush(bool dying, TimeLineID tli)
  * Create an archive notification file since the segment is known completed.
  */
 static void
-<<<<<<< HEAD
-XLogWalRcvClose(XLogRecPtr recptr)
-=======
 XLogWalRcvClose(XLogRecPtr recptr, TimeLineID tli)
->>>>>>> REL_16_9
 {
        char            xlogfname[MAXFNAMELEN];
 
        Assert(recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, 
wal_segment_size));
-<<<<<<< HEAD
-=======
        Assert(tli != 0);
->>>>>>> REL_16_9
 
        /*
         * fsync() and close current file before we switch to next one. We would
         * otherwise have to reopen this file to fsync it later
         */
-<<<<<<< HEAD
-       XLogWalRcvFlush(false);
-=======
        XLogWalRcvFlush(false, tli);
->>>>>>> REL_16_9
 
        XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size);
 
@@ -1120,11 +1085,7 @@ XLogWalRcvClose(XLogRecPtr recptr, TimeLineID tli)
        if (close(recvFile) != 0)
                ereport(PANIC,
                                (errcode_for_file_access(),
-<<<<<<< HEAD
-                                errmsg("could not close log segment %s: %m",
-=======
                                 errmsg("could not close WAL segment %s: %m",
->>>>>>> REL_16_9
                                                xlogfname)));
 
        /*
diff --git a/src/backend/replication/walsender.c 
b/src/backend/replication/walsender.c
index cb2c065a4a7..e3370266438 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -98,14 +98,12 @@
 #include "utils/timeout.h"
 #include "utils/timestamp.h"
 
-<<<<<<< HEAD
 #include "cdb/cdbvars.h"
 #include "replication/gp_replication.h"
 #include "utils/faultinjector.h"
-=======
+
 /* Minimum interval used by walsender for stats flushes, in ms */
 #define WALSENDER_STATS_FLUSH_INTERVAL         1000
->>>>>>> REL_16_9
 
 /*
  * Maximum data payload in a WAL data message.  Must be >= XLOG_BLCKSZ.
@@ -130,14 +128,9 @@ bool               am_cascading_walsender = false; /* Am I 
cascading WAL to another
                                                                                
         * standby? */
 bool           am_db_walsender = false;        /* Connected to a database? */
 
-<<<<<<< HEAD
 /* User-settable parameters for walsender */
-int                    repl_catchup_within_range = 0;
-int                    max_wal_senders = 0;    /* the maximum number of 
concurrent
-=======
-/* GUC variables */
+int                    repl_catchup_within_range = 0;
 int                    max_wal_senders = 10;   /* the maximum number of 
concurrent
->>>>>>> REL_16_9
                                                                         * 
walsenders */
 int                    wal_sender_timeout = 60 * 1000; /* maximum time to send 
one WAL
                                                                                
         * data message */
@@ -262,14 +255,9 @@ static void ProcessStandbyMessage(void);
 static void ProcessStandbyReplyMessage(void);
 static void ProcessStandbyHSFeedbackMessage(void);
 static void ProcessRepliesIfAny(void);
-<<<<<<< HEAD
 static const char *WalSndGetStateString(WalSndState state);
 static void ProcessPendingWrites(void);
-static void WalSndKeepalive(bool requestReply);
-=======
-static void ProcessPendingWrites(void);
 static void WalSndKeepalive(bool requestReply, XLogRecPtr writePtr);
->>>>>>> REL_16_9
 static void WalSndKeepaliveIfNecessary(void);
 static void WalSndCheckTimeOut(void);
 static long WalSndComputeSleeptime(TimestampTz now);
@@ -1518,10 +1506,7 @@ WalSndUpdateProgress(LogicalDecodingContext *ctx, 
XLogRecPtr lsn, TransactionId
 {
        static TimestampTz sendTime = 0;
        TimestampTz now = GetCurrentTimestamp();
-<<<<<<< HEAD
-=======
        bool            pending_writes = false;
->>>>>>> REL_16_9
        bool            end_xact = ctx->end_xact;
 
        /*
@@ -1541,17 +1526,6 @@ WalSndUpdateProgress(LogicalDecodingContext *ctx, 
XLogRecPtr lsn, TransactionId
        }
 
        /*
-<<<<<<< HEAD
-        * Try to send a keepalive if required. We don't need to try sending 
keep
-        * alive messages at the transaction end as that will be done at a later
-        * point in time. This is required only for large transactions where we
-        * don't send any changes to the downstream and the receiver can timeout
-        * due to that.
-        */
-       if (!end_xact &&
-               now >= TimestampTzPlusMilliseconds(last_reply_timestamp,
-                                                                               
   wal_sender_timeout / 2))
-=======
         * When skipping empty transactions in synchronous replication, we send 
a
         * keepalive message to avoid delaying such transactions.
         *
@@ -1584,7 +1558,6 @@ WalSndUpdateProgress(LogicalDecodingContext *ctx, 
XLogRecPtr lsn, TransactionId
        if (pending_writes || (!end_xact &&
                                                   now >= 
TimestampTzPlusMilliseconds(last_reply_timestamp,
                                                                                
                                          wal_sender_timeout / 2)))
->>>>>>> REL_16_9
                ProcessPendingWrites();
 }
 
@@ -2744,11 +2717,9 @@ InitWalSenderSlot(void)
                        walsnd->sync_standby_priority = 0;
                        walsnd->latch = &MyProc->procLatch;
                        walsnd->replyTime = 0;
-<<<<<<< HEAD
                        /* Will be decided in hand-shake */
                        walsnd->xlogCleanUpTo = InvalidXLogRecPtr;
                        walsnd->caughtup_within_range = false;
-=======
 
                        /*
                         * The kind assignment is done here and not in 
StartReplication()
@@ -2766,7 +2737,6 @@ InitWalSenderSlot(void)
                        else
                                walsnd->kind = REPLICATION_KIND_LOGICAL;
 
->>>>>>> REL_16_9
                        SpinLockRelease(&walsnd->mutex);
                        /* don't need the lock anymore */
                        MyWalSnd = (WalSnd *) walsnd;
diff --git a/src/include/storage/buffile.h b/src/include/storage/buffile.h
index 8309ab247b1..1accb0617cc 100644
--- a/src/include/storage/buffile.h
+++ b/src/include/storage/buffile.h
@@ -62,7 +62,7 @@ extern BufFile *BufFileOpenShared(SharedFileSet *fileset, 
const char *name,
 extern void BufFileDeleteShared(SharedFileSet *fileset, const char *name);
 extern void BufFileTruncateShared(BufFile *file, int fileno, off_t offset);
 
-extern BufFile *BufFileCreateFileSet(FileSet *fileset, const char *name);
+extern BufFile *BufFileCreateFileSet(FileSet *fileset, const char *name, 
workfile_set *work_set);
 extern void BufFileExportFileSet(BufFile *file);
 extern BufFile *BufFileOpenFileSet(FileSet *fileset, const char *name,
                                                                   int mode, 
bool missing_ok);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to