Hi,

On 2023-02-08 10:30:37 -0800, Andres Freund wrote:
> On 2023-02-08 10:18:41 -0800, Andres Freund wrote:
> > I don't think the syncrep logic in WalSndUpdateProgress really works as-is -
> > consider what happens if e.g. the origin filter filters out entire
> > transactions. We'll afaics never get to WalSndUpdateProgress(). In some cases
> > we'll be lucky because we'll return quickly to XLogSendLogical(), but not
> > reliably.
>
> Is it actually the right thing to check SyncRepRequested() in that logic? It's
> quite common to set up syncrep so that individual users or transactions opt
> into syncrep, but to leave the default disabled.
>
> I don't really see an alternative to making this depend solely on
> sync_standbys_defined.
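
To make the difference between the two conditions concrete, here is a minimal, self-contained sketch. It is not the walsender code: SyncCommitLevel, keepalive_current() and keepalive_proposed() are hypothetical names, and the stand-in for SyncRepRequested() only models the synchronous_commit comparison (it ignores max_wal_senders).

```c
#include <stdbool.h>

/* Hypothetical, simplified model of the synchronous_commit levels. */
typedef enum
{
    SYNC_COMMIT_OFF,
    SYNC_COMMIT_LOCAL_FLUSH,
    SYNC_COMMIT_REMOTE_WRITE,
    SYNC_COMMIT_REMOTE_FLUSH,
    SYNC_COMMIT_REMOTE_APPLY
} SyncCommitLevel;

/*
 * Current shape of the check: a keepalive for a skipped transaction is only
 * requested if the walsender's own session asks for syncrep.
 */
static bool
keepalive_current(bool skipped_xact, SyncCommitLevel walsender_level,
                  bool sync_standbys_defined)
{
    /* simplified stand-in for SyncRepRequested() */
    bool        syncrep_requested = walsender_level > SYNC_COMMIT_LOCAL_FLUSH;

    return skipped_xact && syncrep_requested && sync_standbys_defined;
}

/* Proposed shape: depend solely on sync_standbys_defined. */
static bool
keepalive_proposed(bool skipped_xact, bool sync_standbys_defined)
{
    return skipped_xact && sync_standbys_defined;
}
```

With synchronous standbys configured but the walsender's session left at a non-syncrep default, the first predicate never fires for a skipped transaction, while the second one does.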

While hacking on a rough prototype of how I think this should look instead, I
had a few questions / remarks:

- We probably need to call UpdateProgress from a bunch of places in decode.c
  as well? Indicating that we're lagging by a lot, just because all
  transactions were in another database seems decidedly suboptimal.

- Why should lag tracking only be updated at commit-like points? That seems
  like it adds odd discontinuities?

- The mix of skipped_xact and ctx->end_xact in WalSndUpdateProgress() seems
  somewhat odd. They have very overlapping meanings IMO.

- There are no UpdateProgress calls in pgoutput_stream_abort(), but ISTM there
  should be? It's legit progress.

- That's from 6912acc04f0: I find LagTrackerRead(), LagTrackerWrite() quite
  confusing, naming-wise. IIUC "reading" is about receiving confirmation
  messages, "writing" about the time the record was generated.  ISTM that the
  current time is a quite poor approximation in XLogSendPhysical(), but pretty
  much meaningless in WalSndUpdateProgress()? Am I missing something?

- Aren't the wal_sender_timeout / 2 checks in WalSndUpdateProgress(),
  WalSndWriteData() missing wal_sender_timeout <= 0 checks?

- I don't really understand why f95d53eded55 added !end_xact to the if
  condition for ProcessPendingWrites(). Is the theory that we'll end up in an
  outer loop soon?
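
The wal_sender_timeout point can be illustrated with a small stand-alone sketch. keepalive_due() and keepalive_due_unguarded() are hypothetical helpers, not functions in walsender.c, and times are plain milliseconds rather than TimestampTz. The sketch assumes, as documented for wal_sender_timeout, that a value of zero disables the timeout.

```c
#include <stdbool.h>
#include <stdint.h>

/*
 * Unguarded form, mirroring the shape of the existing checks: with a
 * timeout of 0 the deadline equals last_reply_ms, so this is true on
 * practically every call.
 */
static bool
keepalive_due_unguarded(int64_t now_ms, int64_t last_reply_ms,
                        int wal_sender_timeout_ms)
{
    return now_ms >= last_reply_ms + wal_sender_timeout_ms / 2;
}

/* Guarded form: a disabled timeout never triggers a keepalive. */
static bool
keepalive_due(int64_t now_ms, int64_t last_reply_ms,
              int wal_sender_timeout_ms)
{
    if (wal_sender_timeout_ms <= 0)
        return false;

    return now_ms >= last_reply_ms + wal_sender_timeout_ms / 2;
}
```

The guard costs one comparison and avoids entering the pending-writes path on every call when the timeout is disabled.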


Attached is a current, quite rough, prototype. It addresses some of the points
raised, but far from all. There are also several XXXs/FIXMEs in it.  I changed
the file extension to .txt to avoid hijacking the CF entry.

Greetings,

Andres Freund
From 1d89c84b1465b28ddef8c110500c3744477488df Mon Sep 17 00:00:00 2001
From: Andres Freund <and...@anarazel.de>
Date: Wed, 8 Feb 2023 11:57:37 -0800
Subject: [PATCH v1] WIP: Initial sketch of progress update rework

---
 src/include/replication/logical.h             |   7 +-
 src/include/replication/output_plugin.h       |   1 -
 src/include/replication/reorderbuffer.h       |  12 --
 src/backend/replication/logical/logical.c     | 188 ++++++++++--------
 .../replication/logical/reorderbuffer.c       |  20 --
 src/backend/replication/pgoutput/pgoutput.c   |  10 -
 src/backend/replication/walsender.c           |  14 +-
 7 files changed, 116 insertions(+), 136 deletions(-)

diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index 5f49554ea05..472f2a5b84c 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -27,8 +27,8 @@ typedef LogicalOutputPluginWriterWrite LogicalOutputPluginWriterPrepareWrite;
 typedef void (*LogicalOutputPluginWriterUpdateProgress) (struct LogicalDecodingContext *lr,
                                                          XLogRecPtr Ptr,
                                                          TransactionId xid,
-                                                         bool skipped_xact
-);
+                                                         bool did_write,
+                                                         bool finished_xact);
 
 typedef struct LogicalDecodingContext
 {
@@ -105,10 +105,9 @@ typedef struct LogicalDecodingContext
         */
        bool            accept_writes;
        bool            prepared_write;
+       bool            did_write;
        XLogRecPtr      write_location;
        TransactionId write_xid;
-       /* Are we processing the end LSN of a transaction? */
-       bool            end_xact;
 } LogicalDecodingContext;
 
 
diff --git a/src/include/replication/output_plugin.h 
b/src/include/replication/output_plugin.h
index 2d89d26586e..b9358e15444 100644
--- a/src/include/replication/output_plugin.h
+++ b/src/include/replication/output_plugin.h
@@ -245,6 +245,5 @@ typedef struct OutputPluginCallbacks
 /* Functions in replication/logical/logical.c */
 extern void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write);
 extern void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write);
-extern void OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx, bool skipped_xact);
 
 #endif                                                 /* OUTPUT_PLUGIN_H */
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 215d1494e90..e5db041df18 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -526,12 +526,6 @@ typedef void (*ReorderBufferStreamTruncateCB) (
                                                Relation relations[],
                                                ReorderBufferChange *change);
 
-/* update progress txn callback signature */
-typedef void (*ReorderBufferUpdateProgressTxnCB) (
-                                                  ReorderBuffer *rb,
-                                                  ReorderBufferTXN *txn,
-                                                  XLogRecPtr lsn);
-
 struct ReorderBuffer
 {
        /*
@@ -595,12 +589,6 @@ struct ReorderBuffer
        ReorderBufferStreamMessageCB stream_message;
        ReorderBufferStreamTruncateCB stream_truncate;
 
-       /*
-        * Callback to be called when updating progress during sending data of a
-        * transaction (and its subtransactions) to the output plugin.
-        */
-       ReorderBufferUpdateProgressTxnCB update_progress_txn;
-
        /*
         * Pointer that will be passed untouched to the callbacks.
         */
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index c3ec97a0a62..92eae378d98 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -93,10 +93,9 @@ static void stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *tx
 static void stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
                                        int nrelations, Relation relations[], ReorderBufferChange *change);
 
-/* callback to update txn's progress */
-static void update_progress_txn_cb_wrapper(ReorderBuffer *cache,
-                                           ReorderBufferTXN *txn,
-                                           XLogRecPtr lsn);
+
+static void update_progress(struct LogicalDecodingContext *ctx,
+                                                       bool finished_xact);
 
 static void LoadOutputPlugin(OutputPluginCallbacks *callbacks, const char *plugin);
 
@@ -283,12 +282,6 @@ StartupDecodingContext(List *output_plugin_options,
        ctx->reorder->commit_prepared = commit_prepared_cb_wrapper;
        ctx->reorder->rollback_prepared = rollback_prepared_cb_wrapper;
 
-       /*
-        * Callback to support updating progress during sending data of a
-        * transaction (and its subtransactions) to the output plugin.
-        */
-       ctx->reorder->update_progress_txn = update_progress_txn_cb_wrapper;
-
        ctx->out = makeStringInfo();
        ctx->prepare_write = prepare_write;
        ctx->write = do_write;
@@ -661,6 +654,7 @@ FreeDecodingContext(LogicalDecodingContext *ctx)
 void
 OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
 {
+       /* FIXME: streaming and 2PC support made this message inaccurate */
        if (!ctx->accept_writes)
                elog(ERROR, "writes are only accepted in commit, begin and change callbacks");
 
@@ -679,20 +673,7 @@ OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
 
        ctx->write(ctx, ctx->write_location, ctx->write_xid, last_write);
        ctx->prepared_write = false;
-}
-
-/*
- * Update progress tracking (if supported).
- */
-void
-OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx,
-                                                  bool skipped_xact)
-{
-       if (!ctx->update_progress)
-               return;
-
-       ctx->update_progress(ctx, ctx->write_location, ctx->write_xid,
-                                                skipped_xact);
+       ctx->did_write = true;
 }
 
 /*
@@ -759,7 +740,6 @@ startup_cb_wrapper(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool i
 
        /* set output state */
        ctx->accept_writes = false;
-       ctx->end_xact = false;
 
        /* do the actual work: call callback */
        ctx->callbacks.startup_cb(ctx, opt, is_init);
@@ -787,7 +767,6 @@ shutdown_cb_wrapper(LogicalDecodingContext *ctx)
 
        /* set output state */
        ctx->accept_writes = false;
-       ctx->end_xact = false;
 
        /* do the actual work: call callback */
        ctx->callbacks.shutdown_cb(ctx);
@@ -823,7 +802,7 @@ begin_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn)
        ctx->accept_writes = true;
        ctx->write_xid = txn->xid;
        ctx->write_location = txn->first_lsn;
-       ctx->end_xact = false;
+       ctx->did_write = false;
 
        /* do the actual work: call callback */
        ctx->callbacks.begin_cb(ctx, txn);
@@ -855,13 +834,15 @@ commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
        ctx->accept_writes = true;
        ctx->write_xid = txn->xid;
        ctx->write_location = txn->end_lsn; /* points to the end of the record */
-       ctx->end_xact = true;
+       ctx->did_write = false;
 
        /* do the actual work: call callback */
        ctx->callbacks.commit_cb(ctx, txn, commit_lsn);
 
        /* Pop the error context stack */
        error_context_stack = errcallback.previous;
+
+       update_progress(ctx, true);
 }
 
 /*
@@ -896,7 +877,7 @@ begin_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn)
        ctx->accept_writes = true;
        ctx->write_xid = txn->xid;
        ctx->write_location = txn->first_lsn;
-       ctx->end_xact = false;
+       ctx->did_write = false;
 
        /*
         * If the plugin supports two-phase commits then begin prepare callback is
@@ -913,6 +894,9 @@ begin_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn)
 
        /* Pop the error context stack */
        error_context_stack = errcallback.previous;
+
+       /* XXX: arguably this does end a transaction */
+       update_progress(ctx, false);
 }
 
 static void
@@ -941,7 +925,7 @@ prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
        ctx->accept_writes = true;
        ctx->write_xid = txn->xid;
        ctx->write_location = txn->end_lsn; /* points to the end of the record */
-       ctx->end_xact = true;
+       ctx->did_write = false;
 
        /*
         * If the plugin supports two-phase commits then prepare callback is
@@ -958,6 +942,8 @@ prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 
        /* Pop the error context stack */
        error_context_stack = errcallback.previous;
+
+       update_progress(ctx, true);
 }
 
 static void
@@ -986,7 +972,7 @@ commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
        ctx->accept_writes = true;
        ctx->write_xid = txn->xid;
        ctx->write_location = txn->end_lsn; /* points to the end of the record */
-       ctx->end_xact = true;
+       ctx->did_write = false;
 
        /*
         * If the plugin support two-phase commits then commit prepared callback
@@ -1003,6 +989,8 @@ commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 
        /* Pop the error context stack */
        error_context_stack = errcallback.previous;
+
+       update_progress(ctx, true);
 }
 
 static void
@@ -1032,11 +1020,13 @@ rollback_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
        ctx->accept_writes = true;
        ctx->write_xid = txn->xid;
        ctx->write_location = txn->end_lsn; /* points to the end of the record */
-       ctx->end_xact = true;
+       ctx->did_write = false;
 
        /*
         * If the plugin support two-phase commits then rollback prepared callback
         * is mandatory
+        *
+        * FIXME: This should have been caught much earlier.
         */
        if (ctx->callbacks.rollback_prepared_cb == NULL)
                ereport(ERROR,
@@ -1050,6 +1040,8 @@ rollback_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 
        /* Pop the error context stack */
        error_context_stack = errcallback.previous;
+
+       update_progress(ctx, true);
 }
 
 static void
@@ -1074,6 +1066,7 @@ change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
        /* set output state */
        ctx->accept_writes = true;
        ctx->write_xid = txn->xid;
+       ctx->did_write = false;
 
        /*
         * Report this change's lsn so replies from clients can give an up-to-date
@@ -1083,12 +1076,29 @@ change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
         */
        ctx->write_location = change->lsn;
 
-       ctx->end_xact = false;
 
        ctx->callbacks.change_cb(ctx, txn, relation, change);
 
        /* Pop the error context stack */
        error_context_stack = errcallback.previous;
+
+       /*
+        * It is possible that the data is not sent to downstream for a long time
+        * either because the output plugin filtered it or there is a DDL that
+        * generates a lot of data that is not processed by the plugin. So, in
+        * such cases, the downstream can timeout. To avoid that we try to send a
+        * keepalive message if required.  Trying to send a keepalive message
+        * after every change has some overhead, but testing showed there is no
+        * noticeable overhead if we do it after every ~100 changes.
+        *
+        * FIXME: Need to count the number of changes, or come up with some other
+        * metric.
+        */
+#define CHANGES_THRESHOLD 100
+       if (!ctx->did_write)
+       {
+               update_progress(ctx, false);
+       }
 }
 
 static void
@@ -1102,7 +1112,7 @@ truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
        Assert(!ctx->fast_forward);
 
        if (!ctx->callbacks.truncate_cb)
-               return;
+               goto out;
 
        /* Push callback + info on the error context stack */
        state.ctx = ctx;
@@ -1116,6 +1126,7 @@ truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
        /* set output state */
        ctx->accept_writes = true;
        ctx->write_xid = txn->xid;
+       ctx->did_write = false;
 
        /*
         * Report this change's lsn so replies from clients can give an up-to-date
@@ -1125,12 +1136,14 @@ truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
         */
        ctx->write_location = change->lsn;
 
-       ctx->end_xact = false;
 
        ctx->callbacks.truncate_cb(ctx, txn, nrelations, relations, change);
 
        /* Pop the error context stack */
        error_context_stack = errcallback.previous;
+
+out:
+       update_progress(ctx, false);
 }
 
 bool
@@ -1154,7 +1167,6 @@ filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, TransactionId xid,
 
        /* set output state */
        ctx->accept_writes = false;
-       ctx->end_xact = false;
 
        /* do the actual work: call callback */
        ret = ctx->callbacks.filter_prepare_cb(ctx, xid, gid);
@@ -1185,7 +1197,6 @@ filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id)
 
        /* set output state */
        ctx->accept_writes = false;
-       ctx->end_xact = false;
 
        /* do the actual work: call callback */
        ret = ctx->callbacks.filter_by_origin_cb(ctx, origin_id);
@@ -1193,6 +1204,8 @@ filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id)
        /* Pop the error context stack */
        error_context_stack = errcallback.previous;
 
+       /* FIXME: I think we need to call update_progress occasionally */
+
        return ret;
 }
 
@@ -1208,7 +1221,7 @@ message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
        Assert(!ctx->fast_forward);
 
        if (ctx->callbacks.message_cb == NULL)
-               return;
+               goto out;
 
        /* Push callback + info on the error context stack */
        state.ctx = ctx;
@@ -1223,7 +1236,7 @@ message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
        ctx->accept_writes = true;
        ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId;
        ctx->write_location = message_lsn;
-       ctx->end_xact = false;
+       ctx->did_write = false;
 
        /* do the actual work: call callback */
        ctx->callbacks.message_cb(ctx, txn, message_lsn, transactional, prefix,
@@ -1231,6 +1244,10 @@ message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 
        /* Pop the error context stack */
        error_context_stack = errcallback.previous;
+
+out:
+       /* XXX: Hm, not sure what the right thing is here */
+       update_progress(ctx, false);
 }
 
 static void
@@ -1258,6 +1275,7 @@ stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
        /* set output state */
        ctx->accept_writes = true;
        ctx->write_xid = txn->xid;
+       ctx->did_write = false;
 
        /*
         * Report this message's lsn so replies from clients can give an
@@ -1267,7 +1285,6 @@ stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
         */
        ctx->write_location = first_lsn;
 
-       ctx->end_xact = false;
 
        /* in streaming mode, stream_start_cb is required */
        if (ctx->callbacks.stream_start_cb == NULL)
@@ -1280,6 +1297,8 @@ stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 
        /* Pop the error context stack */
        error_context_stack = errcallback.previous;
+
+       /* don't call update progress, we didn't really make any */
 }
 
 static void
@@ -1307,6 +1326,7 @@ stream_stop_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
        /* set output state */
        ctx->accept_writes = true;
        ctx->write_xid = txn->xid;
+       ctx->did_write = false;
 
        /*
         * Report this message's lsn so replies from clients can give an
@@ -1316,7 +1336,6 @@ stream_stop_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
         */
        ctx->write_location = last_lsn;
 
-       ctx->end_xact = false;
 
        /* in streaming mode, stream_stop_cb is required */
        if (ctx->callbacks.stream_stop_cb == NULL)
@@ -1329,6 +1348,8 @@ stream_stop_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 
        /* Pop the error context stack */
        error_context_stack = errcallback.previous;
+
+       /* don't call update progress, we didn't really make any */
 }
 
 static void
@@ -1357,7 +1378,7 @@ stream_abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
        ctx->accept_writes = true;
        ctx->write_xid = txn->xid;
        ctx->write_location = abort_lsn;
-       ctx->end_xact = true;
+       ctx->did_write = false;
 
        /* in streaming mode, stream_abort_cb is required */
        if (ctx->callbacks.stream_abort_cb == NULL)
@@ -1370,6 +1391,9 @@ stream_abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 
        /* Pop the error context stack */
        error_context_stack = errcallback.previous;
+
+       /* XXX: Progress wasn't updated in pgoutput */
+       update_progress(ctx, true);
 }
 
 static void
@@ -1402,7 +1426,7 @@ stream_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
        ctx->accept_writes = true;
        ctx->write_xid = txn->xid;
        ctx->write_location = txn->end_lsn;
-       ctx->end_xact = true;
+       ctx->did_write = false;
 
        /* in streaming mode with two-phase commits, stream_prepare_cb is required */
        if (ctx->callbacks.stream_prepare_cb == NULL)
@@ -1415,6 +1439,8 @@ stream_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 
        /* Pop the error context stack */
        error_context_stack = errcallback.previous;
+
+       update_progress(ctx, true);
 }
 
 static void
@@ -1443,7 +1469,7 @@ stream_commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
        ctx->accept_writes = true;
        ctx->write_xid = txn->xid;
        ctx->write_location = txn->end_lsn;
-       ctx->end_xact = true;
+       ctx->did_write = false;
 
        /* in streaming mode, stream_commit_cb is required */
        if (ctx->callbacks.stream_commit_cb == NULL)
@@ -1456,6 +1482,8 @@ stream_commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 
        /* Pop the error context stack */
        error_context_stack = errcallback.previous;
+
+       update_progress(ctx, true);
 }
 
 static void
@@ -1483,6 +1511,7 @@ stream_change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
        /* set output state */
        ctx->accept_writes = true;
        ctx->write_xid = txn->xid;
+       ctx->did_write = false;
 
        /*
         * Report this change's lsn so replies from clients can give an up-to-date
@@ -1492,7 +1521,6 @@ stream_change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
         */
        ctx->write_location = change->lsn;
 
-       ctx->end_xact = false;
 
        /* in streaming mode, stream_change_cb is required */
        if (ctx->callbacks.stream_change_cb == NULL)
@@ -1505,6 +1533,15 @@ stream_change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 
        /* Pop the error context stack */
        error_context_stack = errcallback.previous;
+
+       /*
+        * FIXME: See change_cb_wrapper(). Probably should be in a helper
+        * function.
+        */
+       if (!ctx->did_write)
+       {
+               update_progress(ctx, false);
+       }
 }
 
 static void
@@ -1523,7 +1560,7 @@ stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 
        /* this callback is optional */
        if (ctx->callbacks.stream_message_cb == NULL)
-               return;
+               goto out;
 
        /* Push callback + info on the error context stack */
        state.ctx = ctx;
@@ -1538,7 +1575,7 @@ stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
        ctx->accept_writes = true;
        ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId;
        ctx->write_location = message_lsn;
-       ctx->end_xact = false;
+       ctx->did_write = false;
 
        /* do the actual work: call callback */
        ctx->callbacks.stream_message_cb(ctx, txn, message_lsn, transactional, prefix,
@@ -1546,6 +1583,10 @@ stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 
        /* Pop the error context stack */
        error_context_stack = errcallback.previous;
+
+out:
+       /* XXX: Hm, not sure what the right thing is here */
+       update_progress(ctx, false);
 }
 
 static void
@@ -1578,6 +1619,7 @@ stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
        /* set output state */
        ctx->accept_writes = true;
        ctx->write_xid = txn->xid;
+       ctx->did_write = false;
 
        /*
         * Report this change's lsn so replies from clients can give an up-to-date
@@ -1587,51 +1629,33 @@ stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
         */
        ctx->write_location = change->lsn;
 
-       ctx->end_xact = false;
 
        ctx->callbacks.stream_truncate_cb(ctx, txn, nrelations, relations, change);
 
        /* Pop the error context stack */
        error_context_stack = errcallback.previous;
+
+       /* truncates aren't progress on their own */
 }
 
+
+/*
+ * Update progress tracking (if required).
+ *
+ * FIXME: This should now get a different name.
+
+ * FIXME: I think instead of skipped_xact we should just track whether
+ * something was written?
+ */
 static void
-update_progress_txn_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
-                                                          XLogRecPtr lsn)
+update_progress(struct LogicalDecodingContext *ctx,
+                               bool finished_xact)
 {
-       LogicalDecodingContext *ctx = cache->private_data;
-       LogicalErrorCallbackState state;
-       ErrorContextCallback errcallback;
+       if (!ctx->update_progress)
+               return;
 
-       Assert(!ctx->fast_forward);
-
-       /* Push callback + info on the error context stack */
-       state.ctx = ctx;
-       state.callback_name = "update_progress_txn";
-       state.report_location = lsn;
-       errcallback.callback = output_plugin_error_callback;
-       errcallback.arg = (void *) &state;
-       errcallback.previous = error_context_stack;
-       error_context_stack = &errcallback;
-
-       /* set output state */
-       ctx->accept_writes = false;
-       ctx->write_xid = txn->xid;
-
-       /*
-        * Report this change's lsn so replies from clients can give an up-to-date
-        * answer. This won't ever be enough (and shouldn't be!) to confirm
-        * receipt of this transaction, but it might allow another transaction's
-        * commit to be confirmed with one message.
-        */
-       ctx->write_location = lsn;
-
-       ctx->end_xact = false;
-
-       OutputPluginUpdateProgress(ctx, false);
-
-       /* Pop the error context stack */
-       error_context_stack = errcallback.previous;
+       ctx->update_progress(ctx, ctx->write_location, ctx->write_xid,
+                                                ctx->did_write, finished_xact);
 }
 
 /*
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index d5f90a5f5d2..0468d12936f 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -2100,8 +2100,6 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
        PG_TRY();
        {
                ReorderBufferChange *change;
-               int                     changes_count = 0;      /* used to accumulate the number of
-                                                                * changes */
 
                if (using_subtxn)
                        BeginInternalSubTransaction(streaming ? "stream" : "replay");
@@ -2442,24 +2440,6 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
                                        elog(ERROR, "tuplecid value in changequeue");
                                        break;
                        }
-
-                       /*
-                        * It is possible that the data is not sent to downstream for a
-                        * long time either because the output plugin filtered it or there
-                        * is a DDL that generates a lot of data that is not processed by
-                        * the plugin. So, in such cases, the downstream can timeout. To
-                        * avoid that we try to send a keepalive message if required.
-                        * Trying to send a keepalive message after every change has some
-                        * overhead, but testing showed there is no noticeable overhead if
-                        * we do it after every ~100 changes.
-                        */
-#define CHANGES_THRESHOLD 100
-
-                       if (++changes_count >= CHANGES_THRESHOLD)
-                       {
-                               rb->update_progress_txn(rb, txn, change->lsn);
-                               changes_count = 0;
-                       }
                }
 
                /* speculative insertion record must be freed by now */
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 73b080060da..dc2b958437b 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -584,7 +584,6 @@ pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
         * from this transaction has been sent to the downstream.
         */
        sent_begin_txn = txndata->sent_begin_txn;
-       OutputPluginUpdateProgress(ctx, !sent_begin_txn);
        pfree(txndata);
        txn->output_plugin_private = NULL;
 
@@ -623,8 +622,6 @@ static void
 pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
                                         XLogRecPtr prepare_lsn)
 {
-       OutputPluginUpdateProgress(ctx, false);
-
        OutputPluginPrepareWrite(ctx, true);
        logicalrep_write_prepare(ctx->out, txn, prepare_lsn);
        OutputPluginWrite(ctx, true);
@@ -637,8 +634,6 @@ static void
 pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
                                                         XLogRecPtr commit_lsn)
 {
-       OutputPluginUpdateProgress(ctx, false);
-
        OutputPluginPrepareWrite(ctx, true);
        logicalrep_write_commit_prepared(ctx->out, txn, commit_lsn);
        OutputPluginWrite(ctx, true);
@@ -653,8 +648,6 @@ pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
                                                           XLogRecPtr prepare_end_lsn,
                                                           TimestampTz prepare_time)
 {
-       OutputPluginUpdateProgress(ctx, false);
-
        OutputPluginPrepareWrite(ctx, true);
        logicalrep_write_rollback_prepared(ctx->out, txn, prepare_end_lsn,
                                                                           prepare_time);
@@ -1895,8 +1888,6 @@ pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
        Assert(!in_streaming);
        Assert(rbtxn_is_streamed(txn));
 
-       OutputPluginUpdateProgress(ctx, false);
-
        OutputPluginPrepareWrite(ctx, true);
        logicalrep_write_stream_commit(ctx->out, txn, commit_lsn);
        OutputPluginWrite(ctx, true);
@@ -1916,7 +1907,6 @@ pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
 {
        Assert(rbtxn_is_streamed(txn));
 
-       OutputPluginUpdateProgress(ctx, false);
        OutputPluginPrepareWrite(ctx, true);
        logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn);
        OutputPluginWrite(ctx, true);
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 4ed3747e3f9..697f8be941e 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -251,7 +251,7 @@ static void WalSndWait(uint32 socket_events, long timeout, uint32 wait_event);
 static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
 static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
 static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
-                                                                bool skipped_xact);
+                                                                bool did_write, bool finished_xact);
 static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc);
 static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time);
 static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now);
@@ -1465,12 +1465,11 @@ ProcessPendingWrites(void)
  */
 static void
 WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
-                                        bool skipped_xact)
+                                        bool did_write, bool finished_xact)
 {
        static TimestampTz sendTime = 0;
        TimestampTz now = GetCurrentTimestamp();
        bool            pending_writes = false;
-       bool            end_xact = ctx->end_xact;
 
        /*
         * Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS to
@@ -1481,8 +1480,9 @@ WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId
         * transaction LSN.
         */
 #define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS   1000
-       if (end_xact && TimestampDifferenceExceeds(sendTime, now,
-                                                  WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS))
+       if (finished_xact &&
+               TimestampDifferenceExceeds(sendTime, now,
+                                          WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS))
        {
                LagTrackerWrite(lsn, now);
                sendTime = now;
@@ -1496,7 +1496,7 @@ WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId
         * the worst case we will just send an extra keepalive message when it is
         * really not required.
         */
-       if (skipped_xact &&
+       if (finished_xact && !did_write &&
                SyncRepRequested() &&
                ((volatile WalSndCtlData *) WalSndCtl)->sync_standbys_defined)
        {
@@ -1518,7 +1518,7 @@ WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId
         * for large transactions where we don't send any changes to the
         * downstream and the receiver can timeout due to that.
         */
-       if (pending_writes || (!end_xact &&
+       if (pending_writes || (!finished_xact &&
                                                   now >= TimestampTzPlusMilliseconds(last_reply_timestamp,
                                                                                      wal_sender_timeout / 2)))
                ProcessPendingWrites();
-- 
2.38.0
