On Wed, Aug 13, 2025 at 4:17 PM Zhijie Hou (Fujitsu)
<[email protected]> wrote:
>
> Here is the initial POC patch for this idea.
>

Thank you Hou-san for the patch.

I did some performance benchmarking for the patch, and overall the
results show substantial performance improvements.
Please find the details below:

Source code:
----------------
pgHead (572c0f1b0e) and v1-0001 patch

Setup:
---------
Pub --> Sub
 - Two nodes created in a pub-sub logical replication setup.
 - Both nodes have the same set of pgbench tables, created with scale=300.
 - The sub node subscribes to all changes from the pub node's
pgbench tables.

Workload Run:
--------------------
 - Disable the subscription on the Sub node
 - Run default pgbench (read-write) only on the Pub node with
#clients=40 and duration=10 minutes
 - Enable the subscription on Sub once pgbench completes, then
measure the time taken for replication to catch up.
~~~

Test-01: Measure Replication lag
----------------------------------------
Observations:
---------------
 - Replication time improved as the number of parallel workers
increased with the patch.
 - On pgHead, replicating a 10-minute publisher workload took ~46 minutes.
 - With just 2 parallel workers (default), replication time was cut
roughly in half, and with 8 workers it completed in ~13 minutes (3.5x
faster).
 - With 16 parallel workers, we achieved a ~3.7x speedup over pgHead.
 - With 32 workers, the gains plateaued, likely because the workload
does not contain enough independent work to keep that many workers
busy on this machine.

Detailed Result:
-----------------
Case                    Time_taken_in_replication(sec)  rep_time_in_minutes  faster_than_head
1. pgHead                        2760.791                   46.01318333          -
2. patched_#worker=2             1463.853                   24.3975           1.88 times
3. patched_#worker=4             1031.376                   17.1896           2.68 times
4. patched_#worker=8              781.007                   13.0168           3.54 times
5. patched_#worker=16             741.108                   12.3518           3.73 times
6. patched_#worker=32             787.203                   13.1201           3.51 times
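For reference, the derived columns can be recomputed from the raw
seconds with a small shell sketch (pgHead is the baseline; differences
in the last digit of the speedup column are rounding):

```shell
# Recompute minutes and speedup-over-head from the raw replication
# times (seconds) in the table above; 2760.791 s is the pgHead baseline.
head_s=2760.791
for t in 2760.791 1463.853 1031.376 781.007 741.108 787.203; do
    awk -v t="$t" -v head="$head_s" \
        'BEGIN { printf "%9.3f s  =  %8.4f min  (%.2fx vs head)\n",
                 t, t / 60, head / t }'
done
```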
~~~~

Test-02: Measure number of transactions parallelized
-----------------------------------------------------
 - Used a top-up patch to LOG the number of transactions applied by
parallel workers, the number applied by the leader, and the number
that had dependencies.
 - Example LOG output:
```
LOG:  parallelized_nxact: 11497254 dependent_nxact: 0 leader_applied_nxact: 600
```
 - parallelized_nxact: the number of transactions applied in parallel
 - dependent_nxact: the number of dependent transactions
 - leader_applied_nxact: the number of transactions applied by the leader worker
 (the required top-up v1-002 patch is attached.)
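For quick sanity checks, the counters can be pulled out of such a LOG
line with a bit of awk (a sketch; the sample line is the one quoted
above, in a real run it would be grepped from sub.log):

```shell
# Parse the three counters out of a LOG line of the shape shown above
# and derive the parallelized percentage. The sample line is the one
# quoted earlier in this mail.
line='LOG:  parallelized_nxact: 11497254 dependent_nxact: 0 leader_applied_nxact: 600'
echo "$line" | awk '{
    # Each counter name ends in "nxact:" and is followed by its value.
    for (i = 1; i < NF; i++)
        if ($i ~ /nxact:$/) c[$i] = $(i + 1)
    total = c["parallelized_nxact:"] + c["dependent_nxact:"] + c["leader_applied_nxact:"]
    printf "total=%d parallelized=%.2f%%\n", total, 100 * c["parallelized_nxact:"] / total
}'
```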

Observations:
----------------
 - With 4 to 8 parallel workers, ~80%-98% of transactions are parallelized.
 - As the number of workers increased, the parallelized percentage
increased, reaching 99.99% with 32 workers.

Detailed Result:
-----------------
case1: #parallel_workers = 2(default)
  #total_pgbench_txns = 24745648
    parallelized_nxact = 14439480 (58.35%)
    dependent_nxact    = 16 (0.00006%)
    leader_applied_nxact = 10306153 (41.64%)

case2: #parallel_workers = 4
  #total_pgbench_txns = 24776108
    parallelized_nxact = 19666593 (79.37%)
    dependent_nxact    = 212 (0.0008%)
    leader_applied_nxact = 5109304 (20.62%)

case3: #parallel_workers = 8
  #total_pgbench_txns = 24821333
    parallelized_nxact = 24397431 (98.29%)
    dependent_nxact    = 282 (0.001%)
    leader_applied_nxact = 423621 (1.71%)

case4: #parallel_workers = 16
  #total_pgbench_txns = 24938255
    parallelized_nxact = 24937754 (99.99%)
    dependent_nxact    = 142 (0.0005%)
    leader_applied_nxact = 360 (0.0014%)

case5: #parallel_workers = 32
  #total_pgbench_txns = 24769474
    parallelized_nxact = 24769135 (99.99%)
    dependent_nxact    = 312 (0.0013%)
    leader_applied_nxact = 28 (0.0001%)
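As a cross-check, the percentages above follow directly from the raw
counters; e.g. for case 3 (8 workers):

```shell
# Recompute the percentages for case 3 (8 workers) from the raw
# counters reported above.
awk 'BEGIN {
    total  = 24821333
    par    = 24397431; dep = 282; leader = 423621
    printf "parallelized_nxact = %d (%.2f%%)\n",   par,    100 * par    / total
    printf "dependent_nxact    = %d (%.4f%%)\n",   dep,    100 * dep    / total
    printf "leader_applied_nxact = %d (%.2f%%)\n", leader, 100 * leader / total
}'
```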

~~~~~
The scripts used for the above tests are attached.

Next, I plan to extend the testing to larger workloads by running
pgbench for 20–30 minutes.
We will also benchmark performance across different workload types to
evaluate the improvements once the patch has matured further.

--
Thanks,
Nisha
From 00c05e510015fd72e9f1ede34868e0f691ded299 Mon Sep 17 00:00:00 2001
From: Zhijie Hou <[email protected]>
Date: Thu, 14 Aug 2025 18:38:08 +0800
Subject: [PATCH v1] Add some simple statistics

---
 src/backend/replication/logical/worker.c | 21 +++++++++++++++++++++
 1 file changed, 21 insertions(+)

diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 11726b691fa..ff550900c2e 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -507,6 +507,12 @@ static BufFile *stream_fd = NULL;
  */
 static XLogRecPtr last_flushpos = InvalidXLogRecPtr;
 
+static uint64 parallelized_nxact = 0;
+static uint64 dependent_nxact = 0;
+static uint64 leader_applied_nxact = 0;
+
+static bool    dependent_xact = false;
+
 typedef struct SubXactInfo
 {
        TransactionId xid;                      /* XID of the subxact */
@@ -1138,6 +1144,8 @@ handle_dependency_on_change(LogicalRepMsgType action, StringInfo s,
        if (!depends_on_xids)
                return;
 
+       dependent_xact = true;
+
        /*
         * Notify the transactions that they are dependent on the current
         * transaction.
@@ -1831,6 +1839,8 @@ apply_handle_begin(StringInfo s)
        /* There must not be an active streaming transaction. */
        Assert(!TransactionIdIsValid(stream_xid));
 
+       dependent_xact = false;
+
        logicalrep_read_begin(s, &begin_data);
 
        remote_xid = begin_data.xid;
@@ -1903,11 +1913,17 @@ apply_handle_commit(StringInfo s)
        {
                case TRANS_LEADER_APPLY:
                        apply_handle_commit_internal(&commit_data);
+                       leader_applied_nxact++;
                        break;
 
                case TRANS_LEADER_SEND_TO_PARALLEL:
                        Assert(winfo);
 
+                       if (dependent_xact)
+                               dependent_nxact++;
+                       else
+                               parallelized_nxact++;
+
                        if (pa_send_data(winfo, s->len, s->data))
                        {
                                /* Finish processing the transaction. */
@@ -1967,6 +1983,8 @@ apply_handle_commit(StringInfo s)
 
        pgstat_report_activity(STATE_IDLE, NULL);
        reset_apply_error_context_info();
+
+       dependent_xact = false;
 }
 
 /*
@@ -5058,6 +5076,9 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
                return;
        send_time = now;
 
+       elog(LOG, "parallelized_nxact: " UINT64_FORMAT " dependent_nxact: " UINT64_FORMAT " leader_applied_nxact: " UINT64_FORMAT,
+               parallelized_nxact, dependent_nxact, leader_applied_nxact);
+
        if (!reply_message)
        {
                MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext);
-- 
2.50.1.windows.1

#!/bin/bash

##################
### Definition ###
##################

##sleep 5s

port_pub=5533
port_sub=5534

## scale factor
SCALE=$2

## pgbench init command
INIT_COMMAND="./pgbench -i -U postgres postgres -s $SCALE"

SOURCE=$1
WORKERS=$3

################
### clean up ###
################

./pg_ctl stop -m i -D data_pub -w
./pg_ctl stop -m i -D data_sub -w
rm -rf data*
rm -f pub.log sub.log

#######################
### setup publisher ###
#######################

./initdb -D data_pub -U postgres
cat << EOF >> data_pub/postgresql.conf
port=$port_pub
autovacuum = false
shared_buffers = '30GB'
max_wal_size = 20GB
min_wal_size = 10GB
wal_level = logical
EOF

./pg_ctl -D data_pub start -w -l pub.log
$INIT_COMMAND -p $port_pub
./psql -U postgres -p $port_pub -c "CREATE PUBLICATION pub FOR ALL TABLES;"

#######################
### setup subscriber ###
#######################

./initdb -D data_sub -U postgres

cat << EOF >> data_sub/postgresql.conf
port=$port_sub
autovacuum = false
shared_buffers = '30GB'
max_wal_size = 20GB
min_wal_size = 10GB
track_commit_timestamp = on
# log_min_messages = DEBUG1
max_worker_processes = 100
max_logical_replication_workers = 50
max_parallel_apply_workers_per_subscription = $WORKERS
EOF

./pg_ctl -D data_sub start -w -l sub.log

## create tables only, no data
$INIT_COMMAND -Idtp -p $port_sub


## subscription definition is the same in both head and patched cases
./psql -U postgres -p $port_sub -c "CREATE SUBSCRIPTION sub CONNECTION 'port=$port_pub user=postgres' PUBLICATION pub;"


# Wait until all the table sync is done
REMAIN="f"

while [ "$REMAIN" = "f" ]
do
    # Sleep a bit to avoid running the query too often
    sleep 1s

    # Check pg_subscription_rel. This query is ported from wait_for_subscription_sync()
    # defined in Cluster.pm; it returns 't' once every table has reached the 'r' or 's' state.
    REMAIN=`./psql -qtA -U postgres -p $port_sub -c "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"`

    # Print the result for debugging purposes
    echo $REMAIN
done


sleep 5s
#!/bin/bash

##################
### Definition ###
##################
port_pub=5533
port_sub=5534

## Used source
SOURCE=patched

## Number of runs
NUMRUN=1

## Measurement for
SCALE=300
DURATION=600
NUMCLIENTS=40
WORKERS=32

echo "Test on $SOURCE with scale=$SCALE, duration=$DURATION and #workers=$WORKERS"

###########################
### measure performance ###
###########################

for i in `seq ${NUMRUN}`
do
    # Prepare a clean environment for each measurement
    sh v1_pa_pub-sub_setup.sh $SOURCE $SCALE $WORKERS

    echo "=================="
    echo "${SOURCE}_${i}.dat"
    echo "=================="

    # Disable the subscription
    ./psql -U postgres -p $port_sub -c "ALTER SUBSCRIPTION sub DISABLE;"

##   ./psql -U postgres -p $port_pub -c "select count(*) from pgbench_history; TRUNCATE pgbench_history;"

  
    # Do actual measurements                                                                                        
    ./pgbench -p $port_pub -U postgres postgres -c $NUMCLIENTS -j $NUMCLIENTS -T $DURATION > pub_scale${SCALE}_${NUMCLIENTS}c_${WORKERS}w_${SOURCE}_${DURATION}s_${i}.dat 

    ####./pgbench -p $port_sub -U postgres postgres -c $NUMCLIENTS -j $NUMCLIENTS -T $DURATION > sub_${SOURCE}_${i}.dat


 ##  ./psql -U postgres -p $port_pub -c "select count(*) from pgbench_history; TRUNCATE pgbench_history;"

    # Enable the subscription
    ./psql -U postgres -p $port_sub -c "ALTER SUBSCRIPTION sub ENABLE;"

done
