Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication
On Wed, 10 Jan 2024 at 15:04, Amit Kapila wrote:
> On Wed, Jan 10, 2024 at 2:59 PM Shlok Kyal wrote:
> >
> > This patch is not applying on the HEAD. Please rebase and share the
> > updated patch.
> >
> IIRC, there were some regressions observed with this patch. So, one
> needs to analyze those as well. I think we should mark it as "Returned
> with feedback".

Thanks, I have updated the status to "Returned with feedback". Feel free
to post an updated version with the fix for the regression and start a
new entry for the same.

Regards,
Vignesh
Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication
On Wed, Jan 10, 2024 at 2:59 PM Shlok Kyal wrote:
>
> This patch is not applying on the HEAD. Please rebase and share the
> updated patch.
>
IIRC, there were some regressions observed with this patch. So, one
needs to analyze those as well. I think we should mark it as "Returned
with feedback".

--
With Regards,
Amit Kapila.
Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication
Hi,

This patch is not applying on the HEAD. Please rebase and share the
updated patch.

Thanks and Regards
Shlok Kyal

On Wed, 10 Jan 2024 at 14:55, Peter Smith wrote:
>
> Oops - now with attachments
>
> On Mon, Aug 21, 2023 at 5:56 PM Peter Smith wrote:
>>
>> Hi Melih,
>>
>> Last week we revisited your implementation of design#2. Vignesh rebased it,
>> and then made a few other changes.
>>
>> PSA v28*
>>
>> The patch changes include:
>> * changed the logic slightly by setting recv_immediately (new variable); if
>>   this variable is set the main apply worker loop will not wait in this case.
>> * setting the relation state to ready immediately if there are no more
>>   incremental changes to be synced.
>> * receive the incremental changes if applicable and set the relation state
>>   to ready without waiting.
>> * reuse the worker if the worker is free before trying to start a new table
>>   sync worker
>> * restarting the tablesync worker only after wal_retrieve_retry_interval
>>
>> ~
>>
>> FWIW, we just wanted to share with you the performance measurements seen
>> using this design#2 patch set:
>>
>> ==
>>
>> RESULTS (not busy tests)
>>
>> --
>> 10 empty tables
>>                2w     4w     8w     16w
>> HEAD:          125    119    140    133
>> HEAD+v28*:     92     93     123    134
>> %improvement:  27%    22%    12%    -1%
>> --
>> 100 empty tables
>>                2w     4w     8w     16w
>> HEAD:          1037   843    1109   1155
>> HEAD+v28*:     591    625    2616   2569
>> %improvement:  43%    26%    -136%  -122%
>> --
>> 1000 empty tables
>>                2w     4w     8w     16w
>> HEAD:          15874  10047  9919   10338
>> HEAD+v28*:     33673  12199  9094   9896
>> %improvement:  -112%  -21%   8%     4%
>> --
>> 2000 empty tables
>>                2w     4w     8w     16w
>> HEAD:          45266  24216  19395  19820
>> HEAD+v28*:     88043  21550  21668  22607
>> %improvement:  -95%   11%    -12%   -14%
>>
>> ~~~
>>
>> Note - the results were varying quite a lot in comparison to the HEAD,
>> e.g. HEAD results are very consistent, but the v28* results observed are not:
>> HEAD 1000 (2w): 15861, 15777, 16007, 15950, 15886, 15740, 15846, 15740,
>> 15908, 15940
>> v28* 1000 (2w): 34214, 13679, 8792, 33289, 31976, 56071, 57042, 56163,
>> 34058, 11969
>>
>> --
>> Kind Regards,
>> Peter Smith.
>> Fujitsu Australia
Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication
Oops - now with attachments

On Mon, Aug 21, 2023 at 5:56 PM Peter Smith wrote:
>
> Hi Melih,
>
> Last week we revisited your implementation of design#2. Vignesh rebased
> it, and then made a few other changes.
>
> PSA v28*
>
> The patch changes include:
> * changed the logic slightly by setting recv_immediately (new variable); if
>   this variable is set the main apply worker loop will not wait in this case.
> * setting the relation state to ready immediately if there are no more
>   incremental changes to be synced.
> * receive the incremental changes if applicable and set the relation state
>   to ready without waiting.
> * reuse the worker if the worker is free before trying to start a new
>   table sync worker
> * restarting the tablesync worker only after wal_retrieve_retry_interval
>
> ~
>
> FWIW, we just wanted to share with you the performance measurements seen
> using this design#2 patch set:
>
> ==
>
> RESULTS (not busy tests)
>
> --
> 10 empty tables
>                2w     4w     8w     16w
> HEAD:          125    119    140    133
> HEAD+v28*:     92     93     123    134
> %improvement:  27%    22%    12%    -1%
> --
> 100 empty tables
>                2w     4w     8w     16w
> HEAD:          1037   843    1109   1155
> HEAD+v28*:     591    625    2616   2569
> %improvement:  43%    26%    -136%  -122%
> --
> 1000 empty tables
>                2w     4w     8w     16w
> HEAD:          15874  10047  9919   10338
> HEAD+v28*:     33673  12199  9094   9896
> %improvement:  -112%  -21%   8%     4%
> --
> 2000 empty tables
>                2w     4w     8w     16w
> HEAD:          45266  24216  19395  19820
> HEAD+v28*:     88043  21550  21668  22607
> %improvement:  -95%   11%    -12%   -14%
>
> ~~~
>
> Note - the results were varying quite a lot in comparison to the HEAD,
> e.g. HEAD results are very consistent, but the v28* results observed are
> not:
> HEAD 1000 (2w): 15861, 15777, 16007, 15950, 15886, 15740, 15846, 15740,
> 15908, 15940
> v28* 1000 (2w): 34214, 13679, 8792, 33289, 31976, 56071, 57042, 56163,
> 34058, 11969
>
> --
> Kind Regards,
> Peter Smith.
> Fujitsu Australia

Attachments:
v28-0001-Reuse-Tablesync-Workers.patch
v28-0002-Reuse-connection-when-tablesync-workers-change-t.patch
v28-0003-apply-worker-assigns-tables.patch
v28-0004-Defect-fixes.patch
Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication
Hi Melih,

Last week we revisited your implementation of design#2. Vignesh rebased it,
and then made a few other changes.

PSA v28*

The patch changes include:
* changed the logic slightly by setting recv_immediately (new variable); if
  this variable is set the main apply worker loop will not wait in this case.
* setting the relation state to ready immediately if there are no more
  incremental changes to be synced.
* receive the incremental changes if applicable and set the relation state
  to ready without waiting.
* reuse the worker if the worker is free before trying to start a new table
  sync worker
* restarting the tablesync worker only after wal_retrieve_retry_interval

~

FWIW, we just wanted to share with you the performance measurements seen
using this design#2 patch set:

==

RESULTS (not busy tests)

--
10 empty tables
               2w     4w     8w     16w
HEAD:          125    119    140    133
HEAD+v28*:     92     93     123    134
%improvement:  27%    22%    12%    -1%
--
100 empty tables
               2w     4w     8w     16w
HEAD:          1037   843    1109   1155
HEAD+v28*:     591    625    2616   2569
%improvement:  43%    26%    -136%  -122%
--
1000 empty tables
               2w     4w     8w     16w
HEAD:          15874  10047  9919   10338
HEAD+v28*:     33673  12199  9094   9896
%improvement:  -112%  -21%   8%     4%
--
2000 empty tables
               2w     4w     8w     16w
HEAD:          45266  24216  19395  19820
HEAD+v28*:     88043  21550  21668  22607
%improvement:  -95%   11%    -12%   -14%

~~~

Note - the results were varying quite a lot in comparison to the HEAD,
e.g. HEAD results are very consistent, but the v28* results observed are not:
HEAD 1000 (2w): 15861, 15777, 16007, 15950, 15886, 15740, 15846, 15740,
15908, 15940
v28* 1000 (2w): 34214, 13679, 8792, 33289, 31976, 56071, 57042, 56163,
34058, 11969

--
Kind Regards,
Peter Smith.
Fujitsu Australia
Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication
On Fri, Aug 11, 2023 at 11:45 PM Melih Mutlu wrote:
>
> Again, I couldn't reproduce the cases where you saw significantly degraded
> performance. I wonder if I'm missing something. Did you do anything not
> included in the test scripts you shared? Do you think v26-0001 will perform
> 84% worse than HEAD, if you try again? I just want to be sure that it was
> not a random thing.
> Interestingly, I also don't see an improvement in the above results as big
> as in your results when the inserts/tx ratio is smaller. Even though it
> certainly is improved in such cases.
>

TEST ENVIRONMENTS

I am running the tests on a high-spec machine:

-- NOTE: Nobody else is using this machine during our testing, so there are
no unexpected influences messing up the results.

Linux
Architecture:        x86_64
CPU(s):              120
Thread(s) per core:  2
Core(s) per socket:  15

        total  used  free  shared  buff/cache  available
Mem:     755G  5.7G  737G     49M         12G       748G
Swap:    4.0G    0B  4.0G

~~~

The results I am seeing are not random. HEAD+v26-0001 is consistently worse
than HEAD, but only for some settings. With these settings, I see bad
results (i.e. worse than HEAD) consistently every time using the dedicated
test machine.

Hou-san also reproduced bad results using a different high-spec machine.
Vignesh also reproduced bad results using just his laptop, but in his case
it did *not* occur every time. As discussed elsewhere the problem is
timing-related, so sometimes you may be lucky and sometimes not.

~

I expect you are running everything correctly, but if you are using just a
laptop (like Vignesh) then like him you might need to try multiple times
before you can hit the problem happening in your environment. Anyway, in
case there is some other reason you are not seeing the bad results, I have
re-attached the scripts and re-described every step below.

==

BUILDING

-- NOTE: I have a very minimal configuration without any optimization/debug
flags etc.
See config.log
$ ./configure --prefix=/home/peter/pg_oss

-- NOTE: Of course, make sure to be running using the correct Postgres:
echo 'set environment variables for OSS work'
export PATH=/home/peter/pg_oss/bin:$PATH

-- NOTE: Be sure to do git stash or whatever so you don't accidentally
build a patched version thinking it is the HEAD version.

-- NOTE: Be sure to do a full clean build and apply (or don't apply
v26-0001) according to the test you wish to run.

STEPS
1. sudo make clean
2. make
3. sudo make install

==

SCRIPTS & STEPS

SCRIPTS
testrun.sh
do_one_test_setup.sh
do_one_test_PUB.sh
do_one_test_SUB.sh

---

STEPS

Step-1. Edit the testrun.sh
tables=( 100 )
workers=( 2 4 8 16 )
size="0"
prefix="0816headbusy"    <-- edit to differentiate each test run

~

Step-2. Edit the do_one_test_PUB.sh
IF commit_counter = 1000 THEN    <-- edit this if needed. I wanted 1000
inserts/tx so nothing to do

~

Step-3: Check nothing else is running. If yes, then clean it up
[peter@localhost testing_busy]$ ps -eaf | grep postgres
peter  111924  100103  0 19:31 pts/0  00:00:00 grep --color=auto postgres

~

Step-4: Run the tests
[peter@localhost testing_busy]$ ./testrun.sh
num_tables=100, size=0, num_workers=2, run #1    <-- check the echo matches
the config you set in Step-1
waiting for server to shut down done
server stopped
waiting for server to shut down done
server stopped
num_tables=100, size=0, num_workers=2, run #2
waiting for server to shut down done
server stopped
waiting for server to shut down done
server stopped
num_tables=100, size=0, num_workers=2, run #3
...

~

Step-5: Sanity check
When the test completes the current folder will be full of .log and .dat*
files.
Check for sanity that no errors happened:
[peter@localhost testing_busy]$ cat *.log | grep ERROR
[peter@localhost testing_busy]$

~

Step-6: Collect the results
The results are output (by the do_one_test_SUB.sh) into the *.dat_SUB files.
Use grep to extract them:
[peter@localhost testing_busy]$ cat 0816headbusy_100t_0_2w_*.dat_SUB | grep RESULT | grep -v duration | awk '{print $3}'
11742.019
12157.355
11773.807
11582.981
12220.962
12546.325
12210.713
12614.892
12015.489
13527.05

Repeat the grep for the other files:
$ cat 0816headbusy_100t_0_4w_*.dat_SUB | grep RESULT | grep -v duration | awk '{print $3}'
$ cat 0816headbusy_100t_0_8w_*.dat_SUB | grep RESULT | grep -v duration | awk '{print $3}'
$ cat 0816headbusy_100t_0_16w_*.dat_SUB | grep RESULT | grep -v duration | awk '{print $3}'

~

Step-7: Summarise the results
Now I just cut/paste the results from Step-6 into a spreadsheet and report
the median of the runs. For example, for the above HEAD run, it was:
     2w     4w    8w    16w
1    11742  5996  1919  1582
2    12157  5960  1871  1469
3    11774  5926  2101  1571
4    11583  6155  1883  1671
5    12221  6310  1895  1707
6    12
Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication
Here is another review comment about patch v26-0001.

The tablesync worker processes include the 'relid' as part of their name.
See launcher.c:

    snprintf(bgw.bgw_name, BGW_MAXLEN,
             "logical replication tablesync worker for subscription %u sync %u",
             subid, relid);

~~

And if that worker is "reused" by v26-0001 to process another relation,
there is a LOG:

    if (reuse_worker)
        ereport(LOG,
                errmsg("logical replication table synchronization worker for subscription \"%s\" will be reused to sync table \"%s\" with relid %u.",
                       MySubscription->name,
                       get_rel_name(MyLogicalRepWorker->relid),
                       MyLogicalRepWorker->relid));

AFAICT, when being "reused" the original process name remains unchanged, so
I think it will continue to appear to any user looking at it that the
tablesync process is just taking a very long time handling the original
'relid'.

Won't the stale process name cause confusion to the users?

--
Kind Regards,
Peter Smith.
Fujitsu Australia
Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication
On Thu, 10 Aug 2023 at 10:16, Amit Kapila wrote:
>
> On Wed, Aug 9, 2023 at 8:28 AM Zhijie Hou (Fujitsu) wrote:
> >
> > On Thursday, August 3, 2023 7:30 PM Melih Mutlu wrote:
> >
> > > Right. I attached the v26 as you asked.
> >
> > Thanks for posting the patches.
> >
> > While reviewing the patch, I noticed one rare case where it's possible
> > that there are two table sync workers for the same table at the same time.
> >
> > The patch relies on LogicalRepWorkerLock to prevent concurrent access,
> > but the apply worker will start a new worker after releasing the lock.
> > So, at the point[1] where the lock is released and the new table sync
> > worker has not been started, it seems possible that another old table
> > sync worker will be reused for the same table.
> >
> >     /* Now safe to release the LWLock */
> >     LWLockRelease(LogicalRepWorkerLock);
> >
> > *[1]
> >     /*
> >      * If there are free sync worker slot(s), start a new sync
> >      * worker for the table.
> >      */
> >     if (nsyncworkers < max_sync_workers_per_subscription)
> >         ...
> >         logicalrep_worker_launch(MyLogicalRepWorker->dbid,
> >
>
> Yeah, this is a problem. I think one idea to solve this is by
> extending the lock duration till we launch the tablesync worker, but we
> should also consider changing this locking scheme such that there is a
> better way to indicate that for a particular rel, tablesync is in
> progress. Currently, the code in TablesyncWorkerMain() also acquires
> the lock in exclusive mode even though the tablesync for a rel is in
> progress, which I guess could easily hurt us for larger values of
> max_logical_replication_workers. So, that could be another motivation
> to think of a different locking scheme.
There are a couple of ways in which this issue can be solved:

Approach #1) Check that the reused worker has not picked up this table for
table sync from logicalrep_worker_launch while holding a lock on
LogicalRepWorkerLock; if the reused worker has already picked it up for
processing, simply ignore it and return. Nothing has to be done by the
launcher in this case.

Approach #2)
a) The apply worker creates a shared memory list of all the relations that
need to be synced.
b) The tablesync worker takes a lock on this shared memory and picks the
next table to be processed (the tablesync worker need not get the
subscription relations again and again).
c) The tablesync worker updates the status in shared memory for the
relation (since the lock is held there will be no concurrency issues), and
also marks the start time in the shared memory; this helps avoid
restarting a failed table before wal_retrieve_retry_interval has expired.
d) The tablesync worker syncs the table.
e) The subscription relation is marked as ready and the tablesync worker
removes the entry from shared memory.
f) The apply worker periodically synchronizes the shared memory relations
to keep them in sync with the fetched subscription relation tables.
g) When the apply worker exits, the shared memory is cleared.

Approach #2 will also help in solving the other issue reported by Amit at
[1]. I felt we can use Approach #2 to solve the problem, as it solves both
the reported issues, and there is an added advantage: the reused table
sync worker need not scan pg_subscription_rel to get the non-ready tables
on every run; instead we can use the list prepared by the apply worker.
Thoughts?

[1] - https://www.postgresql.org/message-id/CAA4eK1KyHfVOVeio28p8CHDnuyKuej78cj_7U9mHAB4ictVQwQ%40mail.gmail.com

Regards,
Vignesh
Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication
On Wed, Aug 9, 2023 at 8:28 AM Zhijie Hou (Fujitsu) wrote:
>
> On Thursday, August 3, 2023 7:30 PM Melih Mutlu wrote:
>
> > Right. I attached the v26 as you asked.
>
> Thanks for posting the patches.
>
> While reviewing the patch, I noticed one rare case where it's possible
> that there are two table sync workers for the same table at the same time.
>
> The patch relies on LogicalRepWorkerLock to prevent concurrent access,
> but the apply worker will start a new worker after releasing the lock.
> So, at the point[1] where the lock is released and the new table sync
> worker has not been started, it seems possible that another old table
> sync worker will be reused for the same table.
>
>     /* Now safe to release the LWLock */
>     LWLockRelease(LogicalRepWorkerLock);
>
> *[1]
>     /*
>      * If there are free sync worker slot(s), start a new sync
>      * worker for the table.
>      */
>     if (nsyncworkers < max_sync_workers_per_subscription)
>         ...
>         logicalrep_worker_launch(MyLogicalRepWorker->dbid,
>

Yeah, this is a problem. I think one idea to solve this is by extending the
lock duration till we launch the tablesync worker, but we should also
consider changing this locking scheme such that there is a better way to
indicate that for a particular rel, tablesync is in progress. Currently,
the code in TablesyncWorkerMain() also acquires the lock in exclusive mode
even though the tablesync for a rel is in progress, which I guess could
easily hurt us for larger values of max_logical_replication_workers. So,
that could be another motivation to think of a different locking scheme.

Yet another problem is that currently the apply worker maintains a hash
table of 'last_start_times' to avoid restarting a tablesync worker
immediately upon error. The same functionality is missing while reusing a
table sync worker.
One possibility is to use a shared hash table to remember start times, but
I think it may depend on what we decide to solve the previous problem
reported by Hou-San.

--
With Regards,
Amit Kapila.
Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication
On Fri, Aug 11, 2023 at 7:15 PM Melih Mutlu wrote:
>
> Peter Smith, on Fri, 11 Aug 2023 at 01:26, wrote:
>>
>> No, I meant what I wrote there. When I ran the tests the HEAD included
>> the v25-0001 refactoring patch, but v26 did not yet exist.
>>
>> For now, we are only performance testing the first
>> "Reuse-Tablesync-Workers" patch, but not yet including the second patch
>> ("Reuse connection when...").
>>
>> Note that those "Reuse-Tablesync-Workers" patches v24-0002 and v26-0001
>> are equivalent because there are only cosmetic log message differences
>> between them.
>
> Ok, that's fair.
>
>> So, my testing was with HEAD+v24-0002 (but not including v24-0003).
>> Your same testing should be with HEAD+v26-0001 (but not including v26-0002).
>
> That's actually what I did. I should have been more clear about what I
> included in my previous email. With v26-0002, results are noticeably
> better anyway.
> I just reran the test against HEAD, HEAD+v26-0001, and additionally
> HEAD+v26-0001+v26-0002 this time, for better comparison.
>
> Here are my results with the same scripts you shared earlier (I obviously
> only changed the number of inserts before each commit).
> Note that this is when synchronous_commit = off.
>
> 100 inserts/tx
> +-------------+-------+-------+-------+-------+
> |             | 2w    | 4w    | 8w    | 16w   |
> +-------------+-------+-------+-------+-------+
> | v26-0002    | 10421 | 6472  | 6656  | 6566  |
> | improvement | 31%   | 12%   | 0%    | 5%    |
> | v26-0001    | 14585 | 7386  | 7129  | 7274  |
> | improvement | 9%    | 5%    | 12%   | 7%    |
> | HEAD        | 16130 | 7785  | 8147  | 7827  |
> +-------------+-------+-------+-------+-------+
>
> 1000 inserts/tx
> +-------------+-------+-------+-------+-------+
> |             | 2w    | 4w    | 8w    | 16w   |
> +-------------+-------+-------+-------+-------+
> | v26-0002    | 13796 | 6848  | 5942  | 6315  |
> | improvement | 9%    | 7%    | 10%   | 8%    |
> | v26-0001    | 14685 | 7325  | 6675  | 6719  |
> | improvement | 3%    | 0%    | 0%    | 2%    |
> | HEAD        | 15118 | 7354  | 6644  | 6890  |
> +-------------+-------+-------+-------+-------+
>
> 2000 inserts/tx
> +-------------+-------+-------+-------+-------+
> |             | 2w    | 4w    | 8w    | 16w   |
> +-------------+-------+-------+-------+-------+
> | v26-0002    | 22442 | 9944  | 6034  | 5829  |
> | improvement | 5%    | 2%    | 4%    | 10%   |
> | v26-0001    | 23632 | 10164 | 6311  | 6480  |
> | improvement | 0%    | 0%    | 0%    | 0%    |
> | HEAD        | 23667 | 10157 | 6285  | 6470  |
> +-------------+-------+-------+-------+-------+
>
> 5000 inserts/tx
> +-------------+-------+-------+-------+-------+
> |             | 2w    | 4w    | 8w    | 16w   |
> +-------------+-------+-------+-------+-------+
> | v26-0002    | 41443 | 21385 | 10832 | 6146  |
> | improvement | 0%    | 0%    | 1%    | 16%   |
> | v26-0001    | 41293 | 21226 | 10814 | 6158  |
> | improvement | 0%    | 1%    | 1%    | 15%   |
> | HEAD        | 41503 | 21466 | 10943 | 7292  |
> +-------------+-------+-------+-------+-------+
>
> Again, I couldn't reproduce the cases where you saw significantly degraded
> performance.

I am not surprised that you don't see the regression because, as per
Vignesh's analysis, this is purely a timing issue where sometimes after the
patch the slot creation can take more time because there is a constant
inflow of transactions on the publisher. I think we are seeing it because
this workload is predominantly just creating and destroying slots.
We can probably improve it later, as discussed earlier, by using a single
slot for multiple copies (especially for small tables) or something like
that.

--
With Regards,
Amit Kapila.
Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication
Hi Peter,

Peter Smith, on Fri, 11 Aug 2023 at 01:26, wrote:

> No, I meant what I wrote there. When I ran the tests the HEAD included
> the v25-0001 refactoring patch, but v26 did not yet exist.
>
> For now, we are only performance testing the first
> "Reuse-Tablesync-Workers" patch, but not yet including the second patch
> ("Reuse connection when...").
>
> Note that those "Reuse-Tablesync-Workers" patches v24-0002 and v26-0001
> are equivalent because there are only cosmetic log message differences
> between them.

Ok, that's fair.

> So, my testing was with HEAD+v24-0002 (but not including v24-0003).
> Your same testing should be with HEAD+v26-0001 (but not including
> v26-0002).

That's actually what I did. I should have been more clear about what I
included in my previous email. With v26-0002, results are noticeably better
anyway.
I just reran the test against HEAD, HEAD+v26-0001, and additionally
HEAD+v26-0001+v26-0002 this time, for better comparison.

Here are my results with the same scripts you shared earlier (I obviously
only changed the number of inserts before each commit).
Note that this is when synchronous_commit = off.
100 inserts/tx
+-------------+-------+-------+-------+-------+
|             | 2w    | 4w    | 8w    | 16w   |
+-------------+-------+-------+-------+-------+
| v26-0002    | 10421 | 6472  | 6656  | 6566  |
| improvement | 31%   | 12%   | 0%    | 5%    |
| v26-0001    | 14585 | 7386  | 7129  | 7274  |
| improvement | 9%    | 5%    | 12%   | 7%    |
| HEAD        | 16130 | 7785  | 8147  | 7827  |
+-------------+-------+-------+-------+-------+

1000 inserts/tx
+-------------+-------+-------+-------+-------+
|             | 2w    | 4w    | 8w    | 16w   |
+-------------+-------+-------+-------+-------+
| v26-0002    | 13796 | 6848  | 5942  | 6315  |
| improvement | 9%    | 7%    | 10%   | 8%    |
| v26-0001    | 14685 | 7325  | 6675  | 6719  |
| improvement | 3%    | 0%    | 0%    | 2%    |
| HEAD        | 15118 | 7354  | 6644  | 6890  |
+-------------+-------+-------+-------+-------+

2000 inserts/tx
+-------------+-------+-------+-------+-------+
|             | 2w    | 4w    | 8w    | 16w   |
+-------------+-------+-------+-------+-------+
| v26-0002    | 22442 | 9944  | 6034  | 5829  |
| improvement | 5%    | 2%    | 4%    | 10%   |
| v26-0001    | 23632 | 10164 | 6311  | 6480  |
| improvement | 0%    | 0%    | 0%    | 0%    |
| HEAD        | 23667 | 10157 | 6285  | 6470  |
+-------------+-------+-------+-------+-------+

5000 inserts/tx
+-------------+-------+-------+-------+-------+
|             | 2w    | 4w    | 8w    | 16w   |
+-------------+-------+-------+-------+-------+
| v26-0002    | 41443 | 21385 | 10832 | 6146  |
| improvement | 0%    | 0%    | 1%    | 16%   |
| v26-0001    | 41293 | 21226 | 10814 | 6158  |
| improvement | 0%    | 1%    | 1%    | 15%   |
| HEAD        | 41503 | 21466 | 10943 | 7292  |
+-------------+-------+-------+-------+-------+

Again, I couldn't reproduce the cases where you saw significantly degraded
performance. I wonder if I'm missing something. Did you do anything not
included in the test scripts you shared? Do you think v26-0001 will perform
84% worse than HEAD if you try again? I just want to be sure that it was
not a random thing.
Interestingly, I also don't see an improvement in the above results as big
as in your results when the inserts/tx ratio is smaller. Even though it
certainly is improved in such cases.

Thanks,
--
Melih Mutlu
Microsoft
Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication
On Fri, 11 Aug 2023 at 16:26, vignesh C wrote:
>
> On Wed, 9 Aug 2023 at 09:51, vignesh C wrote:
> >
> > Hi Melih,
> >
> > Here is a patch to help in getting the execution time at various phases,
> > like: a) replication slot creation time, b) WAL reading, c) number of
> > WAL records read, d) subscription relation state change, etc.
> > A couple of observations while we tested with this patch:
> > 1) We noticed that the patch takes more time for finding the decoding
> > start point.
> > 2) Another observation was that the number of XLOG records read to
> > identify the consistent point was significantly higher with the v26_0001
> > patch.
> >
> > HEAD
> > postgres=# select avg(counttime)/1000 "avgtime(ms)",
> > median(counttime)/1000 "median(ms)", min(counttime)/1000
> > "mintime(ms)", max(counttime)/1000 "maxtime(ms)", logtype from test
> > group by logtype;
> >       avgtime(ms)       | median(ms) | mintime(ms) | maxtime(ms) |         logtype
> > ------------------------+------------+-------------+-------------+--------------------------
> >  0.00579245283018867920 |     0.0020 |           0 |           1 | SNAPSHOT_BUILD
> >      1.2246811320754717 |     0.9855 |           0 |          37 | LOGICAL_SLOT_CREATION
> >    171.0863283018867920 |   183.9120 |           0 |         408 | FIND_DECODING_STARTPOINT
> >      2.0699433962264151 |     1.4380 |           1 |          49 | INIT_DECODING_CONTEXT
> > (4 rows)
> >
> > HEAD + v26-0001 patch
> > postgres=# select avg(counttime)/1000 "avgtime(ms)",
> > median(counttime)/1000 "median(ms)", min(counttime)/1000
> > "mintime(ms)", max(counttime)/1000 "maxtime(ms)", logtype from test
> > group by logtype;
> >       avgtime(ms)       | median(ms) | mintime(ms) | maxtime(ms) |         logtype
> > ------------------------+------------+-------------+-------------+--------------------------
> >  0.00588113207547169810 |     0.0050 |           0 |           0 | SNAPSHOT_BUILD
> >      1.1270962264150943 |     1.1000 |           0 |           2 | LOGICAL_SLOT_CREATION
> >    301.1745528301886790 |   410.4870 |           0 |         427 | FIND_DECODING_STARTPOINT
> >      1.4814660377358491 |     1.4530 |           1 |           9 | INIT_DECODING_CONTEXT
> > (4 rows)
> >
> > In the above, FIND_DECODING_STARTPOINT is very much higher with the
> > v26-0001 patch.
> >
> > HEAD
> > FIND_DECODING_XLOG_RECORD_COUNT
> > - average = 2762
> > - median = 3362
> >
> > HEAD + reuse worker patch (v26_0001 patch)
> > FIND_DECODING_XLOG_RECORD_COUNT
> > - average = 4105
> > - median = 5345
> >
> > Similarly, the number of xlog records read is higher with the v26_0001 patch.
> >
> > Steps to calculate the timing:
> > -- First collect the necessary LOG lines from the subscriber's log:
> > cat *.log | grep -E
> > '(LOGICAL_SLOT_CREATION|INIT_DECODING_CONTEXT|FIND_DECODING_STARTPOINT|SNAPSHOT_BUILD|FIND_DECODING_XLOG_RECORD_COUNT|LOGICAL_XLOG_READ|LOGICAL_DECODE_PROCESS_RECORD|LOGICAL_WAIT_TRANSACTION)'
> > > grep.dat
> >
> > create table testv26(logtime varchar, pid varchar, level varchar,
> > space varchar, logtype varchar, counttime int);
> > -- Then copy these data into the db table to compute the averages:
> > COPY testv26 FROM '/home/logs/grep.dat' DELIMITER ' ';
> >
> > -- Finally, use this SQL to analyze the data:
> > select avg(counttime)/1000 "avgtime(ms)", logtype from testv26 group by logtype;
> >
> > -- To get the number of xlog records read:
> > select avg(counttime) from testv26 where logtype
> > ='FIND_DECODING_XLOG_RECORD_COUNT' and counttime != 1;
> >
> > Thanks to Peter and Hou-san who helped in finding these out. We are
> > analysing this in parallel; @Melih Mutlu, posting this information so
> > that it might help you too in analysing this issue.
>
> I analysed further why it needs to read a larger number of XLOG records
> in some cases while creating the replication slot; here are my thoughts:
> Note: The tablesync worker needs to connect to the publisher and create a
> consistent point for the slot by reading the XLOG records. This requires
> that all the open transactions, and the transactions that are created
> while creating the consistent point, should be committed.
> I feel the creation of slots is better in a few cases in HEAD because:
>
> Publisher                  | Subscriber
> ---------------------------+---------------------------------------
> Begin txn1 transaction     |
> Insert 1..1000 records     |
> Commit                     |
> Begin txn2 transaction     |
> Insert 1..1000 records     | Apply worker applies transaction txn1
>                            | Start tablesync table t2
>                            | create consistent point in
>                            | publisher before transaction txn3 is
>                            | started
> commit                     | We just need to wait till
>                            | transaction txn2 is finished.
Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication
On Wed, 9 Aug 2023 at 09:51, vignesh C wrote:
>
> Hi Melih,
>
> Here is a patch to help in getting the execution time at various phases,
> like: a) replication slot creation time, b) WAL reading, c) number of
> WAL records read, d) subscription relation state change, etc.
> A couple of observations while we tested with this patch:
> 1) We noticed that the patch takes more time for finding the decoding
> start point.
> 2) Another observation was that the number of XLOG records read to
> identify the consistent point was significantly higher with the v26_0001
> patch.
>
> HEAD
> postgres=# select avg(counttime)/1000 "avgtime(ms)",
> median(counttime)/1000 "median(ms)", min(counttime)/1000
> "mintime(ms)", max(counttime)/1000 "maxtime(ms)", logtype from test
> group by logtype;
>       avgtime(ms)       | median(ms) | mintime(ms) | maxtime(ms) |         logtype
> ------------------------+------------+-------------+-------------+--------------------------
>  0.00579245283018867920 |     0.0020 |           0 |           1 | SNAPSHOT_BUILD
>      1.2246811320754717 |     0.9855 |           0 |          37 | LOGICAL_SLOT_CREATION
>    171.0863283018867920 |   183.9120 |           0 |         408 | FIND_DECODING_STARTPOINT
>      2.0699433962264151 |     1.4380 |           1 |          49 | INIT_DECODING_CONTEXT
> (4 rows)
>
> HEAD + v26-0001 patch
> postgres=# select avg(counttime)/1000 "avgtime(ms)",
> median(counttime)/1000 "median(ms)", min(counttime)/1000
> "mintime(ms)", max(counttime)/1000 "maxtime(ms)", logtype from test
> group by logtype;
>       avgtime(ms)       | median(ms) | mintime(ms) | maxtime(ms) |         logtype
> ------------------------+------------+-------------+-------------+--------------------------
>  0.00588113207547169810 |     0.0050 |           0 |           0 | SNAPSHOT_BUILD
>      1.1270962264150943 |     1.1000 |           0 |           2 | LOGICAL_SLOT_CREATION
>    301.1745528301886790 |   410.4870 |           0 |         427 | FIND_DECODING_STARTPOINT
>      1.4814660377358491 |     1.4530 |           1 |           9 | INIT_DECODING_CONTEXT
> (4 rows)
>
> In the above, FIND_DECODING_STARTPOINT is very much higher with the
> v26-0001 patch.
>
> HEAD
> FIND_DECODING_XLOG_RECORD_COUNT
> - average = 2762
> - median = 3362
>
> HEAD + reuse worker patch (v26_0001 patch)
> FIND_DECODING_XLOG_RECORD_COUNT
> - average = 4105
> - median = 5345
>
> Similarly, the number of xlog records read is higher with the v26_0001 patch.
>
> Steps to calculate the timing:
> -- First collect the necessary LOG lines from the subscriber's log:
> cat *.log | grep -E
> '(LOGICAL_SLOT_CREATION|INIT_DECODING_CONTEXT|FIND_DECODING_STARTPOINT|SNAPSHOT_BUILD|FIND_DECODING_XLOG_RECORD_COUNT|LOGICAL_XLOG_READ|LOGICAL_DECODE_PROCESS_RECORD|LOGICAL_WAIT_TRANSACTION)'
> > grep.dat
>
> create table testv26(logtime varchar, pid varchar, level varchar,
> space varchar, logtype varchar, counttime int);
> -- Then copy these data into the db table to compute the averages:
> COPY testv26 FROM '/home/logs/grep.dat' DELIMITER ' ';
>
> -- Finally, use this SQL to analyze the data:
> select avg(counttime)/1000 "avgtime(ms)", logtype from testv26 group by logtype;
>
> -- To get the number of xlog records read:
> select avg(counttime) from testv26 where logtype
> ='FIND_DECODING_XLOG_RECORD_COUNT' and counttime != 1;
>
> Thanks to Peter and Hou-san who helped in finding these out. We are
> analysing this in parallel; @Melih Mutlu, posting this information so
> that it might help you too in analysing this issue.

I analysed further why it needs to read a larger number of XLOG records in
some cases while creating the replication slot; here are my thoughts:
Note: The tablesync worker needs to connect to the publisher and create a
consistent point for the slot by reading the XLOG records. This requires
that all the open transactions, and the transactions that are created
while creating the consistent point, should be committed.
I feel the creation of slots is better in few cases in Head because: Publisher| Subscriber Begin txn1 transaction| Insert 1..1000 records| Commit | Begin txn2 transaction| Insert 1..1000 records | Apply worker applies transaction txn1 | Start tablesync table t2 | create consistent point in | publisher before transaction txn3 is | started commit| We just need to wait till | transaction txn2 is finished. Begin txn3 transaction| Insert 1..1000 records | commit| In V26, this is happening in some cases: Publisher| Subscriber ---
Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication
On Fri, Aug 11, 2023 at 12:54 AM Melih Mutlu wrote:
>
> Hi Peter and Vignesh,
>
> Peter Smith , 7 Ağu 2023 Pzt, 09:25 tarihinde şunu yazdı:
>>
>> Hi Melih.
>>
>> Now that the design#1 ERRORs have been fixed, we returned to doing
>> performance measuring of the design#1 patch versus HEAD.
>
> Thanks a lot for taking the time to benchmark the patch. It's really helpful.
>
>> Publisher "busy" table does commit every 1000 inserts:
>>                  2w     4w     8w    16w
>> HEAD          11898   5855   1868   1631
>> HEAD+v24-0002 21905   8254   3531   1626
>> %improvement   -84%   -41%   -89%     0%
>>
>> ^ Note - design#1 was slower than HEAD here
>>
>> ~
>>
>> Publisher "busy" table does commit every 2000 inserts:
>>                  2w     4w     8w    16w
>> HEAD          21740   7109   3454   1703
>> HEAD+v24-0002 21585  10877   4779   2293
>> %improvement     1%   -53%   -38%   -35%
>
> I assume you meant HEAD+v26-0002 and not v24. I wanted to quickly
> reproduce these two cases where the patch was significantly worse.
> Interestingly my results are a bit different than yours.
>

No, I meant what I wrote there. When I ran the tests, HEAD included the
v25-0001 refactoring patch, but v26 did not yet exist.

For now, we are only performance testing the first
"Reuse-Tablesync-Workers" patch, but not yet including the second patch
("Reuse connection when..."). Note that those "Reuse-Tablesync-Workers"
patches v24-0002 and v26-0001 are equivalent because there are only
cosmetic log message differences between them.

So, my testing was with HEAD+v24-0002 (but not including v24-0003). Your
same testing should be with HEAD+v26-0001 (but not including v26-0002).

--
Kind Regards,
Peter Smith.
Fujitsu Australia
Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication
Hi Peter and Vignesh,

Peter Smith , 7 Ağu 2023 Pzt, 09:25 tarihinde şunu yazdı:
> Hi Melih.
>
> Now that the design#1 ERRORs have been fixed, we returned to doing
> performance measuring of the design#1 patch versus HEAD.

Thanks a lot for taking the time to benchmark the patch. It's really helpful.

> Publisher "busy" table does commit every 1000 inserts:
>                  2w     4w     8w    16w
> HEAD          11898   5855   1868   1631
> HEAD+v24-0002 21905   8254   3531   1626
> %improvement   -84%   -41%   -89%     0%
>
> ^ Note - design#1 was slower than HEAD here
>
> ~
>
> Publisher "busy" table does commit every 2000 inserts:
>                  2w     4w     8w    16w
> HEAD          21740   7109   3454   1703
> HEAD+v24-0002 21585  10877   4779   2293
> %improvement     1%   -53%   -38%   -35%

I assume you meant HEAD+v26-0002 and not v24. I wanted to quickly
reproduce these two cases where the patch was significantly worse.
Interestingly, my results are a bit different than yours.

Publisher "busy" table does commit every 1000 inserts:
               2w     4w     8w    16w
HEAD        22405  10335   5008   3304
HEAD+v26    19954   8037   4068   2761
%improvement   1%     2%     2%     1%

Publisher "busy" table does commit every 2000 inserts:
               2w     4w     8w    16w
HEAD        33122  14220   7251   4279
HEAD+v26    34248  16213   7356   3914
%improvement   0%    -1%     0%     1%

If I'm not doing something wrong in testing (or maybe the patch doesn't
perform reliably yet for some reason), I don't see a drastic change in
performance. But I guess the patch is supposed to perform better than
HEAD in both these cases anyway, right? I would expect the performance of
the patch to converge to HEAD's performance with large tables. But I'm
not sure what to expect when the apply worker is busy with large
transactions.

However, I need to investigate a bit more what Vignesh shared earlier
[1]. It makes sense that those issues could cause this problem here. It
just takes a bit of time for me to figure out these things, but I'm
working on it.

[1] https://www.postgresql.org/message-id/CALDaNm1TA068E2niJFUR9ig%2BYz3-ank%3Dj5%3Dj-2UocbzaDnQPrA%40mail.gmail.com

Thanks,
--
Melih Mutlu
Microsoft
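For anyone re-running these comparisons, here is a small helper (my own sketch, not part of the posted test scripts) showing how the %improvement rows in these tables are derived — (HEAD − patched) / HEAD × 100, so negative values mean the patched build was slower:

```python
def pct_improvement(head_ms: float, patched_ms: float) -> int:
    """Whole-percent improvement of the patched run over HEAD,
    matching the rounding used in the result tables in this thread."""
    return round((head_ms - patched_ms) / head_ms * 100)

# Figures quoted above ("commit every 1000 inserts", 2 workers):
print(pct_improvement(11898, 21905))  # -> -84  (design#1 slower than HEAD)
# ("commit every 2000 inserts", 2 workers):
print(pct_improvement(21740, 21585))  # -> 1
```

The two calls reproduce the -84% and 1% cells from the quoted tables, which confirms the sign convention.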
Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication
Hi Melih,

FYI -- The same testing was repeated, but this time PG was configured
with synchronous_commit=on. Other factors and scripts were the same as
before --- busy apply, 5 runs, 4 workers, 1000 inserts/tx, 100 empty
tables, etc.

There are still more xlog records seen for the v26 patch, but now the
v26 performance was better than HEAD.

RESULTS (synchronous_commit=on)
---

Xlog Counts

HEAD
postgres=# select avg(counttime) "avg", median(counttime) "median",
min(counttime) "min", max(counttime) "max", logtype from test_head
group by logtype;
          avg          |  median   | min | max  |             logtype
-----------------------+-----------+-----+------+---------------------------------
 1253.7509433962264151 | 1393.     |   1 | 2012 | FIND_DECODING_XLOG_RECORD_COUNT
(1 row)

HEAD+v26-0001
postgres=# select avg(counttime) "avg", median(counttime) "median",
min(counttime) "min", max(counttime) "max", logtype from test_v26
group by logtype;
          avg          |  median   | min | max  |             logtype
-----------------------+-----------+-----+------+---------------------------------
 1278.4075471698113208 | 1423.5000 |   1 | 2015 | FIND_DECODING_XLOG_RECORD_COUNT
(1 row)

~~

Performance

HEAD
[peter@localhost res_0809_vignesh_timing_sync_head]$ cat *.dat_SUB |
grep RESULT | grep -v duration | awk '{print $3}'
4014.266
3892.089
4195.318
3571.862
4312.183

HEAD+v26-0001
[peter@localhost res_0809_vignesh_timing_sync_v260001]$ cat *.dat_SUB |
grep RESULT | grep -v duration | awk '{print $3}'
3326.627
3213.028
3433.611
3299.803
3258.821

--
Kind Regards,
Peter Smith.
Fujitsu Australia
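To turn the five raw RESULT timings above into a single comparison figure, a quick sketch (assuming, as elsewhere in this thread, that the values are per-run millisecond results printed by the test procedure):

```python
from statistics import mean

# The five RESULT values (ms) from the synchronous_commit=on runs above.
head = [4014.266, 3892.089, 4195.318, 3571.862, 4312.183]
v26 = [3326.627, 3213.028, 3433.611, 3299.803, 3258.821]

head_avg = mean(head)  # ~3997.1 ms
v26_avg = mean(v26)    # ~3306.4 ms

# Positive => v26-0001 faster than HEAD in this configuration.
improvement = (head_avg - v26_avg) / head_avg * 100
print(f"HEAD avg={head_avg:.1f}ms  v26 avg={v26_avg:.1f}ms  "
      f"improvement={improvement:.1f}%")
```

On these numbers the patched build averages roughly 17% faster, consistent with the observation that v26 wins here despite reading more xlog records.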
Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication
On Wed, Aug 9, 2023 at 8:28 AM Zhijie Hou (Fujitsu) wrote:
>
> On Thursday, August 3, 2023 7:30 PM Melih Mutlu wrote:
> > Right. I attached the v26 as you asked.
>
> Thanks for posting the patches.
>
> While reviewing the patch, I noticed one rare case where it's possible
> for two table sync workers to be running for the same table at the
> same time.
>
> The patch relies on LogicalRepWorkerLock to prevent concurrent access,
> but the apply worker will start a new worker after releasing the lock.
> So, at the point[1] where the lock is released and the new table sync
> worker has not been started, it seems possible that another old table
> sync worker will be reused for the same table.
>
>     /* Now safe to release the LWLock */
>     LWLockRelease(LogicalRepWorkerLock);
> *[1]
>     /*
>      * If there are free sync worker slot(s), start a new sync
>      * worker for the table.
>      */
>     if (nsyncworkers < max_sync_workers_per_subscription)
>     ...
>     logicalrep_worker_launch(MyLogicalRepWorker->dbid,
>

Yeah, this is a problem. I think one idea to solve this is by extending
the lock duration till we launch the tablesync worker, but we should
also consider changing this locking scheme such that there is a better
way to indicate that, for a particular rel, a tablesync is in progress.
Currently, the code in TablesyncWorkerMain() also acquires the lock in
exclusive mode even though the tablesync for a rel is in progress, which
I guess could easily hurt us for larger values of
max_logical_replication_workers. So, that could be another motivation to
think of a different locking scheme.

--
With Regards,
Amit Kapila.
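The window described above is a classic check-then-act race. As an illustration only (plain Python threads, not PostgreSQL's LWLock machinery — the names here are hypothetical), the difference between releasing the lock before the "launch" and extending the lock over it looks like this:

```python
import threading

assigned = []  # rels that already have a sync worker, guarded by `lock`
lock = threading.Lock()

def launch_racy(rel):
    # Mirrors the problem: the check is done under the lock...
    with lock:
        needs_worker = rel not in assigned
    # ...but the lock is released here, so another (reused) worker can
    # pick up `rel` in this window before we act on the stale check.
    if needs_worker:
        with lock:
            assigned.append(rel)

def launch_locked(rel):
    # The suggested fix: hold the lock across both the check and the launch.
    with lock:
        if rel not in assigned:
            assigned.append(rel)

workers = [threading.Thread(target=launch_locked, args=("tbl_b",))
           for _ in range(8)]
for w in workers:
    w.start()
for w in workers:
    w.join()
print(assigned.count("tbl_b"))  # always 1: no duplicate sync workers
```

With `launch_racy` the duplicate is timing-dependent (which is why a gdb breakpoint is needed to reproduce it reliably); `launch_locked` closes the window at the cost of the longer lock hold Amit mentions.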
Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication
Hi Melih,

Here is a patch to help in getting the execution time at various phases
like: a) replication slot creation time, b) WAL reading, c) number of
WAL records read, d) subscription relation state change, etc.

A couple of observations while we tested with this patch:
1) We noticed that the patch takes more time for finding the decoding
start point.
2) Another observation was that the number of XLOG records read to
identify the consistent point was significantly higher with the v26_0001
patch.

HEAD
postgres=# select avg(counttime)/1000 "avgtime(ms)",
median(counttime)/1000 "median(ms)", min(counttime)/1000 "mintime(ms)",
max(counttime)/1000 "maxtime(ms)", logtype from test group by logtype;
      avgtime(ms)       | median(ms) | mintime(ms) | maxtime(ms) |         logtype
------------------------+------------+-------------+-------------+--------------------------
 0.00579245283018867920 |     0.0020 |           0 |           1 | SNAPSHOT_BUILD
     1.2246811320754717 |     0.9855 |           0 |          37 | LOGICAL_SLOT_CREATION
   171.0863283018867920 |   183.9120 |           0 |         408 | FIND_DECODING_STARTPOINT
     2.0699433962264151 |     1.4380 |           1 |          49 | INIT_DECODING_CONTEXT
(4 rows)

HEAD + v26-0001 patch
postgres=# select avg(counttime)/1000 "avgtime(ms)",
median(counttime)/1000 "median(ms)", min(counttime)/1000 "mintime(ms)",
max(counttime)/1000 "maxtime(ms)", logtype from test group by logtype;
      avgtime(ms)       | median(ms) | mintime(ms) | maxtime(ms) |         logtype
------------------------+------------+-------------+-------------+--------------------------
 0.00588113207547169810 |     0.0050 |           0 |           0 | SNAPSHOT_BUILD
     1.1270962264150943 |     1.1000 |           0 |           2 | LOGICAL_SLOT_CREATION
   301.1745528301886790 |   410.4870 |           0 |         427 | FIND_DECODING_STARTPOINT
     1.4814660377358491 |     1.4530 |           1 |           9 | INIT_DECODING_CONTEXT
(4 rows)

In the above, FIND_DECODING_STARTPOINT is very much higher with the
v26-0001 patch.

HEAD
FIND_DECODING_XLOG_RECORD_COUNT
- average = 2762
- median = 3362

HEAD + reuse worker patch (v26_0001 patch)
FIND_DECODING_XLOG_RECORD_COUNT
- average = 4105
- median = 5345

Similarly, the number of xlog records read is higher with the v26_0001
patch.

Steps to calculate the timing:
-- first collect the necessary LOG from the subscriber's log.
cat *.log | grep -E
'(LOGICAL_SLOT_CREATION|INIT_DECODING_CONTEXT|FIND_DECODING_STARTPOINT|SNAPSHOT_BUILD|FIND_DECODING_XLOG_RECORD_COUNT|LOGICAL_XLOG_READ|LOGICAL_DECODE_PROCESS_RECORD|LOGICAL_WAIT_TRANSACTION)'
> grep.dat

create table testv26(logtime varchar, pid varchar, level varchar,
space varchar, logtype varchar, counttime int);

-- then copy this data into a db table to count the avg number.
COPY testv26 FROM '/home/logs/grep.dat' DELIMITER ' ';

-- Finally, use the SQL to analyze the data:
select avg(counttime)/1000 "avgtime(ms)", logtype from testv26 group by
logtype;

--- To get the number of xlog records read:
select avg(counttime) from testv26 where logtype
='FIND_DECODING_XLOG_RECORD_COUNT' and counttime != 1;

Thanks to Peter and Hou-san who helped in finding these out. We are
analysing this in parallel; @Melih Mutlu, posting this information so
that it might help you too in analysing this issue.

Regards,
Vignesh

From b755cab38ff76e9f63304b2d8f344cb098ca6a33 Mon Sep 17 00:00:00 2001
From: Hou Zhijie
Date: Fri, 4 Aug 2023 17:57:29 +0800
Subject: [PATCH v1 1/2] count state change time

---
 src/backend/replication/logical/tablesync.c | 28 +
 1 file changed, 28 insertions(+)

diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 651a775065..0d9298f7b3 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -123,6 +123,10 @@
 #include "utils/syscache.h"
 #include "utils/usercontext.h"

+static TimestampTz start = 0;
+static long secs = 0;
+static int microsecs = 0;
+
 static bool table_states_valid = false;
 static List *table_states_not_ready = NIL;
 static bool FetchTableStates(bool *started_tx);
@@ -338,6 +342,11 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
 		ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, syncslotname, false);
 		CommitTransactionCommand();
+
+		TimestampDifference(start, GetCurrentTimestamp(), &secs, &microsecs);
+		elog(LOG, "SUBREL_STATE_SYNCDONE %d", ((int) secs * 100 + microsecs));
+		start = GetCurrentTimestamp();
+
 		pgstat_report_stat(false);

 		/*
@@ -1258,6 +1267,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 	bool		must_use_password;
 	bool		run_as_owner;

+	start = GetCurrentTimestamp();
+
 	/* Check the state of the table synchronization. */
 	StartTransactionCommand();
 	relstate = GetSubscriptionRelState(MyLogicalRepWorker->subid,
@@
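As a hedged alternative to loading grep.dat into a table, the same per-logtype averages and medians can be computed directly in a few lines of Python (the `summarize()` helper below is my own, not part of the attached patch; it assumes the whitespace-separated layout produced by the grep step, with the logtype and the microsecond counttime as the last two fields of each line):

```python
from collections import defaultdict
from statistics import mean, median

def summarize(lines):
    """Per-logtype (avg_ms, median_ms) from grep.dat-style lines whose
    last two whitespace-separated fields are <logtype> <counttime-us>."""
    by_type = defaultdict(list)
    for line in lines:
        *_, logtype, counttime = line.split()
        by_type[logtype].append(int(counttime))
    return {t: (mean(v) / 1000, median(v) / 1000) for t, v in by_type.items()}

# Hypothetical sample lines in the grep.dat layout:
sample = [
    "logtime pid LOG: FIND_DECODING_STARTPOINT 171000",
    "logtime pid LOG: FIND_DECODING_STARTPOINT 301000",
    "logtime pid LOG: SNAPSHOT_BUILD 6",
]
print(summarize(sample))
```

This avoids the CREATE TABLE/COPY round-trip when one just wants a quick look at the numbers.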
RE: [PATCH] Reuse Workers and Replication Slots during Logical Replication
On Thursday, August 3, 2023 7:30 PM Melih Mutlu wrote:
> Right. I attached the v26 as you asked.

Thanks for posting the patches.

While reviewing the patch, I noticed one rare case where it's possible
for two table sync workers to be running for the same table at the same
time.

The patch relies on LogicalRepWorkerLock to prevent concurrent access,
but the apply worker will start a new worker after releasing the lock.
So, at the point[1] where the lock is released and the new table sync
worker has not been started, it seems possible that another old table
sync worker will be reused for the same table.

    /* Now safe to release the LWLock */
    LWLockRelease(LogicalRepWorkerLock);
*[1]
    /*
     * If there are free sync worker slot(s), start a new sync
     * worker for the table.
     */
    if (nsyncworkers < max_sync_workers_per_subscription)
    ...
    logicalrep_worker_launch(MyLogicalRepWorker->dbid,

I can reproduce it by using gdb. Steps:

1. Set max_sync_workers_per_subscription to 1 and set up pub/sub which
   publishes two tables (table A and B).
2. When the table sync worker for table A has started, use gdb to block
   it before it is reused for another table.
3. Set max_sync_workers_per_subscription to 2 and use gdb to block the
   apply worker at the point after releasing the LogicalRepWorkerLock
   and before starting another table sync worker for table B.
4. Release the blocked table sync worker; then we can see the table sync
   worker is also reused for table B.
5. Release the apply worker; then we can see the apply worker will start
   another table sync worker for the same table (B).

I think it would be better to prevent this case from happening, as this
case will give some unexpected ERROR or LOG. Note that I haven't checked
if it would cause worse problems like a duplicate copy or others.

Best Regards,
Hou zj
Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication
Hi Melih.

Now that the design#1 ERRORs have been fixed, we returned to doing
performance measuring of the design#1 patch versus HEAD.

Unfortunately, we observed that under some particular conditions (large
transactions of 1000 inserts/tx for a busy apply worker, 100 empty
tables to be synced) the performance was worse with the design#1 patch
applied.

~~

RESULTS

Below are some recent measurements (for 100 empty tables to be synced
when the apply worker is already busy). We vary the size of the
published transaction for the "busy" table, and you can see that for
certain large transaction sizes (1000 and 2000 inserts/tx) the design#1
performance was worse than HEAD:

~

The publisher "busy" table does commit every 10 inserts:
                 2w     4w     8w    16w
HEAD           3945   1138   1166   1205
HEAD+v24-0002  3559    886    355    490
%improvement    10%    22%    70%    59%

~

The publisher "busy" table does commit every 100 inserts:
                 2w     4w     8w    16w
HEAD           2363   1357   1354   1355
HEAD+v24-0002  2077   1358    762    756
%improvement    12%     0%    44%    44%

~

Publisher "busy" table does commit every 1000 inserts:
                 2w     4w     8w    16w
HEAD          11898   5855   1868   1631
HEAD+v24-0002 21905   8254   3531   1626
%improvement   -84%   -41%   -89%     0%

^ Note - design#1 was slower than HEAD here

~

Publisher "busy" table does commit every 2000 inserts:
                 2w     4w     8w    16w
HEAD          21740   7109   3454   1703
HEAD+v24-0002 21585  10877   4779   2293
%improvement     1%   -53%   -38%   -35%

^ Note - design#1 was slower than HEAD here

~

The publisher "busy" table does commit every 5000 inserts:
                 2w     4w     8w    16w
HEAD          36094  18105   8595   3567
HEAD+v24-0002 36305  18199   8151   3710
%improvement    -1%    -1%     5%    -4%

~

The publisher "busy" table does commit every 1 inserts:
                 2w     4w     8w    16w
HEAD          38077  18406   9426   5559
HEAD+v24-0002 36763  18027   8896   4166
%improvement     3%     2%     6%    25%

--

TEST SCRIPTS

The "busy apply" test scripts are basically the same as already posted
[1], but I have reattached the latest ones again anyway.
--
[1] https://www.postgresql.org/message-id/CAHut%2BPuNVNK2%2BA%2BR6eV8rKPNBHemCFE4NDtEYfpXbYr6SsvvBg%40mail.gmail.com

Kind Regards,
Peter Smith.
Fujitsu Australia

#!/bin/bash
#
# SUB
#
# First argument  : number of tables
# Second argument : size[Byte] of each tables
# Third argument  : max_sync_workers
# Fourth argument : execution numbers
#

port_pub=5431
data_pub=datapub
port_sub=5432
data_sub=datasub

echo ''
echo '# Check configurations #'
echo ''

declare num_tables
if [ -n "$1" ]; then
	num_tables=$1
else
	num_tables=10
fi
echo "$num_tables tables will be used while testing"

declare table_size
if [ -n "$2" ]; then
	table_size=$2
else
	table_size=0
fi

num_sync_workers=$3
run_no=$4

#
# Convert from table_size to number of tuples. The equation was
# found by my tests...
#
declare num_tuples
if [ $table_size == "10kB" ]
then
	num_tuples=3250
else
	num_tuples=0
fi
echo "$num_tuples tuples will be inserted to each tables"

echo '##'
echo '# IPC at subscriber-side #'
echo '##'
psql -U postgres -p $port_sub -a -c "CREATE SUBSCRIPTION ipc_from_publisher CONNECTION 'host=localhost user=postgres port=$port_pub' PUBLICATION ipc_at_publisher WITH(origin=NONE);"
psql -U postgres -p $port_sub -a -c "CREATE PUBLICATION ipc_at_subscriber FOR TABLE ipc;"

# wait a bit for the publisher-side to connect to this publication
sleep 5s
psql -U postgres -p $port_sub -a -c "INSERT INTO ipc VALUES('sub ipc ready');"
psql -U postgres -p $port_sub -a -c "CALL ipc_wait_for('pub ipc ready');"

echo '#'
echo '# Create tables #'
echo '#'
(
	echo "CREATE TABLE busy_tbl(a text);"
	echo "CREATE SCHEMA test_tables;"
	echo -e "SELECT 'CREATE TABLE test_tables.manytables_'||i||'(i int);' FROM generate_series(1, $num_tables) g(i) \gexec"
) | psql -U postgres -p $port_sub -a

echo '##'
echo '# Create subscription for busy table #'
echo '##'
(
	echo "CREATE SUBSCRIPTION mysub CONNECTION 'host=localhost user=postgres port=$port_pub' PUBLICATION mypub;"
	echo "INSERT INTO ipc VALUES ('mysub is created');"
) | psql -U postgres -p $port_sub -a

echo ''
echo '# Test #'
echo ''
(
echo -e "CREATE OR REPLACE PROCEDURE log_rep_test(max INTEGER) AS \$\$
DECLARE
	total_duration INTERVAL := '0';
	avg_duration FLOAT := 0.0;
	start_time TIMESTAMP;
	end_time TIMESTAMP;
BEGIN
	start_time := clock_timestamp();
	-- time how long it takes for all the tablesyncs to become "ready"
	WHILE EXISTS (SELECT 1 FROM pg_subscription_rel WHERE srsubstate != 'r') LOOP
		COMMIT;
	END LOOP;
	end_time := clock_timestamp();
	total_duration := total_duration + (end_time - start_time);
	IF max > 0 THEN
		avg_duration := EXTRACT(EPOCH FROM total_duration) / max * 1000;
	END IF;
	RAISE NOTICE 'RESULT: %', avg_duration;
Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication
FWIW, I confirmed that my review comments for v22* have all been addressed in the latest v26* patches. Thanks! -- Kind Regards, Peter Smith. Fujitsu Australia
Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication
Hi, Amit Kapila , 3 Ağu 2023 Per, 09:22 tarihinde şunu yazdı: > On Thu, Aug 3, 2023 at 9:35 AM Peter Smith wrote: > > I checked the latest patch v25-0001. > > > > LGTM. > > > > Thanks, I have pushed 0001. Let's focus on the remaining patches. > Thanks! Peter Smith , 3 Ağu 2023 Per, 12:06 tarihinde şunu yazdı: > Just to clarify my previous post, I meant we will need new v26* patches > Right. I attached the v26 as you asked. Thanks, -- Melih Mutlu Microsoft v26-0001-Reuse-Tablesync-Workers.patch Description: Binary data v26-0002-Reuse-connection-when-tablesync-workers-change-t.patch Description: Binary data
Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication
Just to clarify my previous post, I meant we will need new v26* patches v24-0001 -> not needed because v25-0001 pushed v24-0002 -> v26-0001 v24-0003 -> v26-0002 On Thu, Aug 3, 2023 at 6:19 PM Peter Smith wrote: > > Hi Melih, > > Now that v25-0001 has been pushed, can you please rebase the remaining > patches? > > -- > Kind Regards, > Peter Smith. > Fujitsu Australia
Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication
Hi Melih, Now that v25-0001 has been pushed, can you please rebase the remaining patches? -- Kind Regards, Peter Smith. Fujitsu Australia
Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication
On Thu, Aug 3, 2023 at 9:35 AM Peter Smith wrote: > > On Wed, Aug 2, 2023 at 11:19 PM Amit Kapila wrote: > > > > On Wed, Aug 2, 2023 at 4:09 PM Melih Mutlu wrote: > > > > > > PFA an updated version with some of the earlier reviews addressed. > > > Forgot to include them in the previous email. > > > > > > > It is always better to explicitly tell which reviews are addressed but > > anyway, I have done some minor cleanup in the 0001 patch including > > removing includes which didn't seem necessary, modified a few > > comments, and ran pgindent. I also thought of modifying some variable > > names based on suggestions by Peter Smith in an email [1] but didn't > > find many of them any better than the current ones so modified just a > > few of those. If you guys are okay with this then let's commit it and > > then we can focus more on the remaining patches. > > > > I checked the latest patch v25-0001. > > LGTM. > Thanks, I have pushed 0001. Let's focus on the remaining patches. -- With Regards, Amit Kapila.
Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication
On Wed, Aug 2, 2023 at 11:19 PM Amit Kapila wrote: > > On Wed, Aug 2, 2023 at 4:09 PM Melih Mutlu wrote: > > > > PFA an updated version with some of the earlier reviews addressed. > > Forgot to include them in the previous email. > > > > It is always better to explicitly tell which reviews are addressed but > anyway, I have done some minor cleanup in the 0001 patch including > removing includes which didn't seem necessary, modified a few > comments, and ran pgindent. I also thought of modifying some variable > names based on suggestions by Peter Smith in an email [1] but didn't > find many of them any better than the current ones so modified just a > few of those. If you guys are okay with this then let's commit it and > then we can focus more on the remaining patches. > I checked the latest patch v25-0001. LGTM. ~~ BTW, I have re-tested many cases of HEAD versus HEAD+v25-0001 (using current test scripts previously mentioned in this thread). Because v25-0001 is only a refactoring patch we expect that the results should be the same as for HEAD, and that is what I observed. -- Kind Regards, Peter Smith. Fujitsu Australia
Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication
On Wed, Aug 2, 2023 at 4:09 PM Melih Mutlu wrote: > > PFA an updated version with some of the earlier reviews addressed. > Forgot to include them in the previous email. > It is always better to explicitly tell which reviews are addressed but anyway, I have done some minor cleanup in the 0001 patch including removing includes which didn't seem necessary, modified a few comments, and ran pgindent. I also thought of modifying some variable names based on suggestions by Peter Smith in an email [1] but didn't find many of them any better than the current ones so modified just a few of those. If you guys are okay with this then let's commit it and then we can focus more on the remaining patches. [1] - https://www.postgresql.org/message-id/CAHut%2BPs3Du9JFmhecWY8%2BVFD11VLOkSmB36t_xWHHQJNMpdA-A%40mail.gmail.com -- With Regards, Amit Kapila. v25-0001-Refactor-to-split-Apply-and-Tablesync-Workers.patch Description: Binary data
Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication
Hi, > PFA an updated version with some of the earlier reviews addressed. Forgot to include them in the previous email. Thanks, -- Melih Mutlu Microsoft v24-0003-Reuse-connection-when-tablesync-workers-change-t.patch Description: Binary data v24-0002-Reuse-Tablesync-Workers.patch Description: Binary data v24-0001-Refactor-to-split-Apply-and-Tablesync-Workers.patch Description: Binary data
Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication
Hi,

Amit Kapila , 2 Ağu 2023 Çar, 12:01 tarihinde şunu yazdı:
> I think we are getting the error (ERROR: could not find logical
> decoding starting point) because we wouldn't have waited for WAL to
> become available before reading it. It could happen due to the
> following code:
> WalSndWaitForWal()
> {
> ...
>     if (streamingDoneReceiving && streamingDoneSending &&
>         !pq_is_send_pending())
>         break;
> ..
> }
>
> Now, it seems that in 0003 patch, instead of resetting flags
> streamingDoneSending, and streamingDoneReceiving before start
> replication, we should reset before create logical slots because we
> need to read the WAL during that time as well to find the consistent
> point.

Thanks for the suggestion, Amit. I've been looking into this recently
and couldn't figure out the cause until now. I quickly made the fix in
0003. It seems to have resolved the "could not find logical decoding
starting point" errors.

vignesh C , 1 Ağu 2023 Sal, 09:32 tarihinde şunu yazdı:
> I agree that the "no copy in progress" issue has nothing to do with
> the 0001 patch. This issue is present with the 0002 patch.
> In the case when the tablesync worker has to apply the transactions
> after the table is synced, the tablesync worker sends the feedback of
> writepos, applypos and flushpos, which results in the "No copy in
> progress" error as the stream has ended already. Fixed it by exiting
> the streaming loop if the tablesync worker is done with the
> synchronization. The attached 0004 patch has the changes for the same.
> The rest of the v22 patches are the same patches that were posted by
> Melih in the earlier mail.

Thanks for the fix. I placed it into 0002 with a slight change as
follows:

-            send_feedback(last_received, false, false);
+            if (!MyLogicalRepWorker->relsync_completed)
+                send_feedback(last_received, false, false);

IMHO relsync_completed means essentially the same as streaming_done,
which is why I wanted to check that flag instead of adding a goto
statement.
Does it make sense to you as well? Thanks, -- Melih Mutlu Microsoft v23-0002-Reuse-Tablesync-Workers.patch Description: Binary data v23-0001-Refactor-to-split-Apply-and-Tablesync-Workers.patch Description: Binary data v23-0003-Reuse-connection-when-tablesync-workers-change-t.patch Description: Binary data
Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication
On Tue, Aug 1, 2023 at 9:44 AM Peter Smith wrote:
>
> FYI, here is some more information about ERRORs seen.
>
> The patches were re-tested -- applied in stages (and also against the
> different scripts) to identify where the problem was introduced. Below
> are the observations:
>
> ~~~
>
> Using original test scripts
>
> 1. Using only patch v21-0001
> - no errors
>
> 2. Using only patch v21-0001+0002
> - no errors
>
> 3. Using patch v21-0001+0002+0003
> - no errors
>
> ~~~
>
> Using the "busy loop" test scripts for long transactions
>
> 1. Using only patch v21-0001
> - no errors
>
> 2. Using only patch v21-0001+0002
> - gives errors for "no copy in progress issue"
> e.g. ERROR: could not send data to WAL stream: no COPY in progress
>
> 3. Using patch v21-0001+0002+0003
> - gives the same "no copy in progress issue" errors as above
> e.g. ERROR: could not send data to WAL stream: no COPY in progress
> - and also gives slot consistency point errors
> e.g. ERROR: could not create replication slot
> "pg_16700_sync_16514_7261998170966054867": ERROR: could not find
> logical decoding starting point
> e.g. LOG: could not drop replication slot
> "pg_16700_sync_16454_7261998170966054867" on publisher: ERROR:
> replication slot "pg_16700_sync_16454_7261998170966054867" does not
> exist
>

I think we are getting the error (ERROR: could not find logical decoding
starting point) because we wouldn't have waited for WAL to become
available before reading it. It could happen due to the following code:

WalSndWaitForWal()
{
...
    if (streamingDoneReceiving && streamingDoneSending &&
        !pq_is_send_pending())
        break;
..
}

Now, it seems that in the 0003 patch, instead of resetting the flags
streamingDoneSending and streamingDoneReceiving before starting
replication, we should reset them before creating the logical slot,
because we need to read the WAL during that time as well to find the
consistent point.

--
With Regards,
Amit Kapila.
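A toy model of the failure mode described above (plain Python, not the walsender code — the function and flag names are only loosely modelled on WalSndWaitForWal): if the streamingDone* flags are still set from the previous table's COPY/START_REPLICATION on a reused connection, the WAL-wait loop exits immediately, and slot creation gives up before a consistent point can be found.

```python
def wal_wait(records_needed, done_sending, done_receiving):
    """Counts how many WAL records we manage to wait for before the
    loop's early-exit condition (modelled on WalSndWaitForWal) fires."""
    waited = 0
    while waited < records_needed:
        if done_sending and done_receiving:
            # premature exit -> "could not find logical decoding
            # starting point"
            break
        waited += 1
    return waited

# Stale flags left over from the previous table's streaming:
print(wal_wait(5, True, True))    # -> 0, we never waited at all
# After resetting the flags before slot creation (the suggested fix):
print(wal_wait(5, False, False))  # -> 5, waits for all the WAL it needs
```

The fix suggested above amounts to performing the reset before CREATE_REPLICATION_SLOT rather than only before START_REPLICATION, so the "wait for WAL" path is reachable during consistent-point search as well.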
Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication
On Tue, 1 Aug 2023 at 09:44, Peter Smith wrote: > > On Fri, Jul 28, 2023 at 5:22 PM Peter Smith wrote: > > > > Hi Melih, > > > > BACKGROUND > > -- > > > > We wanted to compare performance for the 2 different reuse-worker > > designs, when the apply worker is already busy handling other > > replications, and then simultaneously the test table tablesyncs are > > occurring. > > > > To test this scenario, some test scripts were written (described > > below). For comparisons, the scripts were then run using a build of > > HEAD; design #1 (v21); design #2 (0718). > > > > HOW THE TEST WORKS > > -- > > > > Overview: > > 1. The apply worker is made to subscribe to a 'busy_tbl'. > > 2. After the SUBSCRIPTION is created, the publisher-side then loops > > (forever) doing INSERTS into that busy_tbl. > > 3. While the apply worker is now busy, the subscriber does an ALTER > > SUBSCRIPTION REFRESH PUBLICATION to subscribe to all the other test > > tables. > > 4. We time how long it takes for all tablsyncs to complete > > 5. 
Repeat above for different numbers of empty tables (10, 100, 1000, > > 2000) and different numbers of sync workers (2, 4, 8, 16) > > > > Scripts > > --- > > > > (PSA 4 scripts to implement this logic) > > > > testrun script > > - this does common setup (do_one_test_setup) and then the pub/sub > > scripts (do_one_test_PUB and do_one_test_SUB -- see below) are run in > > parallel > > - repeat 10 times > > > > do_one_test_setup script > > - init and start instances > > - ipc setup tables and procedures > > > > do_one_test_PUB script > > - ipc setup pub/sub > > - table setup > > - publishes the "busy_tbl", but then waits for the subscriber to > > subscribe to only this one > > - alters the publication to include all other tables (so subscriber > > will see these only after the ALTER SUBSCRIPTION PUBLICATION REFRESH) > > - enter a busy INSERT loop until it informed by the subscriber that > > the test is finished > > > > do_one_test_SUB script > > - ipc setup pub/sub > > - table setup > > - subscribes only to "busy_tbl", then informs the publisher when that > > is done (this will cause the publisher to commence the stay_busy loop) > > - after it knows the publishing busy loop has started it does > > - ALTER SUBSCRIPTION REFRESH PUBLICATION > > - wait until all the tablesyncs are ready <=== This is the part that > > is timed for the test RESULT > > > > PROBLEM > > --- > > > > Looking at the output files (e.g. *.dat_PUB and *.dat_SUB) they seem > > to confirm the tests are working how we wanted. > > > > Unfortunately, there is some slot problem for the patched builds (both > > designs #1 and #2). e.g. Search "ERROR" in the *.log files and see > > many slot-related errors. > > > > Please note - running these same scripts with HEAD build gave no such > > errors. So it appears to be a patch problem. > > > > Hi > > FYI, here is some more information about ERRORs seen. 
> > The patches were re-tested -- applied in stages (and also against the > different scripts) to identify where the problem was introduced. Below > are the observations: > > ~~~ > > Using original test scripts > > 1. Using only patch v21-0001 > - no errors > > 2. Using only patch v21-0001+0002 > - no errors > > 3. Using patch v21-0001+0002+0003 > - no errors > > ~~~ > > Using the "busy loop" test scripts for long transactions > > 1. Using only patch v21-0001 > - no errors > > 2. Using only patch v21-0001+0002 > - gives errors for "no copy in progress issue" > e.g. ERROR: could not send data to WAL stream: no COPY in progress > > 3. Using patch v21-0001+0002+0003 > - gives the same "no copy in progress issue" errors as above > e.g. ERROR: could not send data to WAL stream: no COPY in progress > - and also gives slot consistency point errors > e.g. ERROR: could not create replication slot > "pg_16700_sync_16514_7261998170966054867": ERROR: could not find > logical decoding starting point > e.g. LOG: could not drop replication slot > "pg_16700_sync_16454_7261998170966054867" on publisher: ERROR: > replication slot "pg_16700_sync_16454_7261998170966054867" does not > exist I agree that "no copy in progress issue" issue has nothing to do with 0001 patch. This issue is present with the 0002 patch. In the case when the tablesync worker has to apply the transactions after the table is synced, the tablesync worker sends the feedback of writepos, applypos and flushpos which results in "No copy in progress" error as the stream has ended already. Fixed it by exiting the streaming loop if the tablesync worker is done with the synchronization. The attached 0004 patch has the changes for the same. The rest of v22 patches are the same patch that were posted by Melih in the earlier mail. 
Regards, Vignesh From bd18bd59be0a263cb3385353e73ec25542bdeff2 Mon Sep 17 00:00:00 2001 From: Melih Mutlu Date: Tue, 4 Jul 2023 22:04:46 +0300 Subject: [PATCH v22 2/3] Reuse Tablesync Workers Before this patch, tablesync workers were capa
Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication
On Fri, Jul 28, 2023 at 5:22 PM Peter Smith wrote: > > Hi Melih, > > BACKGROUND > -- > > We wanted to compare performance for the 2 different reuse-worker > designs, when the apply worker is already busy handling other > replications, and then simultaneously the test table tablesyncs are > occurring. > > To test this scenario, some test scripts were written (described > below). For comparisons, the scripts were then run using a build of > HEAD; design #1 (v21); design #2 (0718). > > HOW THE TEST WORKS > -- > > Overview: > 1. The apply worker is made to subscribe to a 'busy_tbl'. > 2. After the SUBSCRIPTION is created, the publisher-side then loops > (forever) doing INSERTS into that busy_tbl. > 3. While the apply worker is now busy, the subscriber does an ALTER > SUBSCRIPTION REFRESH PUBLICATION to subscribe to all the other test > tables. > 4. We time how long it takes for all tablsyncs to complete > 5. Repeat above for different numbers of empty tables (10, 100, 1000, > 2000) and different numbers of sync workers (2, 4, 8, 16) > > Scripts > --- > > (PSA 4 scripts to implement this logic) > > testrun script > - this does common setup (do_one_test_setup) and then the pub/sub > scripts (do_one_test_PUB and do_one_test_SUB -- see below) are run in > parallel > - repeat 10 times > > do_one_test_setup script > - init and start instances > - ipc setup tables and procedures > > do_one_test_PUB script > - ipc setup pub/sub > - table setup > - publishes the "busy_tbl", but then waits for the subscriber to > subscribe to only this one > - alters the publication to include all other tables (so subscriber > will see these only after the ALTER SUBSCRIPTION PUBLICATION REFRESH) > - enter a busy INSERT loop until it informed by the subscriber that > the test is finished > > do_one_test_SUB script > - ipc setup pub/sub > - table setup > - subscribes only to "busy_tbl", then informs the publisher when that > is done (this will cause the publisher to commence the stay_busy loop) > 
- after it knows the publishing busy loop has started it does > - ALTER SUBSCRIPTION REFRESH PUBLICATION > - wait until all the tablesyncs are ready <=== This is the part that > is timed for the test RESULT > > PROBLEM > --- > > Looking at the output files (e.g. *.dat_PUB and *.dat_SUB) they seem > to confirm the tests are working how we wanted. > > Unfortunately, there is some slot problem for the patched builds (both > designs #1 and #2). e.g. Search "ERROR" in the *.log files and see > many slot-related errors. > > Please note - running these same scripts with HEAD build gave no such > errors. So it appears to be a patch problem. > Hi FYI, here is some more information about ERRORs seen. The patches were re-tested -- applied in stages (and also against the different scripts) to identify where the problem was introduced. Below are the observations: ~~~ Using original test scripts 1. Using only patch v21-0001 - no errors 2. Using only patch v21-0001+0002 - no errors 3. Using patch v21-0001+0002+0003 - no errors ~~~ Using the "busy loop" test scripts for long transactions 1. Using only patch v21-0001 - no errors 2. Using only patch v21-0001+0002 - gives errors for "no copy in progress issue" e.g. ERROR: could not send data to WAL stream: no COPY in progress 3. Using patch v21-0001+0002+0003 - gives the same "no copy in progress issue" errors as above e.g. ERROR: could not send data to WAL stream: no COPY in progress - and also gives slot consistency point errors e.g. ERROR: could not create replication slot "pg_16700_sync_16514_7261998170966054867": ERROR: could not find logical decoding starting point e.g. LOG: could not drop replication slot "pg_16700_sync_16454_7261998170966054867" on publisher: ERROR: replication slot "pg_16700_sync_16454_7261998170966054867" does not exist -- Kind Regards, Peter Smith. Fujitsu Australia
Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication
On Thu, Jul 27, 2023 at 11:30 PM Melih Mutlu wrote: > > Hi Peter, > > Peter Smith , 26 Tem 2023 Çar, 07:40 tarihinde şunu > yazdı: >> >> Here are some comments for patch v22-0001. >> >> == >> 1. General -- naming conventions >> >> There is quite a lot of inconsistency with variable/parameter naming >> styles in this patch. I understand in most cases the names are copied >> unchanged from the original functions. Still, since this is a big >> refactor anyway, it can also be a good opportunity to clean up those >> inconsistencies instead of just propagating them to different places. >> IIUC, the usual reluctance to rename things because it would cause >> backpatch difficulties doesn't apply here (since everything is being >> refactored anyway). >> >> E.g. Consider using use snake_case names more consistently in the >> following places: > > > I can simply change the places you mentioned, that seems okay to me. > The reason why I did not change the namings in existing variables/functions > is because I did (and still do) not get what's the naming conventions in > those files. Is snake_case the convention for variables in those files (or in > general)? > TBH, I also don't know if there is a specific Postgres coding guideline to use snake_case or not (and Chat-GPT did not know either when I asked about it). I only assumed snake_case in my previous review comment because the mentioned vars were already all lowercase. Anyway, the point was that whatever style is chosen, it ought to be used *consistently* because having a random mixture of styles in the same function (e.g. worker_slot, originname, origin_startpos, myslotname, options, server_version) seems messy. Meanwhile, I think Amit suggested [1] that for now, we only need to worry about the name consistency in new code. >> 2. 
SetupApplyOrSyncWorker >> >> -ApplyWorkerMain(Datum main_arg) >> +SetupApplyOrSyncWorker(int worker_slot) >> { >> - int worker_slot = DatumGetInt32(main_arg); >> - char originname[NAMEDATALEN]; >> - XLogRecPtr origin_startpos = InvalidXLogRecPtr; >> - char*myslotname = NULL; >> - WalRcvStreamOptions options; >> - int server_version; >> - >> - InitializingApplyWorker = true; >> - >> /* Attach to slot */ >> logicalrep_worker_attach(worker_slot); >> >> + Assert(am_tablesync_worker() || am_leader_apply_worker()); >> + >> >> Why is the Assert not the very first statement of this function? > > > I would also prefer to assert in the very beginning but am_tablesync_worker > and am_leader_apply_worker require MyLogicalRepWorker to be not NULL. And > MyLogicalRepWorker is assigned in logicalrep_worker_attach. I can change this > if you think there is a better way to check the worker type. > I see. In that case your Assert LGTM. -- [1] https://www.postgresql.org/message-id/CAA4eK1%2Bh9hWDAKupsoiw556xqh7uvj_F1pjFJc4jQhL89HdGww%40mail.gmail.com Kind Regards, Peter Smith. Fujitsu Australia
Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication
Hi Peter, Peter Smith , 26 Tem 2023 Çar, 07:40 tarihinde şunu yazdı: > Here are some comments for patch v22-0001. > > == > 1. General -- naming conventions > > There is quite a lot of inconsistency with variable/parameter naming > styles in this patch. I understand in most cases the names are copied > unchanged from the original functions. Still, since this is a big > refactor anyway, it can also be a good opportunity to clean up those > inconsistencies instead of just propagating them to different places. > IIUC, the usual reluctance to rename things because it would cause > backpatch difficulties doesn't apply here (since everything is being > refactored anyway). > > E.g. Consider using use snake_case names more consistently in the > following places: > I can simply change the places you mentioned, that seems okay to me. The reason why I did not change the namings in existing variables/functions is because I did (and still do) not get what's the naming conventions in those files. Is snake_case the convention for variables in those files (or in general)? 2. SetupApplyOrSyncWorker > > -ApplyWorkerMain(Datum main_arg) > +SetupApplyOrSyncWorker(int worker_slot) > { > - int worker_slot = DatumGetInt32(main_arg); > - char originname[NAMEDATALEN]; > - XLogRecPtr origin_startpos = InvalidXLogRecPtr; > - char*myslotname = NULL; > - WalRcvStreamOptions options; > - int server_version; > - > - InitializingApplyWorker = true; > - > /* Attach to slot */ > logicalrep_worker_attach(worker_slot); > > + Assert(am_tablesync_worker() || am_leader_apply_worker()); > + > > Why is the Assert not the very first statement of this function? > I would also prefer to assert in the very beginning but am_tablesync_worker and am_leader_apply_worker require MyLogicalRepWorker to be not NULL. And MyLogicalRepWorker is assigned in logicalrep_worker_attach. I can change this if you think there is a better way to check the worker type. Thanks, -- Melih Mutlu Microsoft
Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication
On Thu, Jul 27, 2023 at 6:46 AM Peter Smith wrote: > > Here are some review comments for v22-0003 > > == > > 1. ApplicationNameForTablesync > +/* > + * Determine the application_name for tablesync workers. > + * > + * Previously, the replication slot name was used as application_name. Since > + * it's possible to reuse tablesync workers now, a tablesync worker can > handle > + * several different replication slots during its lifetime. Therefore, we > + * cannot use the slot name as application_name anymore. Instead, the slot > + * number of the tablesync worker is used as a part of the application_name. > + * > + * FIXME: if the tablesync worker starts to reuse the replication slot during > + * synchronization, we should again use the replication slot name as > + * application_name. > + */ > +static void > +ApplicationNameForTablesync(Oid suboid, int worker_slot, > + char *application_name, Size szapp) > +{ > + snprintf(application_name, szapp, "pg_%u_sync_%i_" UINT64_FORMAT, suboid, > + worker_slot, GetSystemIdentifier()); > +} > > 1a. > The intent of the "FIXME" comment was not clear. Is this some existing > problem that needs addressing, or is this really more like just an > "XXX" warning/note for the future, in case the tablesync logic > changes? > This seems to be a Note for the future, so better to use XXX notation here. > ~ > > 1b. > Since this is a new function, should it be named according to the > convention for static functions? > > e.g. > ApplicationNameForTablesync -> app_name_for_tablesync > I think for now let's follow the style for similar functions like ReplicationOriginNameForLogicalRep() and ReplicationSlotNameForTablesync(). -- With Regards, Amit Kapila.
Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication
Here are some review comments for v22-0003

==

1. ApplicationNameForTablesync

+/*
+ * Determine the application_name for tablesync workers.
+ *
+ * Previously, the replication slot name was used as application_name. Since
+ * it's possible to reuse tablesync workers now, a tablesync worker can handle
+ * several different replication slots during its lifetime. Therefore, we
+ * cannot use the slot name as application_name anymore. Instead, the slot
+ * number of the tablesync worker is used as a part of the application_name.
+ *
+ * FIXME: if the tablesync worker starts to reuse the replication slot during
+ * synchronization, we should again use the replication slot name as
+ * application_name.
+ */
+static void
+ApplicationNameForTablesync(Oid suboid, int worker_slot,
+                            char *application_name, Size szapp)
+{
+    snprintf(application_name, szapp, "pg_%u_sync_%i_" UINT64_FORMAT, suboid,
+             worker_slot, GetSystemIdentifier());
+}

1a.
The intent of the "FIXME" comment was not clear. Is this some existing
problem that needs addressing, or is this really more like just an
"XXX" warning/note for the future, in case the tablesync logic
changes?

~

1b.
Since this is a new function, should it be named according to the
convention for static functions?

e.g.
ApplicationNameForTablesync -> app_name_for_tablesync

--
Kind Regards,
Peter Smith.
Fujitsu Australia
Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication
Here are some review comments for v22-0002

==

1. General - errmsg

AFAIK, the errmsg part does not need to be enclosed by extra parentheses.

e.g.

BEFORE
ereport(LOG,
        (errmsg("logical replication table synchronization worker for subscription \"%s\" has finished",
                MySubscription->name)));

AFTER
ereport(LOG,
        errmsg("logical replication table synchronization worker for subscription \"%s\" has finished",
               MySubscription->name));

~

The patch has multiple cases similar to that example.

==
src/backend/replication/logical/tablesync.c

2.

+ if (reuse_worker)
+ {
+     ereport(LOG,
+             (errmsg("logical replication table synchronization worker for subscription \"%s\" will be reused to sync table \"%s\" with relid %u.",
+                     MySubscription->name,
+                     get_rel_name(MyLogicalRepWorker->relid),
+                     MyLogicalRepWorker->relid)));
+ }
+ else
+ {
+     ereport(LOG,
+             (errmsg("logical replication table synchronization worker for subscription \"%s\" has finished",
+                     MySubscription->name)));
+ }

These brackets { } are not really necessary.

~~~

3. TablesyncWorkerMain

+ for (;!done;)
+ {
+     List       *rstates;
+     ListCell   *lc;
+
+     run_tablesync_worker();
+
+     if (IsTransactionState())
+         CommitTransactionCommand();
+
+     if (MyLogicalRepWorker->relsync_completed)
+     {
+         /*
+          * This tablesync worker is 'done' unless another table that needs
+          * syncing is found.
+          */
+         done = true;

Those variables 'rstates' and 'lc' do not need to be declared at this
scope -- they can be declared further down, closer to where they are
needed.

==
src/backend/replication/logical/worker.c

4. LogicalRepApplyLoop

+
+ if (am_tablesync_worker())
+     /*
+      * If relsync_completed is true, this means that the tablesync
+      * worker is done with synchronization. Streaming has already been
+      * ended by process_syncing_tables_for_sync. We should move to the
+      * next table if needed, or exit.
+      */
+     if (MyLogicalRepWorker->relsync_completed)
+         endofstream = true;

Here I think it is better to use bracketing { } for the outer "if",
instead of only relying on the indentation for readability. YMMV.

--
Kind Regards,
Peter Smith.
Fujitsu Australia
Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication
On Wed, Jul 26, 2023 at 10:10 AM Peter Smith wrote: > > Here are some comments for patch v22-0001. > > == > 1. General -- naming conventions > > There is quite a lot of inconsistency with variable/parameter naming > styles in this patch. I understand in most cases the names are copied > unchanged from the original functions. Still, since this is a big > refactor anyway, it can also be a good opportunity to clean up those > inconsistencies instead of just propagating them to different places. > I am not against improving consistency in the naming of existing variables but I feel it would be better to do as a separate patch along with improving the consistency function names. For new functions/variables, it would be good to follow a consistent style. -- With Regards, Amit Kapila.
Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication
Here are some comments for patch v22-0001.

==

1. General -- naming conventions

There is quite a lot of inconsistency with variable/parameter naming
styles in this patch. I understand in most cases the names are copied
unchanged from the original functions. Still, since this is a big
refactor anyway, it can also be a good opportunity to clean up those
inconsistencies instead of just propagating them to different places.
IIUC, the usual reluctance to rename things because it would cause
backpatch difficulties doesn't apply here (since everything is being
refactored anyway).

E.g. Consider using snake_case names more consistently in the
following places:

~

1a. start_table_sync

+static void
+start_table_sync(XLogRecPtr *origin_startpos, char **myslotname)
+{
+    char       *syncslotname = NULL;

origin_startpos -> (no change)
myslotname -> my_slot_name (But, is there a better name for this than
calling it "my" slot name)
syncslotname -> sync_slot_name

~

1b. run_tablesync_worker

+static void
+run_tablesync_worker()
+{
+    char        originname[NAMEDATALEN];
+    XLogRecPtr  origin_startpos = InvalidXLogRecPtr;
+    char       *slotname = NULL;
+    WalRcvStreamOptions options;

originname -> origin_name
origin_startpos -> (no change)
slotname -> slot_name

~

1c. set_stream_options

+void
+set_stream_options(WalRcvStreamOptions *options,
+                   char *slotname,
+                   XLogRecPtr *origin_startpos)
+{
+    int         server_version;

options -> (no change)
slotname -> slot_name
origin_startpos -> (no change)
server_version -> (no change)

~

1d. run_apply_worker

 static void
-start_apply(XLogRecPtr origin_startpos)
+run_apply_worker()
 {
-    PG_TRY();
+    char        originname[NAMEDATALEN];
+    XLogRecPtr  origin_startpos = InvalidXLogRecPtr;
+    char       *slotname = NULL;
+    WalRcvStreamOptions options;
+    RepOriginId originid;
+    TimeLineID  startpointTLI;
+    char       *err;
+    bool        must_use_password;

originname -> origin_name
origin_startpos -> (no change)
slotname -> slot_name
originid -> origin_id

==
src/backend/replication/logical/worker.c

2. SetupApplyOrSyncWorker

-ApplyWorkerMain(Datum main_arg)
+SetupApplyOrSyncWorker(int worker_slot)
 {
-    int         worker_slot = DatumGetInt32(main_arg);
-    char        originname[NAMEDATALEN];
-    XLogRecPtr  origin_startpos = InvalidXLogRecPtr;
-    char       *myslotname = NULL;
-    WalRcvStreamOptions options;
-    int         server_version;
-
-    InitializingApplyWorker = true;
-
     /* Attach to slot */
     logicalrep_worker_attach(worker_slot);

+    Assert(am_tablesync_worker() || am_leader_apply_worker());
+

Why is the Assert not the very first statement of this function?

==

Kind Regards,
Peter Smith.
Fujitsu Australia
Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication
Hi, Melih Mutlu , 21 Tem 2023 Cum, 12:47 tarihinde şunu yazdı: > I did not realize the order is the same with .c files. Good to know. I'll > fix it along with other comments. > Addressed the recent reviews and attached the updated patches. Thanks, -- Melih Mutlu Microsoft v22-0001-Refactor-to-split-Apply-and-Tablesync-Workers.patch Description: Binary data v22-0002-Reuse-Tablesync-Workers.patch Description: Binary data v22-0003-Reuse-connection-when-tablesync-workers-change-t.patch Description: Binary data
Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication
Peter Smith , 21 Tem 2023 Cum, 12:48 tarihinde şunu yazdı: > On Fri, Jul 21, 2023 at 5:24 PM Amit Kapila > wrote: > > > > On Fri, Jul 21, 2023 at 12:05 PM Peter Smith > wrote: > > > > > > On Fri, Jul 21, 2023 at 3:39 PM Amit Kapila > wrote: > > > > > > > > > The other thing I noticed is that we > > > > don't seem to be consistent in naming functions in these files. For > > > > example, shall we make all exposed functions follow camel case (like > > > > InitializeLogRepWorker) and static functions follow _ style (like > > > > run_apply_worker) or the other possibility is to use _ style for all > > > > functions except may be the entry functions like ApplyWorkerMain()? I > > > > don't know if there is already a pattern but if not then let's form > it > > > > now, so that code looks consistent. > > > > > > > > > > +1 for using some consistent rule, but I think this may result in > > > *many* changes, so it would be safer to itemize all the changes first, > > > just to make sure everybody is OK with it first before updating > > > everything. > > > > > > > Fair enough. We can do that as a first patch and then work on the > > refactoring patch to avoid introducing more inconsistencies or we can > > do the refactoring patch first but keep all the new function names to > > follow _ style. > > > > Fixing the naming inconsistency will be more far-reaching than just a > few functions affected by these "reuse" patches. There are plenty of > existing functions already inconsistently named in the HEAD code. So > perhaps this topic should be moved to a separate thread? > +1 for moving it to a separate thread. This is not something particularly introduced by this patch. Thanks, -- Melih Mutlu Microsoft
Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication
On Fri, Jul 21, 2023 at 5:24 PM Amit Kapila wrote: > > On Fri, Jul 21, 2023 at 12:05 PM Peter Smith wrote: > > > > On Fri, Jul 21, 2023 at 3:39 PM Amit Kapila wrote: > > > > > > The other thing I noticed is that we > > > don't seem to be consistent in naming functions in these files. For > > > example, shall we make all exposed functions follow camel case (like > > > InitializeLogRepWorker) and static functions follow _ style (like > > > run_apply_worker) or the other possibility is to use _ style for all > > > functions except may be the entry functions like ApplyWorkerMain()? I > > > don't know if there is already a pattern but if not then let's form it > > > now, so that code looks consistent. > > > > > > > +1 for using some consistent rule, but I think this may result in > > *many* changes, so it would be safer to itemize all the changes first, > > just to make sure everybody is OK with it first before updating > > everything. > > > > Fair enough. We can do that as a first patch and then work on the > refactoring patch to avoid introducing more inconsistencies or we can > do the refactoring patch first but keep all the new function names to > follow _ style. > Fixing the naming inconsistency will be more far-reaching than just a few functions affected by these "reuse" patches. There are plenty of existing functions already inconsistently named in the HEAD code. So perhaps this topic should be moved to a separate thread? 
For example, here are some existing/proposed names:

===

worker.c (HEAD)

static functions
DisableSubscriptionAndExit -> disable_subscription_and_exit
FindReplTupleInLocalRel -> find_repl_tuple_in_local_rel
TwoPhaseTransactionGid -> two_phase_transaction_gid
TargetPrivilegesCheck -> target_privileges_check
UpdateWorkerStats -> update_worker_stats
LogicalRepApplyLoop -> logical_rep_apply_loop

non-static functions
stream_stop_internal -> StreamStopInternal
apply_spooled_messages -> ApplySpooledMessages
apply_dispatch -> ApplyDispatch
store_flush_position -> StoreFlushPosition
set_apply_error_context_origin -> SetApplyErrorContextOrigin

===

tablesync.c (HEAD)

static functions
FetchTableStates -> fetch_table_states

non-static functions
invalidate_syncing_table_states -> InvalidateSyncingTableStates

--
Kind Regards,
Peter Smith.
Fujitsu Australia
Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication
Amit Kapila , 21 Tem 2023 Cum, 08:39 tarihinde şunu yazdı: > On Fri, Jul 21, 2023 at 7:30 AM Peter Smith wrote: > How about SetupLogRepWorker? The other thing I noticed is that we > don't seem to be consistent in naming functions in these files. For > example, shall we make all exposed functions follow camel case (like > InitializeLogRepWorker) and static functions follow _ style (like > run_apply_worker) or the other possibility is to use _ style for all > functions except may be the entry functions like ApplyWorkerMain()? I > don't know if there is already a pattern but if not then let's form it > now, so that code looks consistent. > I agree that these files have inconsistencies in naming things. Most of the time I can't really figure out which naming convention I should use. I try to name things by looking at other functions with similar responsibilities. > 3. > > extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo, > > XLogRecPtr remote_lsn); > > +extern void set_stream_options(WalRcvStreamOptions *options, > > +char *slotname, > > +XLogRecPtr *origin_startpos); > > + > > +extern void start_apply(XLogRecPtr origin_startpos); > > +extern void DisableSubscriptionAndExit(void); > > +extern void StartLogRepWorker(int worker_slot); > > > > This placement (esp. with the missing whitespace) seems to be grouping > > the set_stream_options with the other 'pa' externs, which are all > > under the comment "/* Parallel apply worker setup and interactions > > */". > > > > Putting all these up near the other "extern void > > InitializeLogRepWorker(void)" might be less ambiguous. > > > > +1. Also, note that they should be in the same order as they are in .c > files. > I did not realize the order is the same with .c files. Good to know. I'll fix it along with other comments. Thanks, -- Melih Mutlu Microsoft
Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication
On Fri, Jul 21, 2023 at 12:05 PM Peter Smith wrote: > > On Fri, Jul 21, 2023 at 3:39 PM Amit Kapila wrote: > > > > On Fri, Jul 21, 2023 at 7:30 AM Peter Smith wrote: > > > > > > ~~~ > > > > > > 2. StartLogRepWorker > > > > > > /* Common function to start the leader apply or tablesync worker. */ > > > void > > > StartLogRepWorker(int worker_slot) > > > { > > > /* Attach to slot */ > > > logicalrep_worker_attach(worker_slot); > > > > > > /* Setup signal handling */ > > > pqsignal(SIGHUP, SignalHandlerForConfigReload); > > > pqsignal(SIGTERM, die); > > > BackgroundWorkerUnblockSignals(); > > > > > > /* > > > * We don't currently need any ResourceOwner in a walreceiver process, but > > > * if we did, we could call CreateAuxProcessResourceOwner here. > > > */ > > > > > > /* Initialise stats to a sanish value */ > > > MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time = > > > MyLogicalRepWorker->reply_time = GetCurrentTimestamp(); > > > > > > /* Load the libpq-specific functions */ > > > load_file("libpqwalreceiver", false); > > > > > > InitializeLogRepWorker(); > > > > > > /* Connect to the origin and start the replication. */ > > > elog(DEBUG1, "connecting to publisher using connection string \"%s\"", > > > MySubscription->conninfo); > > > > > > /* > > > * Setup callback for syscache so that we know when something changes in > > > * the subscription relation state. > > > */ > > > CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP, > > > invalidate_syncing_table_states, > > > (Datum) 0); > > > } > > > > > > ~ > > > > > > 2a. > > > The function name seems a bit misleading because it is not really > > > "starting" anything here - it is just more "initialization" code, > > > right? Nor is it common to all kinds of LogRepWorker. Maybe the > > > function could be named something else like 'InitApplyOrSyncWorker()'. > > > -- see also #2c > > > > > > > How about SetupLogRepWorker? 
> > The name is better than StartXXX, but still, SetupXXX seems a synonym > of InitXXX. That is why I thought it is a bit awkward having 2 > functions with effectively the same name and the same > initialization/setup purpose (the only difference is one function > excludes parallel workers, and the other function is common to all > workers). > I can't know of a better way. We can probably name it as SetupApplyOrSyncWorker or something like that if you find that better. > > The other thing I noticed is that we > > don't seem to be consistent in naming functions in these files. For > > example, shall we make all exposed functions follow camel case (like > > InitializeLogRepWorker) and static functions follow _ style (like > > run_apply_worker) or the other possibility is to use _ style for all > > functions except may be the entry functions like ApplyWorkerMain()? I > > don't know if there is already a pattern but if not then let's form it > > now, so that code looks consistent. > > > > +1 for using some consistent rule, but I think this may result in > *many* changes, so it would be safer to itemize all the changes first, > just to make sure everybody is OK with it first before updating > everything. > Fair enough. We can do that as a first patch and then work on the refactoring patch to avoid introducing more inconsistencies or we can do the refactoring patch first but keep all the new function names to follow _ style. Apart from this, few more comments on 0001: 1. +run_apply_worker(WalRcvStreamOptions *options, + char *slotname, + char *originname, + int originname_size, + XLogRecPtr *origin_startpos) The caller neither uses nor passes the value of origin_startpos. So, isn't it better to make origin_startpos local to run_apply_worker()? It seems the same is true for some of the other parameters slotname, originname, originname_size. Is there a reason to keep these as arguments in this function? 2. 
+static void +run_tablesync_worker(WalRcvStreamOptions *options, + char *slotname, + char *originname, + int originname_size, + XLogRecPtr *origin_startpos) The comments in the previous point seem to apply to this as well. 3. + set_stream_options(options, slotname, origin_startpos); + + walrcv_startstreaming(LogRepWorkerWalRcvConn, options); + + if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING && + AllTablesyncsReady()) This last check is done in set_stream_options() and here as well. I don't see any reason to give different answers at both places but before the patch, we were not relying on any such assumption that this check will always give the same answer considering the answer could be different due to AllTablesyncsReady(). Can we move this check outside set_stream_options()? -- With Regards, Amit Kapila.
Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication
On Fri, Jul 21, 2023 at 3:39 PM Amit Kapila wrote: > > On Fri, Jul 21, 2023 at 7:30 AM Peter Smith wrote: > > > > ~~~ > > > > 2. StartLogRepWorker > > > > /* Common function to start the leader apply or tablesync worker. */ > > void > > StartLogRepWorker(int worker_slot) > > { > > /* Attach to slot */ > > logicalrep_worker_attach(worker_slot); > > > > /* Setup signal handling */ > > pqsignal(SIGHUP, SignalHandlerForConfigReload); > > pqsignal(SIGTERM, die); > > BackgroundWorkerUnblockSignals(); > > > > /* > > * We don't currently need any ResourceOwner in a walreceiver process, but > > * if we did, we could call CreateAuxProcessResourceOwner here. > > */ > > > > /* Initialise stats to a sanish value */ > > MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time = > > MyLogicalRepWorker->reply_time = GetCurrentTimestamp(); > > > > /* Load the libpq-specific functions */ > > load_file("libpqwalreceiver", false); > > > > InitializeLogRepWorker(); > > > > /* Connect to the origin and start the replication. */ > > elog(DEBUG1, "connecting to publisher using connection string \"%s\"", > > MySubscription->conninfo); > > > > /* > > * Setup callback for syscache so that we know when something changes in > > * the subscription relation state. > > */ > > CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP, > > invalidate_syncing_table_states, > > (Datum) 0); > > } > > > > ~ > > > > 2a. > > The function name seems a bit misleading because it is not really > > "starting" anything here - it is just more "initialization" code, > > right? Nor is it common to all kinds of LogRepWorker. Maybe the > > function could be named something else like 'InitApplyOrSyncWorker()'. > > -- see also #2c > > > > How about SetupLogRepWorker? The name is better than StartXXX, but still, SetupXXX seems a synonym of InitXXX. 
That is why I thought it is a bit awkward having 2 functions with effectively the same name and the same initialization/setup purpose (the only difference is one function excludes parallel workers, and the other function is common to all workers). > The other thing I noticed is that we > don't seem to be consistent in naming functions in these files. For > example, shall we make all exposed functions follow camel case (like > InitializeLogRepWorker) and static functions follow _ style (like > run_apply_worker) or the other possibility is to use _ style for all > functions except may be the entry functions like ApplyWorkerMain()? I > don't know if there is already a pattern but if not then let's form it > now, so that code looks consistent. > +1 for using some consistent rule, but I think this may result in *many* changes, so it would be safer to itemize all the changes first, just to make sure everybody is OK with it first before updating everything. -- Kind Regards, Peter Smith
Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication
On Fri, Jul 21, 2023 at 7:30 AM Peter Smith wrote: > > ~~~ > > 2. StartLogRepWorker > > /* Common function to start the leader apply or tablesync worker. */ > void > StartLogRepWorker(int worker_slot) > { > /* Attach to slot */ > logicalrep_worker_attach(worker_slot); > > /* Setup signal handling */ > pqsignal(SIGHUP, SignalHandlerForConfigReload); > pqsignal(SIGTERM, die); > BackgroundWorkerUnblockSignals(); > > /* > * We don't currently need any ResourceOwner in a walreceiver process, but > * if we did, we could call CreateAuxProcessResourceOwner here. > */ > > /* Initialise stats to a sanish value */ > MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time = > MyLogicalRepWorker->reply_time = GetCurrentTimestamp(); > > /* Load the libpq-specific functions */ > load_file("libpqwalreceiver", false); > > InitializeLogRepWorker(); > > /* Connect to the origin and start the replication. */ > elog(DEBUG1, "connecting to publisher using connection string \"%s\"", > MySubscription->conninfo); > > /* > * Setup callback for syscache so that we know when something changes in > * the subscription relation state. > */ > CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP, > invalidate_syncing_table_states, > (Datum) 0); > } > > ~ > > 2a. > The function name seems a bit misleading because it is not really > "starting" anything here - it is just more "initialization" code, > right? Nor is it common to all kinds of LogRepWorker. Maybe the > function could be named something else like 'InitApplyOrSyncWorker()'. > -- see also #2c > How about SetupLogRepWorker? The other thing I noticed is that we don't seem to be consistent in naming functions in these files. For example, shall we make all exposed functions follow camel case (like InitializeLogRepWorker) and static functions follow _ style (like run_apply_worker) or the other possibility is to use _ style for all functions except may be the entry functions like ApplyWorkerMain()? 
I don't know if there is already a pattern but if not then let's form it now, so that code looks consistent. > ~ > > 2b. > Should this have Assert to ensure this is only called from leader > apply or tablesync? -- see also #2c > > ~ > > 2c. > IMO maybe the best/tidiest way to do this is not to introduce a new > function at all. Instead, just put all this "common init" code into > the existing "common init" function ('InitializeLogRepWorker') and > execute it only if (am_tablesync_worker() || am_leader_apply_worker()) > { }. > I don't like 2c much because it will make InitializeLogRepWorker() have two kinds of initializations. > == > src/include/replication/worker_internal.h > > 3. > extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo, > XLogRecPtr remote_lsn); > +extern void set_stream_options(WalRcvStreamOptions *options, > +char *slotname, > +XLogRecPtr *origin_startpos); > + > +extern void start_apply(XLogRecPtr origin_startpos); > +extern void DisableSubscriptionAndExit(void); > +extern void StartLogRepWorker(int worker_slot); > > This placement (esp. with the missing whitespace) seems to be grouping > the set_stream_options with the other 'pa' externs, which are all > under the comment "/* Parallel apply worker setup and interactions > */". > > Putting all these up near the other "extern void > InitializeLogRepWorker(void)" might be less ambiguous. > +1. Also, note that they should be in the same order as they are in .c files. -- With Regards, Amit Kapila.
Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication
Some review comments for v21-0002. On Thu, Jul 20, 2023 at 11:41 PM Melih Mutlu wrote: > > Hi, > > Attached the updated patches with recent reviews addressed. > > See below for my comments: > > Peter Smith , 19 Tem 2023 Çar, 06:08 tarihinde şunu > yazdı: >> >> 5. >> + /* Found a table for next iteration */ >> + finish_sync_worker(true); >> + >> + StartTransactionCommand(); >> + ereport(LOG, >> + (errmsg("logical replication worker for subscription \"%s\" will be >> reused to sync table \"%s\" with relid %u.", >> + MySubscription->name, >> + get_rel_name(MyLogicalRepWorker->relid), >> + MyLogicalRepWorker->relid))); >> + CommitTransactionCommand(); >> + >> + done = false; >> + break; >> + } >> + LWLockRelease(LogicalRepWorkerLock); >> >> >> 5b. >> Isn't there a missing call to that LWLockRelease, if the 'break' happens? > > > Lock is already released before break, if that's the lock you meant: > >> /* Update worker state for the next table */ >> MyLogicalRepWorker->relid = rstate->relid; >> MyLogicalRepWorker->relstate = rstate->state; >> MyLogicalRepWorker->relstate_lsn = rstate->lsn; >> LWLockRelease(LogicalRepWorkerLock); >> >> >> /* Found a table for next iteration */ >> finish_sync_worker(true); >> done = false; >> break; > > Sorry, I misread the code. You are right. == src/backend/replication/logical/tablesync.c 1. + if (!reuse_worker) + { + ereport(LOG, + (errmsg("logical replication table synchronization worker for subscription \"%s\" has finished", + MySubscription->name))); + } + else + { + ereport(LOG, + (errmsg("logical replication worker for subscription \"%s\" will be reused to sync table \"%s\" with relid %u.", + MySubscription->name, + get_rel_name(MyLogicalRepWorker->relid), + MyLogicalRepWorker->relid))); + } 1a. We know this must be a tablesync worker, so I think that second errmsg should also be saying "logical replication table synchronization worker". ~ 1b. 
Since this is if/else anyway, is it simpler to be positive and say "if (reuse_worker)" instead of the negative "if (!reuse_worker)"? ~~~ 2. run_tablesync_worker { + MyLogicalRepWorker->relsync_completed = false; + + /* Start table synchronization. */ start_table_sync(origin_startpos, &slotname); This still contains the added comment that I'd previously posted I thought wasn't adding anything useful. Also, I don't think this comment exists in the HEAD code. == src/backend/replication/logical/worker.c 3. LogicalRepApplyLoop + /* + * apply_dispatch() may have gone into apply_handle_commit() + * which can call process_syncing_tables_for_sync. + * + * process_syncing_tables_for_sync decides whether the sync of + * the current table is completed. If it is completed, + * streaming must be already ended. So, we can break the loop. + */ + if (am_tablesync_worker() && + MyLogicalRepWorker->relsync_completed) + { + endofstream = true; + break; + } + Maybe just personal taste, but IMO it is better to rearrange like below because then there is no reason to read the long comment except for tablesync workers. if (am_tablesync_worker()) { /* * apply_dispatch() may have gone into apply_handle_commit() * which can call process_syncing_tables_for_sync. * * process_syncing_tables_for_sync decides whether the sync of * the current table is completed. If it is completed, * streaming must be already ended. So, we can break the loop. */ if (MyLogicalRepWorker->relsync_completed) { endofstream = true; break; } } ~~~ 4. LogicalRepApplyLoop + + /* + * If relsync_completed is true, this means that the tablesync + * worker is done with synchronization. Streaming has already been + * ended by process_syncing_tables_for_sync. We should move to the + * next table if needed, or exit. + */ + if (am_tablesync_worker() && + MyLogicalRepWorker->relsync_completed) + endofstream = true; Ditto the same comment about rearranging the condition, as #3 above. == src/include/replication/worker_internal.h 5. 
+ /* + * Indicates whether tablesync worker has completed syncing its assigned + * table. + */ + bool relsync_completed; + Isn't it better to arrange this to be adjacent to other relXXX fields, so they all clearly belong to that "Used for initial table synchronization." group? For example, something like: /* Used for initial table synchronization. */ Oid relid; char relstate; XLogRecPtr relstate_lsn; slock_t relmutex; bool relsync_completed; /* has tablesync finished syncing the assigned table? */ -- Kind Regards, Peter Smith. Fujitsu Australia
Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication
Some review comments for v21-0001 == src/backend/replication/logical/worker.c 1. InitializeLogRepWorker if (am_tablesync_worker()) ereport(LOG, - (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started", + (errmsg("logical replication worker for subscription \"%s\", table \"%s\" has started", MySubscription->name, get_rel_name(MyLogicalRepWorker->relid; I think this should not be changed. IIUC that decision for using the generic worker name for translations was only when the errmsg was in shared code where the worker type was not clear from existing conditions. See also previous review comments [1]. ~~~ 2. StartLogRepWorker /* Common function to start the leader apply or tablesync worker. */ void StartLogRepWorker(int worker_slot) { /* Attach to slot */ logicalrep_worker_attach(worker_slot); /* Setup signal handling */ pqsignal(SIGHUP, SignalHandlerForConfigReload); pqsignal(SIGTERM, die); BackgroundWorkerUnblockSignals(); /* * We don't currently need any ResourceOwner in a walreceiver process, but * if we did, we could call CreateAuxProcessResourceOwner here. */ /* Initialise stats to a sanish value */ MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time = MyLogicalRepWorker->reply_time = GetCurrentTimestamp(); /* Load the libpq-specific functions */ load_file("libpqwalreceiver", false); InitializeLogRepWorker(); /* Connect to the origin and start the replication. */ elog(DEBUG1, "connecting to publisher using connection string \"%s\"", MySubscription->conninfo); /* * Setup callback for syscache so that we know when something changes in * the subscription relation state. */ CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP, invalidate_syncing_table_states, (Datum) 0); } ~ 2a. The function name seems a bit misleading because it is not really "starting" anything here - it is just more "initialization" code, right? Nor is it common to all kinds of LogRepWorker. 
Maybe the function could be named something else like 'InitApplyOrSyncWorker()'. -- see also #2c ~ 2b. Should this have Assert to ensure this is only called from leader apply or tablesync? -- see also #2c ~ 2c. IMO maybe the best/tidiest way to do this is not to introduce a new function at all. Instead, just put all this "common init" code into the existing "common init" function ('InitializeLogRepWorker') and execute it only if (am_tablesync_worker() || am_leader_apply_worker()) { }. == src/include/replication/worker_internal.h 3. extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn); +extern void set_stream_options(WalRcvStreamOptions *options, +char *slotname, +XLogRecPtr *origin_startpos); + +extern void start_apply(XLogRecPtr origin_startpos); +extern void DisableSubscriptionAndExit(void); +extern void StartLogRepWorker(int worker_slot); This placement (esp. with the missing whitespace) seems to be grouping the set_stream_options with the other 'pa' externs, which are all under the comment "/* Parallel apply worker setup and interactions */". Putting all these up near the other "extern void InitializeLogRepWorker(void)" might be less ambiguous. -- [1] worker name in errmsg - https://www.postgresql.org/message-id/CAA4eK1%2B%2BwkxxMjsPh-z2aKa9ZjNhKsjv0Tnw%2BTVX-hCBkDHusw%40mail.gmail.com Kind Regards, Peter Smith. Fujitsu Australia
Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication
On Thu, Jul 20, 2023 at 11:41 PM Melih Mutlu wrote: > > Hi, > > Attached the updated patches with recent reviews addressed. > > See below for my comments: > > Peter Smith , 19 Tem 2023 Çar, 06:08 tarihinde şunu > yazdı: >> >> Some review comments for v19-0001 >> >> 2. LogicalRepSyncTableStart >> >> /* >> * Finally, wait until the leader apply worker tells us to catch up and >> * then return to let LogicalRepApplyLoop do it. >> */ >> wait_for_worker_state_change(SUBREL_STATE_CATCHUP); >> >> ~ >> >> Should LogicalRepApplyLoop still be mentioned here, since that is >> static in worker.c? Maybe it is better to refer instead to the common >> 'start_apply' wrapper? (see also #5a below) > > > Isn't LogicalRepApplyLoop static on HEAD and also mentioned in the exact > comment in tablesync.c while the common "start_apply" function also exists? > I'm not sure how such a change would be related to this patch. > Fair enough. I thought it was questionable for one module to refer to another module's static functions, but you are correct - it is not really related to your patch. Sorry for the noise. -- Kind Regards, Peter Smith. Fujitsu Australia
Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication
Hi, Attached the updated patches with recent reviews addressed. See below for my comments: Peter Smith , 19 Tem 2023 Çar, 06:08 tarihinde şunu yazdı: > Some review comments for v19-0001 > > 2. LogicalRepSyncTableStart > > /* > * Finally, wait until the leader apply worker tells us to catch up and > * then return to let LogicalRepApplyLoop do it. > */ > wait_for_worker_state_change(SUBREL_STATE_CATCHUP); > > ~ > > Should LogicalRepApplyLoop still be mentioned here, since that is > static in worker.c? Maybe it is better to refer instead to the common > 'start_apply' wrapper? (see also #5a below) Isn't LogicalRepApplyLoop static on HEAD and also mentioned in the exact comment in tablesync.c while the common "start_apply" function also exists? I'm not sure how such a change would be related to this patch. --- 5. > + /* Found a table for next iteration */ > + finish_sync_worker(true); > + > + StartTransactionCommand(); > + ereport(LOG, > + (errmsg("logical replication worker for subscription \"%s\" will be > reused to sync table \"%s\" with relid %u.", > + MySubscription->name, > + get_rel_name(MyLogicalRepWorker->relid), > + MyLogicalRepWorker->relid))); > + CommitTransactionCommand(); > + > + done = false; > + break; > + } > + LWLockRelease(LogicalRepWorkerLock); > 5b. > Isn't there a missing call to that LWLockRelease, if the 'break' happens? Lock is already released before break, if that's the lock you meant: /* Update worker state for the next table */ > MyLogicalRepWorker->relid = rstate->relid; > MyLogicalRepWorker->relstate = rstate->state; > MyLogicalRepWorker->relstate_lsn = rstate->lsn; > LWLockRelease(LogicalRepWorkerLock); > /* Found a table for next iteration */ > finish_sync_worker(true); > done = false; > break; --- 2. > As for the publisher node, this patch allows to reuse logical > walsender processes > after the streaming is done once. > ~ > Is this paragraph even needed? 
Since the connection is reused then it > already implies the other end (the walsender) is being reused, right? I actually see no harm in explaining this explicitly. Thanks, -- Melih Mutlu Microsoft v21-0001-Refactor-to-split-Apply-and-Tablesync-Workers.patch Description: Binary data v21-0002-Reuse-Tablesync-Workers.patch Description: Binary data v21-0003-Reuse-connection-when-tablesync-workers-change-t.patch Description: Binary data
Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication
On Thu, Jul 20, 2023 at 5:12 PM Melih Mutlu wrote: > > Peter Smith , 20 Tem 2023 Per, 05:41 tarihinde şunu > yazdı: >> >> 7. InitializeLogRepWorker >> >> if (am_tablesync_worker()) >> ereport(LOG, >> - (errmsg("logical replication worker for subscription \"%s\", table >> \"%s\" has started", >> + (errmsg("logical replication worker for subscription \"%s\", table >> \"%s\" with relid %u has started", >> MySubscription->name, >> - get_rel_name(MyLogicalRepWorker->relid; >> + get_rel_name(MyLogicalRepWorker->relid), >> + MyLogicalRepWorker->relid))); >> >> But this is certainly a tablesync worker so the message here should >> say "logical replication table synchronization worker" like the HEAD >> code used to do. >> >> It seems this mistake was introduced in patch v20-0001. > > > I'm a bit confused here. Isn't it decided to use "logical replication worker" > regardless of the worker's type [1]. That's why I made this change. If that's > not the case here, I'll put it back. > I feel where the worker type is clear, it is better to use it unless the same can lead to translation issues. -- With Regards, Amit Kapila.
Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication
Hi, Peter Smith , 20 Tem 2023 Per, 05:41 tarihinde şunu yazdı: > 7. InitializeLogRepWorker > > if (am_tablesync_worker()) > ereport(LOG, > - (errmsg("logical replication worker for subscription \"%s\", table > \"%s\" has started", > + (errmsg("logical replication worker for subscription \"%s\", table > \"%s\" with relid %u has started", > MySubscription->name, > - get_rel_name(MyLogicalRepWorker->relid; > + get_rel_name(MyLogicalRepWorker->relid), > + MyLogicalRepWorker->relid))); > > But this is certainly a tablesync worker so the message here should > say "logical replication table synchronization worker" like the HEAD > code used to do. > > It seems this mistake was introduced in patch v20-0001. > I'm a bit confused here. Isn't it decided to use "logical replication worker" regardless of the worker's type [1]? That's why I made this change. If that's not the case here, I'll put it back. [1] https://www.postgresql.org/message-id/flat/CAHut%2BPt1xwATviPGjjtJy5L631SGf3qjV9XUCmxLu16cHamfgg%40mail.gmail.com Thanks, -- Melih Mutlu Microsoft
Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication
Hi Peter, Peter Smith , 20 Tem 2023 Per, 07:10 tarihinde şunu yazdı: > Hi, I had a look at the latest 3 patch (v20-0003). > > Although this patch was recently modified, the updates are mostly only > to make it compatible with the updated v20-0002 patch. Specifically, > the v20-0003 updates did not yet address my review comments from > v17-0003 [1]. > Yes, I only addressed your reviews for 0001 and 0002, and rebased 0003 in latest patches as stated here [1]. I'll update the patch soon according to recent reviews, including yours for 0003. [1] https://www.postgresql.org/message-id/CAGPVpCTvALKEXe0%3DN-%2BiMmVxVQ-%2BP8KZ_1qQ1KsSSZ-V9wJ5hw%40mail.gmail.com Thanks for the reminder. -- Melih Mutlu Microsoft
Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication
Hi, I had a look at the latest 3 patch (v20-0003). Although this patch was recently modified, the updates are mostly only to make it compatible with the updated v20-0002 patch. Specifically, the v20-0003 updates did not yet address my review comments from v17-0003 [1]. Anyway, this post is just a reminder so the earlier review doesn't get forgotten. -- [1] v17-0003 review - https://www.postgresql.org/message-id/CAHut%2BPuMAiO_X_Kw6ud-jr5WOm%2Brpkdu7CppDU6mu%3DgY7UVMzQ%40mail.gmail.com Kind Regards, Peter Smith. Fujitsu Australia
Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication
On Thu, Jul 20, 2023 at 8:02 AM Peter Smith wrote: > > On Tue, Jul 18, 2023 at 1:54 AM Melih Mutlu wrote: > > > > Hi, > > > > PFA updated patches. Rebased 0003 with minor changes. Addressed Peter's > > reviews for 0001 and 0002 with some small comments below. > > > > Peter Smith , 10 Tem 2023 Pzt, 10:09 tarihinde şunu > > yazdı: > >> > >> 6. LogicalRepApplyLoop > >> > >> + /* > >> + * apply_dispatch() may have gone into apply_handle_commit() > >> + * which can call process_syncing_tables_for_sync. > >> + * > >> + * process_syncing_tables_for_sync decides whether the sync of > >> + * the current table is completed. If it is completed, > >> + * streaming must be already ended. So, we can break the loop. > >> + */ > >> + if (MyLogicalRepWorker->is_sync_completed) > >> + { > >> + endofstream = true; > >> + break; > >> + } > >> + > >> > >> and > >> > >> + /* > >> + * If is_sync_completed is true, this means that the tablesync > >> + * worker is done with synchronization. Streaming has already been > >> + * ended by process_syncing_tables_for_sync. We should move to the > >> + * next table if needed, or exit. > >> + */ > >> + if (MyLogicalRepWorker->is_sync_completed) > >> + endofstream = true; > >> > >> ~ > >> > >> Instead of those code fragments above assigning 'endofstream' as a > >> side-effect, would it be the same (but tidier) to just modify the > >> other "breaking" condition below: > >> > >> BEFORE: > >> /* Check if we need to exit the streaming loop. */ > >> if (endofstream) > >> break; > >> > >> AFTER: > >> /* Check if we need to exit the streaming loop. */ > >> if (endofstream || MyLogicalRepWorker->is_sync_completed) > >> break; > > > > > > First place you mentioned also breaks the infinite loop. Such an if > > statement is needed there with or without endofstream assignment. > > > > I think if there is a flag to break a loop, using that flag to indicate > > that we should exit the loop seems more appropriate to me. 
I see that it > > would be a bit tidier without endofstream = true lines, but I feel like it > > would also be less readable. > > > > I don't have a strong opinion though. I'm just keeping them as they are for > > now, but I can change them if you disagree. > > > > I felt it was slightly sneaky to re-use the existing variable as a > convenient way to do what you want. But, I don’t feel strongly enough > on this point to debate it -- maybe see later if others have an > opinion about this. > I feel it is okay to use the existing variable 'endofstream' here but shall we have an assertion that it is a tablesync worker? > >> > >> > >> 10b. > >> All the other tablesync-related fields of this struct are named as > >> relXXX, so I wonder if is better for this to follow the same pattern. > >> e.g. 'relsync_completed' > > > > > > Aren't those start with rel because they're related to the relation that > > the tablesync worker is syncing? is_sync_completed is not a relation > > specific field. I'm okay with changing the name but feel like > > relsync_completed would be misleading. > > My reading of the code is slightly different: Only these fields have > the prefix ‘rel’ and they are all grouped under the comment “/* Used > for initial table synchronization. */” because AFAIK only these fields > are TWS specific (not used for other kinds of workers). > > Since this new flag field is also TWS-specific, therefore IMO it > should follow the same consistent name pattern. But, if you are > unconvinced, maybe see later if others have an opinion about it. > +1 to use the prefix 'rel' here as the sync is specific to the relation. Even during apply phase, we will apply the relation-specific changes. See should_apply_changes_for_rel(). -- With Regards, Amit Kapila.
Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication
Some review comments for patch v20-0002 == src/backend/replication/logical/tablesync.c 1. finish_sync_worker /* * Exit routine for synchronization worker. * * If reuse_worker is false, the worker will not be reused and exit. */ ~ IMO the "will not be reused" part doesn't need saying -- it is self-evident from the fact "reuse_worker is false". SUGGESTION If reuse_worker is false, at the conclusion of this function the worker process will exit. ~~~ 2. finish_sync_worker - StartTransactionCommand(); - ereport(LOG, - (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished", - MySubscription->name, - get_rel_name(MyLogicalRepWorker->relid; - CommitTransactionCommand(); - /* Find the leader apply worker and signal it. */ logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid); - /* Stop gracefully */ - proc_exit(0); + if (!reuse_worker) + { + StartTransactionCommand(); + ereport(LOG, + (errmsg("logical replication table synchronization worker for subscription \"%s\" has finished", + MySubscription->name))); + CommitTransactionCommand(); + + /* Stop gracefully */ + proc_exit(0); + } In the HEAD code the log message came *before* it signalled to the apply leader. Won't it be better to keep the logic in that same order? ~~~ 3. process_syncing_tables_for_sync - finish_sync_worker(); + /* Sync worker has completed synchronization of the current table. */ + MyLogicalRepWorker->is_sync_completed = true; + + ereport(LOG, + (errmsg("logical replication table synchronization worker for subscription \"%s\", relation \"%s\" with relid %u has finished", + MySubscription->name, + get_rel_name(MyLogicalRepWorker->relid), + MyLogicalRepWorker->relid))); + CommitTransactionCommand(); IIUC it is only the " table synchronization" part that is finished here; not the whole "table synchronization worker" (compared to finish_sync_worker function), so maybe the word "worker" should not be in this message. ~~~ 4. 
TablesyncWorkerMain + if (MyLogicalRepWorker->is_sync_completed) + { + /* tablesync is done unless a table that needs syncning is found */ + done = true; SUGGESTION (Typo "syncning" and minor rewording.) This tablesync worker is 'done' unless another table that needs syncing is found. ~ 5. + /* Found a table for next iteration */ + finish_sync_worker(true); + + StartTransactionCommand(); + ereport(LOG, + (errmsg("logical replication worker for subscription \"%s\" will be reused to sync table \"%s\" with relid %u.", + MySubscription->name, + get_rel_name(MyLogicalRepWorker->relid), + MyLogicalRepWorker->relid))); + CommitTransactionCommand(); + + done = false; + break; + } + LWLockRelease(LogicalRepWorkerLock); 5a. IMO it seems better to put this ereport *inside* the finish_sync_worker() function alongside the similar log for when the worker is not reused. ~ 5b. Isn't there a missing call to that LWLockRelease, if the 'break' happens? == src/backend/replication/logical/worker.c 6. LogicalRepApplyLoop Refer to [1] for my reply to a previous review comment ~~~ 7. InitializeLogRepWorker if (am_tablesync_worker()) ereport(LOG, - (errmsg("logical replication worker for subscription \"%s\", table \"%s\" has started", + (errmsg("logical replication worker for subscription \"%s\", table \"%s\" with relid %u has started", MySubscription->name, - get_rel_name(MyLogicalRepWorker->relid; + get_rel_name(MyLogicalRepWorker->relid), + MyLogicalRepWorker->relid))); But this is certainly a tablesync worker so the message here should say "logical replication table synchronization worker" like the HEAD code used to do. It seems this mistake was introduced in patch v20-0001. == src/include/replication/worker_internal.h 8. Refer to [1] for my reply to a previous review comment -- [1] Replies to previous 0002 comments -- https://www.postgresql.org/message-id/CAHut%2BPtiAtGJC52SGNdobOah5ctYDDhWWKd%3DuP%3DrkRgXzg5rdg%40mail.gmail.com Kind Regards, Peter Smith. Fujitsu Australia
Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication
On Tue, Jul 18, 2023 at 1:54 AM Melih Mutlu wrote: > > Hi, > > PFA updated patches. Rebased 0003 with minor changes. Addressed Peter's > reviews for 0001 and 0002 with some small comments below. > > Peter Smith , 10 Tem 2023 Pzt, 10:09 tarihinde şunu > yazdı: >> >> 6. LogicalRepApplyLoop >> >> + /* >> + * apply_dispatch() may have gone into apply_handle_commit() >> + * which can call process_syncing_tables_for_sync. >> + * >> + * process_syncing_tables_for_sync decides whether the sync of >> + * the current table is completed. If it is completed, >> + * streaming must be already ended. So, we can break the loop. >> + */ >> + if (MyLogicalRepWorker->is_sync_completed) >> + { >> + endofstream = true; >> + break; >> + } >> + >> >> and >> >> + /* >> + * If is_sync_completed is true, this means that the tablesync >> + * worker is done with synchronization. Streaming has already been >> + * ended by process_syncing_tables_for_sync. We should move to the >> + * next table if needed, or exit. >> + */ >> + if (MyLogicalRepWorker->is_sync_completed) >> + endofstream = true; >> >> ~ >> >> Instead of those code fragments above assigning 'endofstream' as a >> side-effect, would it be the same (but tidier) to just modify the >> other "breaking" condition below: >> >> BEFORE: >> /* Check if we need to exit the streaming loop. */ >> if (endofstream) >> break; >> >> AFTER: >> /* Check if we need to exit the streaming loop. */ >> if (endofstream || MyLogicalRepWorker->is_sync_completed) >> break; > > > First place you mentioned also breaks the infinite loop. Such an if statement > is needed there with or without endofstream assignment. > > I think if there is a flag to break a loop, using that flag to indicate that > we should exit the loop seems more appropriate to me. I see that it would be > a bit tidier without endofstream = true lines, but I feel like it would also > be less readable. > > I don't have a strong opinion though. 
I'm just keeping them as they are for > now, but I can change them if you disagree. > I felt it was slightly sneaky to re-use the existing variable as a convenient way to do what you want. But, I don’t feel strongly enough on this point to debate it -- maybe see later if others have an opinion about this. >> >> >> 10b. >> All the other tablesync-related fields of this struct are named as >> relXXX, so I wonder if is better for this to follow the same pattern. >> e.g. 'relsync_completed' > > > Aren't those start with rel because they're related to the relation that the > tablesync worker is syncing? is_sync_completed is not a relation specific > field. I'm okay with changing the name but feel like relsync_completed would > be misleading. My reading of the code is slightly different: Only these fields have the prefix ‘rel’ and they are all grouped under the comment “/* Used for initial table synchronization. */” because AFAIK only these fields are TWS specific (not used for other kinds of workers). Since this new flag field is also TWS-specific, therefore IMO it should follow the same consistent name pattern. But, if you are unconvinced, maybe see later if others have an opinion about it. -- Kind Regards, Peter Smith. Fujitsu Australia
Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication
On Wed, Jul 19, 2023 at 8:38 AM Peter Smith wrote: > > Some review comments for v19-0001 > ... > == > src/backend/replication/logical/worker.c > > 3. set_stream_options > > +/* > + * Sets streaming options including replication slot name and origin start > + * position. Workers need these options for logical replication. > + */ > +void > +set_stream_options(WalRcvStreamOptions *options, > > I'm not sure if the last sentence of the comment is adding anything useful. > Personally, I find it useful as at a high-level it tells the purpose of setting these options. > ~~~ > > 4. start_apply > /* > * Run the apply loop with error handling. Disable the subscription, > * if necessary. > * > * Note that we don't handle FATAL errors which are probably because > * of system resource error and are not repeatable. > */ > void > start_apply(XLogRecPtr origin_startpos) > > ~ > > 4a. > Somehow I found the function names to be confusing. Intuitively (IMO) > 'start_apply' is for apply worker and 'start_tablesync' is for > tablesync worker. But actually, the start_apply() function is the > *common* function for both kinds of worker. Might be easier to > understand if start_apply function name can be changed to indicate it > is really common -- e.g. common_apply_loop(), or similar. > > ~ > > 4b. > If adverse to changing the function name, it might be helpful anyway > if the function comment can emphasize this function is shared by > different worker types. e.g. "Common function to run the apply > loop..." > I would prefer to change the comments as suggested by you in 4b because both the workers (apply and tablesync) need to perform apply, so it seems logical for both of them to invoke start_apply. -- With Regards, Amit Kapila.
Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication
Some review comments for v19-0001 == src/backend/replication/logical/tablesync.c 1. run_tablesync_worker +run_tablesync_worker(WalRcvStreamOptions *options, + char *slotname, + char *originname, + int originname_size, + XLogRecPtr *origin_startpos) +{ + /* Start table synchronization. */ + start_table_sync(origin_startpos, &slotname); There was no such comment ("/* Start table synchronization. */") in the original HEAD code, so I didn't see that it adds much value by adding it in the refactored code. ~~~ 2. LogicalRepSyncTableStart /* * Finally, wait until the leader apply worker tells us to catch up and * then return to let LogicalRepApplyLoop do it. */ wait_for_worker_state_change(SUBREL_STATE_CATCHUP); ~ Should LogicalRepApplyLoop still be mentioned here, since that is static in worker.c? Maybe it is better to refer instead to the common 'start_apply' wrapper? (see also #5a below) == src/backend/replication/logical/worker.c 3. set_stream_options +/* + * Sets streaming options including replication slot name and origin start + * position. Workers need these options for logical replication. + */ +void +set_stream_options(WalRcvStreamOptions *options, I'm not sure if the last sentence of the comment is adding anything useful. ~~~ 4. start_apply /* * Run the apply loop with error handling. Disable the subscription, * if necessary. * * Note that we don't handle FATAL errors which are probably because * of system resource error and are not repeatable. */ void start_apply(XLogRecPtr origin_startpos) ~ 4a. Somehow I found the function names to be confusing. Intuitively (IMO) 'start_apply' is for apply worker and 'start_tablesync' is for tablesync worker. But actually, the start_apply() function is the *common* function for both kinds of worker. Might be easier to understand if start_apply function name can be changed to indicate it is really common -- e.g. common_apply_loop(), or similar. ~ 4b. 
If adverse to changing the function name, it might be helpful anyway if the function comment can emphasize this function is shared by different worker types. e.g. "Common function to run the apply loop..." ~~~ 5. run_apply_worker + ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid, +originname, originname_size); + + /* Setup replication origin tracking. */ + StartTransactionCommand(); Even if you wish ReplicationOriginNameForLogicalRep() to be outside of the transaction I thought it should still come *after* the comment, same as it does in the HEAD code. ~~~ 6. ApplyWorkerMain - /* Run the main loop. */ - start_apply(origin_startpos); + /* This is leader apply worker */ + run_apply_worker(&options, myslotname, originname, sizeof(originname), &origin_startpos); proc_exit(0); } ~ 6a. The comment "/* This is leader apply worker */" is redundant now. This function is the entry point for leader apply workers so it can't be anything else. ~ 6b. Caller parameter wrapping differs from the similar code in TablesyncWorkerMain. Shouldn't they be similar? e.g. + run_apply_worker(&options, myslotname, originname, sizeof(originname), &origin_startpos); versus + run_tablesync_worker(&options, + myslotname, + originname, + sizeof(originname), + &origin_startpos); == src/include/replication/worker_internal.h 7. + +extern void set_stream_options(WalRcvStreamOptions *options, +char *slotname, +XLogRecPtr *origin_startpos); +extern void start_apply(XLogRecPtr origin_startpos); +extern void DisableSubscriptionAndExit(void); + Maybe all the externs belong together? It doesn't seem right for just these 3 externs to be separated from all the others, with those static inline functions in-between. -- Kind Regards, Peter Smith. Fujitsu Australia
Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication
On Tue, 11 Jul 2023 at 08:30, Peter Smith wrote: > > On Tue, Jul 11, 2023 at 12:31 AM Melih Mutlu wrote: > > > > Hi, > > > > Hayato Kuroda (Fujitsu) , 6 Tem 2023 Per, > > 12:47 tarihinde şunu yazdı: > > > > > > Dear Melih, > > > > > > > Thanks for the 0003 patch. But it did not work for me. Can you create > > > > a subscription successfully with patch 0003 applied? > > > > I get the following error: " ERROR: table copy could not start > > > > transaction on publisher: another command is already in progress". > > > > > > You got the ERROR when all the patches (0001-0005) were applied, right? > > > I have focused on 0001 and 0002 only, so I missed something. > > > If it was not correct, please attach the logfile and test script what you > > > did. > > > > Yes, I did get an error with all patches applied. But with only 0001 > > and 0002, your version seems like working and mine does not. > > What do you think about combining 0002 and 0003? Or should those stay > > separate? > > > > Even if patches 0003 and 0002 are to be combined, I think that should > not happen until after the "reuse" design is confirmed which way is > best. > > e.g. IMO it might be easier to compare the different PoC designs for > patch 0002 if there is no extra logic involved. > > PoC design#1 -- each tablesync decides for itself what to do next > after it finishes > PoC design#2 -- reuse tablesync using a "pool" of available workers I did a POC for design#2 for implementing a worker pool to synchronize the tables for a subscriber. The core design is the same as what Melih had implemented at [1]. I had already started the implementation of POC based on one of the earlier e-mail [2] Peter had shared. The POC has been implemented like: a) Apply worker will check the tablesync pool and see if any tablesync worker is free: i) If there are no free workers in the pool, start a table sync worker and add it to the table sync pool. 
ii) If there are free workers in the pool, reuse a free tablesync worker to synchronize another table.
b) The apply worker will check whether the tables are synchronized; once all the tables are synchronized, the apply worker will release all the workers from the tablesync pool.
c) The apply worker and tablesync workers have shared memory to share the relation data and execution state between them.
d) The apply worker's and tablesync workers' pids are also stored in the shared memory, so that we need not take a lock on LogicalRepWorkerLock and loop over max_logical_replication_workers every time. We use the pid stored in shared memory to wake up the apply worker and tablesync worker whenever needed.
While I was implementing the POC I found one issue in the POC patch (there is no problem with the HEAD code, the issue was only with the POC):
1) The apply worker was waiting for the table to be set to SYNCDONE.
2) Meanwhile, the tablesync worker sets the table to SYNCDONE and sets the apply worker's latch.
3) The apply worker resets the latch set by the tablesync worker, goes back to the main loop, and waits on the main loop latch (since the tablesync worker's latch was already reset, the apply worker will wait for 1 second).
To fix this I had to set the apply worker's latch once every 1ms in this case alone, which is not a good solution as it will consume a lot of cpu cycles. A better fix for this would be to introduce a new subscription relation state. The attached patch has the changes for the same. 0001, 0002 and 0003 are the patches shared by Melih and Kuroda-san earlier. The 0004 patch has the changes for the POC of the tablesync worker pool implementation.
POC design 1: Tablesync worker identifies the tables that should be synced and reuses the connection.
POC design 2: Tablesync worker pool, with the apply worker scheduling the work to tablesync workers in the pool and reusing the connection.
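The assignment policy in step (a) above can be sketched in plain C outside of PostgreSQL. This is a minimal illustration only; the struct and function names here are hypothetical and not taken from the POC patch, and the locking that the real pool would need is elided:

```c
#include <assert.h>
#include <stdbool.h>

#define MAX_SYNC_WORKERS 4

typedef struct SyncWorker
{
	bool	in_use;			/* slot occupied by a launched worker? */
	bool	busy;			/* currently assigned a table? */
	int		relid;			/* table being synced (0 = none) */
} SyncWorker;

static SyncWorker pool[MAX_SYNC_WORKERS];

/*
 * Assign a table to a free worker from the pool if one exists (reuse),
 * otherwise "launch" a new worker into a vacant slot.
 * Returns the slot number used, or -1 if the pool is exhausted.
 */
static int
assign_table(int relid)
{
	/* First, prefer reusing an already-launched, idle worker. */
	for (int i = 0; i < MAX_SYNC_WORKERS; i++)
	{
		if (pool[i].in_use && !pool[i].busy)
		{
			pool[i].busy = true;
			pool[i].relid = relid;
			return i;
		}
	}
	/* Otherwise, launch a new worker if the pool is not yet full. */
	for (int i = 0; i < MAX_SYNC_WORKERS; i++)
	{
		if (!pool[i].in_use)
		{
			pool[i].in_use = true;	/* stands in for launching a process */
			pool[i].busy = true;
			pool[i].relid = relid;
			return i;
		}
	}
	return -1;					/* caller must wait and retry */
}

/* Called when a worker finishes a table; it stays launched for reuse. */
static void
finish_table(int slot)
{
	pool[slot].busy = false;
	pool[slot].relid = 0;
}
```

In the real patch set the pool would live in shared memory and launching a worker goes through the logical replication launcher; here "launching" is just flipping a flag.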
Performance results for 10 empty tables:
+--------------+----------------+----------------+----------------+-----------------+
|              | 2 sync workers | 4 sync workers | 8 sync workers | 16 sync workers |
+--------------+----------------+----------------+----------------+-----------------+
| HEAD         | 128.4685 ms    | 121.271 ms     | 136.5455 ms    | N/A             |
+--------------+----------------+----------------+----------------+-----------------+
| POC design#1 | 70.7095 ms     | 80.9805 ms     | 102.773 ms     | N/A             |
+--------------+----------------+----------------+----------------+-----------------+
| POC design#2 | 70.858 ms      | 83.0845 ms     | 112.505 ms     | N/A             |
+--------------+----------------+----------------+----------------+-----------------+

Performance results for 100 empty tables:
+--------------+----------------+----------------+----------------+-----------------+
|              | 2 sync workers | 4 sync workers | 8 sync workers | 16 sync workers |
+--------------+---
Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication
On Tue, Jul 18, 2023 at 2:33 PM Melih Mutlu wrote: > > Attached the fixed patchset. > Few comments on 0001 1. + logicalrep_worker_attach(worker_slot); + + /* Setup signal handling */ + pqsignal(SIGHUP, SignalHandlerForConfigReload); + pqsignal(SIGTERM, die); + BackgroundWorkerUnblockSignals(); + + /* + * We don't currently need any ResourceOwner in a walreceiver process, but + * if we did, we could call CreateAuxProcessResourceOwner here. + */ + + /* Initialise stats to a sanish value */ + MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time = + MyLogicalRepWorker->reply_time = GetCurrentTimestamp(); + + /* Load the libpq-specific functions */ + load_file("libpqwalreceiver", false); + + InitializeLogRepWorker(); + + /* Connect to the origin and start the replication. */ + elog(DEBUG1, "connecting to publisher using connection string \"%s\"", + MySubscription->conninfo); + + /* + * Setup callback for syscache so that we know when something changes in + * the subscription relation state. + */ + CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP, + invalidate_syncing_table_states, + (Datum) 0); It seems this part of the code is the same for ApplyWorkerMain() and TablesyncWorkerMain(). So, won't it be better to move it into a common function? 2. Can LogicalRepSyncTableStart() be static function? 3. I think you don't need to send 0004, 0005 each time till we are able to finish patches till 0003. 4. In 0001's commit message, you can say that it will help the upcoming reuse tablesync worker patch. -- With Regards, Amit Kapila.
Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication
On Tue, Jul 18, 2023 at 11:25 AM Peter Smith wrote: > > On Tue, Jul 18, 2023 at 1:54 AM Melih Mutlu wrote: > > > > Hi, > > > > PFA updated patches. Rebased 0003 with minor changes. Addressed Peter's > > reviews for 0001 and 0002 with some small comments below. > > > > Thanks, I will take another look at these soon. FYI, the 0001 patch > does not apply cleanly. It needs to be rebased again because > get_worker_name() function was recently removed from HEAD. > Sorry, to be more correct -- it applied OK, but failed to build. > replication/logical/worker.o: In function `InitializeLogRepWorker': > /home/postgres/oss_postgres_misc/src/backend/replication/logical/worker.c:4605: > undefined reference to `get_worker_name' > > -- > Kind Regards, > Peter Smith. > Fujitsu Australia
Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication
On Tue, Jul 18, 2023 at 1:54 AM Melih Mutlu wrote: > > Hi, > > PFA updated patches. Rebased 0003 with minor changes. Addressed Peter's > reviews for 0001 and 0002 with some small comments below. > Thanks, I will take another look at these soon. FYI, the 0001 patch does not apply cleanly. It needs to be rebased again because get_worker_name() function was recently removed from HEAD. replication/logical/worker.o: In function `InitializeLogRepWorker': /home/postgres/oss_postgres_misc/src/backend/replication/logical/worker.c:4605: undefined reference to `get_worker_name' -- Kind Regards, Peter Smith. Fujitsu Australia
Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication
On Fri, Jul 14, 2023 at 3:07 PM Melih Mutlu wrote: > > Amit Kapila , 14 Tem 2023 Cum, 11:11 tarihinde şunu > yazdı: >> >> Yeah, it is quite surprising that Design#2 is worse than master. I >> suspect there is something wrong going on with your Design#2 patch. >> One area to check is whether apply worker is able to quickly assign >> the new relations to tablesync workers. Note that currently after the >> first time assigning the tables to workers, the apply worker may wait >> before processing the next set of tables in the main loop of >> LogicalRepApplyLoop(). The other minor point about design#2 >> implementation is that you may want to first assign the allocated >> tablesync workers before trying to launch a new worker. > > > It's not actually worse than master all the time. It seems like it's just > unreliable. > Here are some consecutive runs for both designs and master. > > design#1 = 1621,527 ms, 1788,533 ms, 1645,618 ms, 1702,068 ms, 1745,753 ms > design#2 = 2089,077 ms, 1864,571 ms, 4574,799 ms, 5422,217 ms, 1905,944 ms > master = 2815,138 ms, 2481,954 ms , 2594,413 ms, 2620,690 ms, 2489,323 ms > > And apply worker was not busy with applying anything during these experiments > since there were not any writes to the publisher. I'm not sure how that would > also affect the performance if there were any writes. > Yeah, this is a valid point. I think this is in favor of the Design#1 approach we are discussing here. One thing I was thinking whether we can do anything to alleviate the contention at the higher worker count. One possibility is to have some kind of available worker list which can be used to pick up the next worker instead of checking all the workers while assigning the next table. We can probably explore it separately once the first three patches are ready because anyway, this will be an optimization atop the Design#1 approach. -- With Regards, Amit Kapila.
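Amit's "available worker list" idea above — picking the next free worker in O(1) instead of scanning all workers — could be a simple stack of idle slot numbers. A minimal, hypothetical sketch (these names are illustrative; the actual optimization was deferred to a follow-up patch):

```c
#include <assert.h>

#define MAX_SYNC_WORKERS 8

/* Stack of slot numbers for workers that are launched but idle. */
static int	free_workers[MAX_SYNC_WORKERS];
static int	n_free = 0;

/* Called by a tablesync worker when it finishes a table. */
static void
mark_worker_available(int slot)
{
	assert(n_free < MAX_SYNC_WORKERS);
	free_workers[n_free++] = slot;
}

/*
 * Called by the apply worker to pick a worker for the next table.
 * O(1) pop, instead of looping over every worker slot (which in the
 * server would also mean holding LogicalRepWorkerLock during the scan).
 * Returns -1 if no launched worker is idle.
 */
static int
pick_available_worker(void)
{
	if (n_free == 0)
		return -1;
	return free_workers[--n_free];
}
```

In the server this list would have to sit in shared memory and be protected by a lock or updated atomically, which is exactly the sort of detail that makes it a separate optimization patch.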
Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication
Hi, Amit Kapila wrote on Fri, 14 Jul 2023 at 11:11: > Yeah, it is quite surprising that Design#2 is worse than master. I > suspect there is something wrong going on with your Design#2 patch. > One area to check is whether apply worker is able to quickly assign > the new relations to tablesync workers. Note that currently after the > first time assigning the tables to workers, the apply worker may wait > before processing the next set of tables in the main loop of > LogicalRepApplyLoop(). The other minor point about design#2 > implementation is that you may want to first assign the allocated > tablesync workers before trying to launch a new worker.
It's not actually worse than master all the time. It seems like it's just unreliable. Here are some consecutive runs for both designs and master.
design#1 = 1621,527 ms, 1788,533 ms, 1645,618 ms, 1702,068 ms, 1745,753 ms
design#2 = 2089,077 ms, 1864,571 ms, 4574,799 ms, 5422,217 ms, 1905,944 ms
master = 2815,138 ms, 2481,954 ms, 2594,413 ms, 2620,690 ms, 2489,323 ms
And apply worker was not busy with applying anything during these experiments since there were not any writes to the publisher. I'm not sure how that would also affect the performance if there were any writes. Thanks, -- Melih Mutlu Microsoft
Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication
Hi Kuroda-san. Here are some review comments for the v17-0003 patch. They are all minor. == Commit message 1. Previously tablesync workers establish new connections when it changes the syncing table, but this might have additional overhead. This patch allows to reuse connections instead. ~ /This patch allows to reuse connections instead./This patch allows the existing connection to be reused./ ~~~ 2. As for the publisher node, this patch allows to reuse logical walsender processes after the streaming is done once. ~ Is this paragraph even needed? Since the connection is reused, it already implies the other end (the walsender) is being reused, right? == src/backend/replication/logical/tablesync.c 3. + * FIXME: set appropriate application_name. Previously, the slot name was used + * because the lifetime of the tablesync worker was same as that, but now the + * tablesync worker handles many slots during the synchronization so that it is + * not suitable. So what should be? Note that if the tablesync worker starts to + * reuse the replication slot during synchronization, we should use the slot + * name as application_name again. + */ +static void +ApplicationNameForTablesync(Oid suboid, int worker_slot, + char *application_name, Size szapp) 3a. I felt that most of this FIXME comment belongs with the calling code, not here. 3b. Also, maybe it needs some rewording -- I didn't understand exactly what it is trying to say. ~~~ 4. - /* - * Here we use the slot name instead of the subscription name as the - * application_name, so that it is different from the leader apply worker, - * so that synchronous replication can distinguish them. - */ - LogRepWorkerWalRcvConn = - walrcv_connect(MySubscription->conninfo, true, -must_use_password, -slotname, &err); + /* Connect to the publisher if haven't done so already. 
*/ + if (LogRepWorkerWalRcvConn == NULL) + { + char application_name[NAMEDATALEN]; + + /* + * The application_name must be also different from the leader apply + * worker because synchronous replication must distinguish them. + */ + ApplicationNameForTablesync(MySubscription->oid, + MyLogicalRepWorker->worker_slot, + application_name, + NAMEDATALEN); + LogRepWorkerWalRcvConn = + walrcv_connect(MySubscription->conninfo, true, +must_use_password, +application_name, &err); + } + Should the comment mention the "subscription name" as it did before? SUGGESTION The application_name must differ from the subscription name (used by the leader apply worker) because synchronous replication has to be able to distinguish this worker from the leader apply worker. == src/backend/replication/logical/worker.c 5. -start_table_sync(XLogRecPtr *origin_startpos, char **myslotname) +start_table_sync(XLogRecPtr *origin_startpos, + char **myslotname) This is a wrapping change only. It looks like an unnecessary hangover from a previous version of 0003. == src/backend/replication/walsender.c 6. exec_replication_command + if (cmd->kind == REPLICATION_KIND_PHYSICAL) StartReplication(cmd); ~ The extra blank line does not belong in this patch. == src/include/replication/worker_internal.h + /* Indicates the slot number which corresponds to this LogicalRepWorker. */ + int worker_slot; + 6a I think this field is very fundamental, so IMO it should be defined at the top of the struct, maybe nearby the other 'in_use' and 'generation' fields. ~ 6b. Also, since this is already a "worker" struct so there is no need to have "worker" in the field name again -- just "slot_number" or "slotnum" might be a better name. And then the comment can also be simplified. SUGGESTION /* Slot number of this worker. */ int slotnum; -- Kind Regards, Peter Smith. Fujitsu Australia
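For illustration, one way the naming question in this thread could be resolved — hypothetical, not what any posted patch does — is to compose the application_name from the subscription OID, the worker slot number (stable across table switches), and the system identifier for cross-cluster uniqueness, similar in spirit to how ReplicationSlotNameForTablesync() composes slot names:

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

#define NAMEDATALEN 64			/* PostgreSQL's identifier length limit */

/*
 * Hypothetical helper: build an application_name for a reusable
 * tablesync worker. Uses the worker slot instead of a relid (the
 * worker no longer maps 1:1 to a table), plus the system identifier
 * so names from different clusters cannot collide.
 */
static void
application_name_for_tablesync(uint32_t suboid, int worker_slot,
							   uint64_t sysid,
							   char *application_name, size_t szapp)
{
	snprintf(application_name, szapp, "pg_%u_sync_%d_%llu",
			 suboid, worker_slot, (unsigned long long) sysid);
}
```

Note that snprintf() truncates safely if the composed name would exceed szapp, which is why the caller passes the NAMEDATALEN-sized buffer rather than assuming the result fits.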
Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication
On Fri, Jul 14, 2023 at 1:58 AM Melih Mutlu wrote:
> Here are some quick numbers with 100 empty tables.
>
> +--------------+----------------+----------------+----------------+
> |              | 2 sync workers | 4 sync workers | 8 sync workers |
> +--------------+----------------+----------------+----------------+
> | POC design#1 | 1909.873 ms    | 986.261 ms     | 552.404 ms     |
> +--------------+----------------+----------------+----------------+
> | POC design#2 | 4962.208 ms    | 1240.503 ms    | 1165.405 ms    |
> +--------------+----------------+----------------+----------------+
> | master       | 2666.008 ms    | 1462.012 ms    | 986.848 ms     |
> +--------------+----------------+----------------+----------------+
>
> Seems like design#1 is better than both design#2 and master overall. It's > surprising to see that even master beats design#2 in some cases though. Not > sure if that is expected or there are some places to improve design#2 even > more.
Yeah, it is quite surprising that Design#2 is worse than master. I suspect there is something wrong going on with your Design#2 patch. One area to check is whether apply worker is able to quickly assign the new relations to tablesync workers. Note that currently after the first time assigning the tables to workers, the apply worker may wait before processing the next set of tables in the main loop of LogicalRepApplyLoop(). The other minor point about design#2 implementation is that you may want to first assign the allocated tablesync workers before trying to launch a new worker.
> PS: I only attached the related patches and not the whole patch set. 0001 and > 0002 may contain some of your earlier reviews, but I'll send a proper updated > set soon.
Yeah, that would be helpful. -- With Regards, Amit Kapila.
Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication
Hi Peter, Peter Smith wrote on Tue, 11 Jul 2023 at 05:59:
> Even if patches 0003 and 0002 are to be combined, I think that should > not happen until after the "reuse" design is confirmed which way is > best. > > e.g. IMO it might be easier to compare the different PoC designs for > patch 0002 if there is no extra logic involved. > > PoC design#1 -- each tablesync decides for itself what to do next > after it finishes > PoC design#2 -- reuse tablesync using a "pool" of available workers
Right. I made a patch 0003 to change 0002 so that tables will be assigned to sync workers by the apply worker. It's a rough POC and ignores some edge cases. But this is how I think the apply worker would take responsibility for table assignments. Hope the implementation makes sense and I'm not missing anything that may cause degraded performance.
PoC design#1 --> apply only patches 0001 and 0002
PoC design#2 --> apply all patches, 0001, 0002 and 0003
Here are some quick numbers with 100 empty tables.
+--------------+----------------+----------------+----------------+
|              | 2 sync workers | 4 sync workers | 8 sync workers |
+--------------+----------------+----------------+----------------+
| POC design#1 | 1909.873 ms    | 986.261 ms     | 552.404 ms     |
+--------------+----------------+----------------+----------------+
| POC design#2 | 4962.208 ms    | 1240.503 ms    | 1165.405 ms    |
+--------------+----------------+----------------+----------------+
| master       | 2666.008 ms    | 1462.012 ms    | 986.848 ms     |
+--------------+----------------+----------------+----------------+
Seems like design#1 is better than both design#2 and master overall. It's surprising to see that even master beats design#2 in some cases though. Not sure if that is expected or there are some places to improve design#2 even more. What do you think? PS: I only attached the related patches and not the whole patch set. 0001 and 0002 may contain some of your earlier reviews, but I'll send a proper updated set soon. Thanks, -- Melih Mutlu Microsoft
v18-0002-Reuse-Tablesync-Workers.patch Description: Binary data v18-0001-Refactor-to-split-Apply-and-Tablesync-Workers.patch Description: Binary data v18-0003-apply-worker-assigns-tables.patch Description: Binary data
RE: [PATCH] Reuse Workers and Replication Slots during Logical Replication
Dear Melih, > > > Thanks for the 0003 patch. But it did not work for me. Can you create > > > a subscription successfully with patch 0003 applied? > > > I get the following error: " ERROR: table copy could not start > > > transaction on publisher: another command is already in progress". > > > > You got the ERROR when all the patches (0001-0005) were applied, right? > > I have focused on 0001 and 0002 only, so I missed something. > > If it was not correct, please attach the logfile and test script what you > > did. > > Yes, I did get an error with all patches applied. But with only 0001 > and 0002, your version seems like working and mine does not. Hmm, really? IIUC I did not modify 0001 and 0002 patches, I just re-assigned the version number. I compared between yours and mine, but no meaningful differences were found. E.g., following command compared v4-0002 and v16-0002: ``` diff --git a/../reuse_workers/v4-0002-Reuse-Tablesync-Workers.patch b/../reuse_workers/hayato/v16-0002-Reuse-Tablesync-Workers.patch index 5350216e98..7785a573e4 100644 --- a/../reuse_workers/v4-0002-Reuse-Tablesync-Workers.patch +++ b/../reuse_workers/hayato/v16-0002-Reuse-Tablesync-Workers.patch @@ -1,7 +1,7 @@ -From d482022b40e0a5ce1b74fd0e320cb5b45da2f671 Mon Sep 17 00:00:00 2001 +From db3e8e2d7aadea79126c5816bce8b06dc82f33c2 Mon Sep 17 00:00:00 2001 From: Melih Mutlu Date: Tue, 4 Jul 2023 22:04:46 +0300 -Subject: [PATCH 2/5] Reuse Tablesync Workers +Subject: [PATCH v16 2/5] Reuse Tablesync Workers This commit allows reusing tablesync workers for syncing more than one table sequentially during their lifetime, instead of exiting after @@ -324,5 +324,5 @@ index 7aba034774..1e9f8e6e72 100644 static inline bool am_tablesync_worker(void) -- -2.25.1 +2.27.0 ``` For confirmation, please attach the logfile and test script what you did if you could reproduce? > What do you think about combining 0002 and 0003? Or should those stay > separate? 
I have no strong opinion, but it may be useful to keep them pluggable. Best Regards, Hayato Kuroda FUJITSU LIMITED
Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication
On Mon, Jul 10, 2023 at 8:01 PM Melih Mutlu wrote: > > Hayato Kuroda (Fujitsu) , 6 Tem 2023 Per, > 12:47 tarihinde şunu yazdı: > > > > Dear Melih, > > > > > Thanks for the 0003 patch. But it did not work for me. Can you create > > > a subscription successfully with patch 0003 applied? > > > I get the following error: " ERROR: table copy could not start > > > transaction on publisher: another command is already in progress". > > > > You got the ERROR when all the patches (0001-0005) were applied, right? > > I have focused on 0001 and 0002 only, so I missed something. > > If it was not correct, please attach the logfile and test script what you > > did. > > Yes, I did get an error with all patches applied. But with only 0001 > and 0002, your version seems like working and mine does not. > What do you think about combining 0002 and 0003? Or should those stay > separate? > I am fine either way but I think one minor advantage of keeping 0003 separate is that we can focus on some of the problems specific to that patch. For example, the following comment in the 0003 patch: "FIXME: set appropriate application_name...". I have given a suggestion to address it in [1] and Kuroda-San seems to have addressed the same but I am not sure if all of us agree with that or if there is any better way to address it. What do you think? > > > * 0003 basically improved performance from first two patches > > Agree, 0003 is definitely a good addition which was missing earlier. > +1. [1] - https://www.postgresql.org/message-id/CAA4eK1JOZHmy2o2F2wTCPKsjpwDiKZPOeTa_jt%3Dwm2JLbf-jsg%40mail.gmail.com -- With Regards, Amit Kapila.
Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication
On Tue, Jul 11, 2023 at 12:31 AM Melih Mutlu wrote: > > Hi, > > Hayato Kuroda (Fujitsu) , 6 Tem 2023 Per, > 12:47 tarihinde şunu yazdı: > > > > Dear Melih, > > > > > Thanks for the 0003 patch. But it did not work for me. Can you create > > > a subscription successfully with patch 0003 applied? > > > I get the following error: " ERROR: table copy could not start > > > transaction on publisher: another command is already in progress". > > > > You got the ERROR when all the patches (0001-0005) were applied, right? > > I have focused on 0001 and 0002 only, so I missed something. > > If it was not correct, please attach the logfile and test script what you > > did. > > Yes, I did get an error with all patches applied. But with only 0001 > and 0002, your version seems like working and mine does not. > What do you think about combining 0002 and 0003? Or should those stay > separate? > Even if patches 0003 and 0002 are to be combined, I think that should not happen until after the "reuse" design is confirmed which way is best. e.g. IMO it might be easier to compare the different PoC designs for patch 0002 if there is no extra logic involved. PoC design#1 -- each tablesync decides for itself what to do next after it finishes PoC design#2 -- reuse tablesync using a "pool" of available workers -- Kind Regards, Peter Smith. Fujitsu Australia
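The difference between the two PoC shapes can be made concrete with design#1's worker-side loop, where each tablesync worker claims its own next table instead of being handed one by the apply worker. A minimal, hypothetical model in plain C (in the server, the claim would happen under LogicalRepWorkerLock against pg_subscription_rel state):

```c
#include <assert.h>
#include <stdbool.h>

#define NTABLES 5

/* Shared view of tables still needing sync (true = not yet synced). */
static bool needs_sync[NTABLES] = {true, true, true, true, true};

/* Design#1: each tablesync worker claims its own next table. */
static int
claim_next_table(void)
{
	for (int i = 0; i < NTABLES; i++)
	{
		if (needs_sync[i])
		{
			needs_sync[i] = false;	/* claimed (locking elided in sketch) */
			return i;
		}
	}
	return -1;					/* nothing left: worker exits */
}

/* A worker's lifetime: keep claiming tables until none remain. */
static int
run_worker(void)
{
	int		synced = 0;
	int		t;

	while ((t = claim_next_table()) != -1)
		synced++;				/* ...copy table t here, reusing the
								 * replication connection... */
	return synced;
}
```

Under design#2, by contrast, claim_next_table() would live in the apply worker, which pushes each table to an idle pool member; the worker loop only waits for assignments.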
Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication
Here are some review comments for patch v16-3 == 1. Commit Message. The patch description is missing. == 2. General. +LogicalRepSyncTableStart(XLogRecPtr *origin_startpos, int worker_slot) and +start_table_sync(XLogRecPtr *origin_startpos, + char **myslotname, + int worker_slot) and @@ -4548,12 +4552,13 @@ run_tablesync_worker(WalRcvStreamOptions *options, char *slotname, char *originname, int originname_size, - XLogRecPtr *origin_startpos) + XLogRecPtr *origin_startpos, + int worker_slot) It seems the worker_slot is being passed all over the place as an additional function argument so that it can be used to construct an application_name. Is it possible/better to introduce a new 'MyLogicalRepWorker' field for the 'worker_slot' so it does not have to be passed like this? == src/backend/replication/logical/tablesync.c 3. + /* + * Disconnect from publisher. Otherwise reused sync workers causes + * exceeding max_wal_senders. + */ + if (LogRepWorkerWalRcvConn != NULL) + { + walrcv_disconnect(LogRepWorkerWalRcvConn); + LogRepWorkerWalRcvConn = NULL; + } + Why is this comment mentioning anything about "reused workers" at all? The worker process exits in this function, right? ~~~ 4. LogicalRepSyncTableStart /* - * Here we use the slot name instead of the subscription name as the - * application_name, so that it is different from the leader apply worker, - * so that synchronous replication can distinguish them. + * Connect to publisher if not yet. The application_name must be also + * different from the leader apply worker because synchronous replication + * must distinguish them. */ I felt all the details in the 2nd part of this comment belong inside the condition, not outside. SUGGESTION /* Connect to the publisher if haven't done so already. */ ~~~ 5. + if (LogRepWorkerWalRcvConn == NULL) + { + char application_name[NAMEDATALEN]; + + /* + * FIXME: set appropriate application_name. 
Previously, the slot name + * was used becasue the lifetime of the tablesync worker was same as + * that, but now the tablesync worker handles many slots during the + * synchronization so that it is not suitable. So what should be? + * Note that if the tablesync worker starts to reuse the replication + * slot during synchronization, we should use the slot name as + * application_name again. + */ + snprintf(application_name, NAMEDATALEN, "pg_%u_sync_%i", + MySubscription->oid, worker_slot); + LogRepWorkerWalRcvConn = + walrcv_connect(MySubscription->conninfo, true, +must_use_password, +application_name, &err); + } 5a. /becasue/because/ ~ 5b. I am not sure about what name this should ideally use, but anyway for uniqueness doesn't it still need to include the GetSystemIdentifier() same as function ReplicationSlotNameForTablesync() was doing? Maybe this can use the same function ReplicationSlotNameForTablesync() can be used but just pass the worker_slot instead of the relid? == src/backend/replication/logical/worker.c 6. LogicalRepApplyLoop /* * Init the ApplyMessageContext which we clean up after each replication - * protocol message. + * protocol message, if needed. */ - ApplyMessageContext = AllocSetContextCreate(ApplyContext, - "ApplyMessageContext", - ALLOCSET_DEFAULT_SIZES); + if (!ApplyMessageContext) + ApplyMessageContext = AllocSetContextCreate(ApplyContext, + "ApplyMessageContext", + Maybe slightly reword the comment. BEFORE: Init the ApplyMessageContext which we clean up after each replication protocol message, if needed. AFTER: Init the ApplyMessageContext if needed. This context is cleaned up after each replication protocol message. == src/backend/replication/walsender.c 7. + /* + * Initialize the flag again because this streaming may be + * second time. + */ + streamingDoneSending = streamingDoneReceiving = false; Isn't this only possible to be 2nd time because the "reuse tablesync worker" might re-issue a START_REPLICATION again to the same WALSender? 
So, should this flag reset ONLY be done for the logical replication ('else' part), otherwise it should be asserted false? e.g. Would it be better to be like this?

if (cmd->kind == REPLICATION_KIND_PHYSICAL)
{
    Assert(!streamingDoneSending && !streamingDoneReceiving);
    StartReplication(cmd);
}
else
{
    /*
     * Reset flags because reusing tablesync workers can mean this is
     * the second time here.
     */
    streamingDoneSending = streamingDoneReceiving = false;
    StartLogicalReplication(cmd);
}

-- Kind Regards, Peter Smith. Fujitsu Australia
Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication
Hi, Hayato Kuroda (Fujitsu) wrote on Thu, 6 Jul 2023 at 12:47: > > Dear Melih, > > > Thanks for the 0003 patch. But it did not work for me. Can you create > > a subscription successfully with patch 0003 applied? > > I get the following error: " ERROR: table copy could not start > > transaction on publisher: another command is already in progress". > > You got the ERROR when all the patches (0001-0005) were applied, right? > I have focused on 0001 and 0002 only, so I missed something. > If it was not correct, please attach the logfile and test script what you did. Yes, I did get an error with all patches applied. But with only 0001 and 0002, your version seems like working and mine does not. What do you think about combining 0002 and 0003? Or should those stay separate? > Hi, I did a performance testing for v16 patch set. > Results show that patches significantly improves the performance in most > cases. > > # Method > > Following tests were done 10 times per condition, and compared by median. > do_one_test.sh was used for the testing. > > 1. Create tables on publisher > 2. Insert initial data on publisher > 3. Create tables on subscriber > 4. Create a replication slot (mysub_slot) on publisher > 5. Create a publication on publisher > 6. Create tables on subscriber > --- timer on --- > 7. Create subscription with pre-existing replication slot (mysub_slot) > 8. Wait until all srsubstate in pg_subscription_rel becomes 'r' > --- timer off --- > Thanks for taking the time to do testing and sharing the results. This is also how I've been doing the testing since, but the process was half scripted, half manual work. > According to the measurement, we can say following things: > > * In any cases the performance was improved from the HEAD. > * The improvement became more significantly if number of synced tables were increased. 
Yes, I believe it becomes more significant when workers spend less time with actually copying data but more with other stuff like launching workers, opening connections etc. > * 0003 basically improved performance from first two patches Agree, 0003 is definitely a good addition which was missing earlier. Thanks, -- Melih Mutlu Microsoft
Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication
Hi,

Amit Kapila wrote on Thu, 6 Jul 2023 at 06:56:
>
> On Wed, Jul 5, 2023 at 1:48 AM Melih Mutlu wrote:
> >
> > Hayato Kuroda (Fujitsu) wrote on Tue, 4 Jul 2023
> > at 08:42:
> > > > > But in the later patch the tablesync worker tries to reuse the slot
> > > > > during the
> > > > > synchronization, so in this case the application_name should be same
> > > > > as
> > > > slotname.
> > > >
> > > > Fair enough. I am slightly afraid that if we can't show the benefits
> > > > with later patches then we may need to drop them but at this stage I
> > > > feel we need to investigate why those are not helping?
> > >
> > > Agreed. Now I'm planning to do performance testing independently. We can
> > > discuss
> > > based on that or Melih's one.
> >
> > Here I attached what I use for performance testing of this patch.
> >
> > I only benchmarked the patch set with reusing connections very roughly
> > so far. But seems like it improves quite significantly. For example,
> > it took 611 ms to sync 100 empty tables, it was 1782 ms without
> > reusing connections.
> > First 3 patches from the set actually bring a good amount of
> > improvement, but not sure about the later patches yet.
>
> I suggest then we should focus first on those 3, get them committed
> and then look at the remaining.

That sounds good. I'll do my best to address any reviews/concerns for the first 3 patches now, and hopefully those can get committed first. I'll continue working on the remaining patches later.

--
Melih Mutlu
Microsoft
Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication
Hi, here are some review comments for patch v16-0002. == Commit message 1. This commit allows reusing tablesync workers for syncing more than one table sequentially during their lifetime, instead of exiting after only syncing one table. Before this commit, tablesync workers were capable of syncing only one table. For each table, a new sync worker was launched and that worker would exit when done processing the table. Now, tablesync workers are not limited to processing only one table. When done, they can move to processing another table in the same subscription. ~ IMO that first paragraph can be removed because AFAIK the other paragraphs are saying exactly the same thing but worded differently. == src/backend/replication/logical/tablesync.c 2. General -- for clean_sync_worker and finish_sync_worker TBH, I found the separation of clean_sync_worker() and finish_sync_worker() to be confusing. Can't it be rearranged to keep the same function but just pass a boolean to tell it to exit or not exit? e.g. finish_sync_worker(bool reuse_worker) { ... } ~~~ 3. clean_sync_worker /* - * Commit any outstanding transaction. This is the usual case, unless - * there was nothing to do for the table. + * Commit any outstanding transaction. This is the usual case, unless there + * was nothing to do for the table. */ The word wrap seems OK, except the change seemed unrelated to this patch (??) ~~~ 4. + /* + * Disconnect from publisher. Otherwise reused sync workers causes + * exceeding max_wal_senders + */ Missing period, and not an English sentence. SUGGESTION (??) Disconnect from the publisher otherwise reusing the sync worker can error due to exceeding max_wal_senders. ~~~ 5. finish_sync_worker +/* + * Exit routine for synchronization worker. + */ +void +pg_attribute_noreturn() +finish_sync_worker(void) +{ + clean_sync_worker(); + /* And flush all writes. 
*/
XLogFlush(GetXLogWriteRecPtr());
StartTransactionCommand();
ereport(LOG,
- (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished",
- MySubscription->name,
- get_rel_name(MyLogicalRepWorker->relid))));
+ (errmsg("logical replication table synchronization worker for subscription \"%s\" has finished",
+ MySubscription->name)));
CommitTransactionCommand();

In the original code, the XLogFlush was in a slightly different order than in this refactored code. E.g. it came before signalling the apply worker. Is it OK to be changed? Keeping one function (suggested in #2) can maybe remove this potential issue.

==
src/backend/replication/logical/worker.c

6. LogicalRepApplyLoop

+ /*
+ * apply_dispatch() may have gone into apply_handle_commit()
+ * which can call process_syncing_tables_for_sync.
+ *
+ * process_syncing_tables_for_sync decides whether the sync of
+ * the current table is completed. If it is completed,
+ * streaming must be already ended. So, we can break the loop.
+ */
+ if (MyLogicalRepWorker->is_sync_completed)
+ {
+ endofstream = true;
+ break;
+ }

and

+ /*
+ * If is_sync_completed is true, this means that the tablesync
+ * worker is done with synchronization. Streaming has already been
+ * ended by process_syncing_tables_for_sync. We should move to the
+ * next table if needed, or exit.
+ */
+ if (MyLogicalRepWorker->is_sync_completed)
+ endofstream = true;

~

Instead of those code fragments above assigning 'endofstream' as a side-effect, would it be the same (but tidier) to just modify the other "breaking" condition below:

BEFORE:
/* Check if we need to exit the streaming loop. */
if (endofstream)
break;

AFTER:
/* Check if we need to exit the streaming loop. */
if (endofstream || MyLogicalRepWorker->is_sync_completed)
break;

~~~

7. LogicalRepApplyLoop

+ /*
+ * Tablesync workers should end streaming before exiting the main loop to
+ * drop replication slot. Only end streaming here for apply workers.
+ */ + if (!am_tablesync_worker()) + walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli); This comment does not seem very clear. Maybe it can be reworded: SUGGESTION End streaming here only for apply workers. Ending streaming for tablesync workers is deferred until ... because ... ~~~ 8. TablesyncWorkerMain + StartTransactionCommand(); + ereport(LOG, + (errmsg("%s for subscription \"%s\" has moved to sync table \"%s\" with relid %u.", + get_worker_name(), + MySubscription->name, + get_rel_name(MyLogicalRepWorker->relid), + MyLogicalRepWorker->relid))); + CommitTransactionCommand(); The "has moved to..." terminology is unusual. If you say something "will be reused to..." then it matches better the commit message etc. ~~~ 9. + if (!is_table_found) + break; Instead of an infinite loop that is exited by this 'break' it might be better to rearrange the logic slightly so the 'for' loop can exit normally: BEFORE: for (;;) AFTER for (; !done;) == src/include/replication/worker_internal.h 10. XLogRecPtr relstate_lsn; slock_t relmutex; + /* +
RE: [PATCH] Reuse Workers and Replication Slots during Logical Replication
Dear hackers,

Hi, I did performance testing for the v16 patch set. The results show that the patches significantly improve the performance in most cases.

# Method

The following tests were done 10 times per condition, and compared by median. do_one_test.sh was used for the testing.

1. Create tables on publisher
2. Insert initial data on publisher
3. Create tables on subscriber
4. Create a replication slot (mysub_slot) on publisher
5. Create a publication on publisher
6. Create tables on subscriber
--- timer on ---
7. Create subscription with pre-existing replication slot (mysub_slot)
8. Wait until all srsubstate in pg_subscription_rel becomes 'r'
--- timer off ---

# Tested sources

I used three types of sources:
* HEAD (f863d82)
* HEAD + 0001 + 0002
* HEAD + 0001 + 0002 + 0003

# Tested conditions

The following parameters were changed during the measurement.

### table size
* empty
* around 10kB

### number of tables
* 10
* 100
* 1000
* 2000

### max_sync_workers_per_subscription
* 2
* 4
* 8
* 16

## Results

Please see the attached image file. Each cell shows the improvement percentage of HEAD + 0001 + 0002 and HEAD + 0001 + 0002 + 0003 compared with HEAD.

According to the measurement, we can say the following:

* In all cases the performance was improved from HEAD.
* The improvement became more significant as the number of synced tables increased.
* 0003 basically improved performance over the first two patches.
* Increasing workers could sometimes lead to worse performance due to contention. This occurred when the number of tables was small. Moreover, this did not happen only with the patch set - it happened even on HEAD. Detailed analysis will be done later.

For more detail, please see the Excel file; it contains all the measurement results.

## Detailed configuration

* A powerful machine was used:
  - Number of CPUs: 120
  - Memory: 755 GB
* Both publisher and subscriber were on the same machine.
* Following GUC settings were used for both pub/sub:

```
wal_level = logical
shared_buffers = 40GB
max_worker_processes = 32
max_parallel_maintenance_workers = 24
max_parallel_workers = 32
synchronous_commit = off
checkpoint_timeout = 1d
max_wal_size = 24GB
min_wal_size = 15GB
autovacuum = off
max_wal_senders = 200
max_replication_slots = 200
```

Best Regards,
Hayato Kuroda
FUJITSU LIMITED

Attachments: perftest_result.xlsx, do_one_test.sh
Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication
Hi. Here are some review comments for the patch v16-0001 == Commit message. 1. Also; most of the code shared by both worker types are already combined in LogicalRepApplyLoop(). There is no need to combine the rest in ApplyWorkerMain() anymore. ~ /are already/is already/ /Also;/Also,/ ~~~ 2. This commit introduces TablesyncWorkerMain() as a new entry point for tablesync workers and separates both type of workers from each other. This aims to increase code readability and help to maintain logical replication workers separately. 2a. /This commit/This patch/ ~ 2b. "and separates both type of workers from each other" Maybe that part can all be removed. The following sentence says the same again anyhow. == src/backend/replication/logical/worker.c 3. static void stream_write_change(char action, StringInfo s); static void stream_open_and_write_change(TransactionId xid, char action, StringInfo s); static void stream_close_file(void); +static void set_stream_options(WalRcvStreamOptions *options, +char *slotname, +XLogRecPtr *origin_startpos); ~ Maybe a blank line was needed here because this static should not be grouped with the other functions that are grouped for "Serialize and deserialize changes for a toplevel transaction." comment. ~~~ 4. set_stream_options + /* set_stream_options + * Set logical replication streaming options. + * + * This function sets streaming options including replication slot name and + * origin start position. Workers need these options for logical replication. + */ +static void +set_stream_options(WalRcvStreamOptions *options, The indentation is not right for this function comment. ~~~ 5. set_stream_options + /* + * Even when the two_phase mode is requested by the user, it remains as + * the tri-state PENDING until all tablesyncs have reached READY state. + * Only then, can it become ENABLED. + * + * Note: If the subscription has no tables then leave the state as + * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to + * work. 
+ */
+ if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING &&
+ AllTablesyncsReady())
+ options->proto.logical.twophase = true;
+}

This part of the refactoring seems questionable... IIUC this new function was extracted from code originally in function ApplyWorkerMain(). But in that original code, this fragment above was guarded by the condition

if (!am_tablesync_worker())

But now where is that condition? e.g. What is stopping tablesync workers from getting into this code they previously would not have executed?

~~~

6.

AbortOutOfAnyTransaction();
- pgstat_report_subscription_error(MySubscription->oid, !am_tablesync_worker());
+ pgstat_report_subscription_error(MySubscription->oid,
+ !am_tablesync_worker());

Does this change have anything to do with this patch? Is it a quirk of running pgindent?

~~~

7. run_tablesync_worker

Since the stated intent of the patch is the separation of apply and tablesync workers, shouldn't this function belong in the tablesync.c file?

~~~

8. run_tablesync_worker

+ * Runs the tablesync worker.
+ * It starts syncing tables. After a successful sync, sets streaming options
+ * and starts streaming to catchup.
+ */
+static void
+run_tablesync_worker(WalRcvStreamOptions *options,

Nicer to have a blank line after the first sentence of that function comment?

~~~

9. run_apply_worker

+/*
+ * Runs the leader apply worker.
+ * It sets up replication origin, streaming options and then starts streaming.
+ */
+static void
+run_apply_worker(WalRcvStreamOptions *options,

Nicer to have a blank line after the first sentence of that function comment?

~~~

10. InitializeLogRepWorker

+/*
+ * Common initialization for logical replication workers; leader apply worker,
+ * parallel apply worker and tablesync worker.
 *
 * Initialize the database connection, in-memory subscription and necessary
 * config options.
 */
 void
-InitializeApplyWorker(void)
+InitializeLogRepWorker(void)

typo: /workers;/workers:/

~~~

11.
TablesyncWorkerMain Since the stated intent of the patch is the separation of apply and tablesync workers then shouldn't this function belong in the tablesync.c file? == src/include/replication/worker_internal.h 12. #define isParallelApplyWorker(worker) ((worker)->leader_pid != InvalidPid) +extern void finish_sync_worker(void); ~ I think the macro isParallelApplyWorker is associated with the am_XXX inline functions that follow it, so it doesn’t seem the best place to jam this extern in the middle of that. -- Kind Regards, Peter Smith. Fujitsu Australia
Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication
On Wed, Jul 5, 2023 at 1:48 AM Melih Mutlu wrote:
>
> Hayato Kuroda (Fujitsu) wrote on Tue, 4 Jul 2023
> at 08:42:
> > > > But in the later patch the tablesync worker tries to reuse the slot
> > > > during the
> > > > synchronization, so in this case the application_name should be same as
> > > slotname.
> > >
> > > Fair enough. I am slightly afraid that if we can't show the benefits
> > > with later patches then we may need to drop them but at this stage I
> > > feel we need to investigate why those are not helping?
> >
> > Agreed. Now I'm planning to do performance testing independently. We can
> > discuss
> > based on that or Melih's one.
>
> Here I attached what I use for performance testing of this patch.
>
> I only benchmarked the patch set with reusing connections very roughly
> so far. But seems like it improves quite significantly. For example,
> it took 611 ms to sync 100 empty tables, it was 1782 ms without
> reusing connections.
> First 3 patches from the set actually bring a good amount of
> improvement, but not sure about the later patches yet.

I suggest then we should focus first on those 3, get them committed and then look at the remaining.

> Amit Kapila wrote on Mon, 3 Jul 2023
> at 08:59:
> > On thinking about this, I think the primary benefit we were expecting
> > by saving network round trips for slot drop/create but now that we
> > anyway need an extra round trip to establish a snapshot, so such a
> > benefit was not visible. This is just a theory so we should validate
> > it. The another idea as discussed before [1] could be to try copying
> > multiple tables in a single transaction. Now, keeping a transaction
> > open for a longer time could have side-effects on the publisher node.
> > So, we probably need to ensure that we don't perform multiple large
> > syncs and even for smaller tables (and later sequences) perform it
> > only for some threshold number of tables which we can figure out by
> > some tests.
> > Also, the other safety-check could be that anytime we need
> > to perform streaming (sync with apply worker), we won't copy more
> > tables in same transaction.
> >
> > Thoughts?
>
> Yeah, maybe going to the publisher for creating a slot or only a
> snapshot does not really make enough difference. I was hoping that
> creating only snapshot by an existing replication slot would help the
> performance. I guess I was either wrong or am missing something in the
> implementation.
>
> The tricky bit with keeping a long transaction to copy multiple tables
> is deciding how many tables one transaction can copy.

Yeah, I was thinking that we should not allow copying more than some threshold of data in one transaction. After every copy, we will check the size of the table and add it to the size already copied in the same transaction. Once the total crosses a certain threshold, we will end the transaction. This may not be a very good scheme, but if this helps then it would be much simpler than the creating-only-snapshot approach.

--
With Regards,
Amit Kapila.
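[Editor's note] The threshold scheme sketched in the message above can be expressed as a small batching function. This is a minimal, self-contained illustration, not code from the patch: `plan_copy_batches` and its parameters are hypothetical names, and a real implementation would track sizes from inside the tablesync worker rather than from a plain array.

```c
#include <stddef.h>

/*
 * Hypothetical sketch of the scheme described above: keep copying tables
 * in the same transaction, checking the cumulative copied size after every
 * copy, and end the transaction once the total crosses a threshold.
 *
 * Given per-table sizes, writes the 0-based index of the first table of
 * each batch (i.e. each transaction) into batch_starts and returns the
 * number of batches.  batch_starts must have room for ntables entries.
 */
int
plan_copy_batches(const size_t *table_sizes, int ntables,
                  size_t threshold, int *batch_starts)
{
    int     nbatches = 0;
    size_t  copied = 0;

    for (int i = 0; i < ntables; i++)
    {
        /*
         * Start a new transaction for the first table, or when the
         * previous batch has already crossed the threshold.
         */
        if (i == 0 || copied >= threshold)
        {
            batch_starts[nbatches++] = i;
            copied = 0;
        }

        /*
         * The size check happens only after the copy, so a single table
         * larger than the threshold still gets copied -- it just ends up
         * in a transaction of its own.
         */
        copied += table_sizes[i];
    }
    return nbatches;
}
```

For example, with a threshold of 25 units, table sizes {30, 10, 10} yield two transactions ({30} and {10, 10}), while {10, 10, 10} fit in one, since the running total only reaches 30 after the last copy.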
Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication
Hayato Kuroda (Fujitsu) wrote on Tue, 4 Jul 2023 at 08:42:
> > > But in the later patch the tablesync worker tries to reuse the slot
> > > during the
> > > synchronization, so in this case the application_name should be same as
> > slotname.
> > >
> > Fair enough. I am slightly afraid that if we can't show the benefits
> > with later patches then we may need to drop them but at this stage I
> > feel we need to investigate why those are not helping?
>
> Agreed. Now I'm planning to do performance testing independently. We can
> discuss
> based on that or Melih's one.

Here I attached what I use for performance testing of this patch.

I only benchmarked the patch set with reusing connections very roughly so far. But it seems to improve things quite significantly. For example, it took 611 ms to sync 100 empty tables; it was 1782 ms without reusing connections. The first 3 patches from the set actually bring a good amount of improvement, but I'm not sure about the later patches yet.

Amit Kapila wrote on Mon, 3 Jul 2023 at 08:59:
> On thinking about this, I think the primary benefit we were expecting
> by saving network round trips for slot drop/create but now that we
> anyway need an extra round trip to establish a snapshot, so such a
> benefit was not visible. This is just a theory so we should validate
> it. The another idea as discussed before [1] could be to try copying
> multiple tables in a single transaction. Now, keeping a transaction
> open for a longer time could have side-effects on the publisher node.
> So, we probably need to ensure that we don't perform multiple large
> syncs and even for smaller tables (and later sequences) perform it
> only for some threshold number of tables which we can figure out by
> some tests. Also, the other safety-check could be that anytime we need
> to perform streaming (sync with apply worker), we won't copy more
> tables in same transaction.
>
> Thoughts?
Yeah, maybe going to the publisher for creating a slot or only a snapshot does not really make enough difference. I was hoping that creating only a snapshot from an existing replication slot would help the performance. I guess I was either wrong or am missing something in the implementation.

The tricky bit with keeping a long transaction to copy multiple tables is deciding how many tables one transaction can copy.

Thanks,
--
Melih Mutlu
Microsoft

--- on publisher
SELECT 'CREATE TABLE manytables_'||i||'(i int);' FROM generate_series(1, 100) g(i) \gexec
SELECT pg_create_logical_replication_slot('mysub_slot', 'pgoutput');

--- on subscriber
SELECT 'CREATE TABLE manytables_'||i||'(i int);' FROM generate_series(1, 100) g(i) \gexec

CREATE OR REPLACE PROCEDURE log_rep_test(max INTEGER) AS $$
DECLARE
    counter INTEGER := 1;
    total_duration INTERVAL := '0';
    avg_duration FLOAT := 0.0;
    start_time TIMESTAMP;
    end_time TIMESTAMP;
BEGIN
    WHILE counter <= max LOOP
        EXECUTE 'DROP SUBSCRIPTION IF EXISTS mysub;';
        start_time := clock_timestamp();
        EXECUTE 'CREATE SUBSCRIPTION mysub CONNECTION ''dbname=postgres port=5432'' PUBLICATION mypub WITH (create_slot=false, slot_name=''mysub_slot'');';
        COMMIT;
        WHILE EXISTS (SELECT 1 FROM pg_subscription_rel WHERE srsubstate != 'r') LOOP
            COMMIT;
        END LOOP;
        end_time := clock_timestamp();
        EXECUTE 'ALTER SUBSCRIPTION mysub DISABLE;';
        EXECUTE 'ALTER SUBSCRIPTION mysub SET (slot_name = none);';
        total_duration := total_duration + (end_time - start_time);
        counter := counter + 1;
    END LOOP;
    IF max > 0 THEN
        avg_duration := EXTRACT(EPOCH FROM total_duration) / max * 1000;
    END IF;
    RAISE NOTICE '%', avg_duration;
END;
$$ LANGUAGE plpgsql;

call log_rep_test(5);
Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication
On Wed, 28 Jun 2023 at 12:02, Hayato Kuroda (Fujitsu) wrote: > > Dear Amit, > > > > > This actually makes sense. I quickly try to do that without adding any > > > > new replication message. As you would expect, it did not work. > > > > I don't really know what's needed to make a connection to last for > > > > more than one iteration. Need to look into this. Happy to hear any > > > > suggestions and thoughts. > > > > > > > It is not clear to me what exactly you tried here which didn't work. > > Can you please explain a bit more? > > Just to confirm, this is not my part. Melih can answer this... > > > > I have analyzed how we handle this. Please see attached the patch (0003) > > > which > > > allows reusing connection. > > > > > > > Why did you change the application name during the connection? > > It was because the lifetime of tablesync worker is longer than slots's one and > tablesync worker creates temporary replication slots many times, per the > target > relation. The name of each slots has relid, so I thought that it was not > suitable. > But in the later patch the tablesync worker tries to reuse the slot during the > synchronization, so in this case the application_name should be same as > slotname. > > I added comment in 0003, and new file 0006 file to use slot name as > application_name > again. Note again that the separation was just for specifying changes, Melih > can > include them to one part of files if needed. 
Few comments:

1) Should these error messages say "Could not create a snapshot by replication slot":

+ if (!pubnames_str)
+ ereport(ERROR,
+ (errcode(ERRCODE_OUT_OF_MEMORY), /* likely guess */
+ errmsg("could not start WAL streaming: %s",
+ pchomp(PQerrorMessage(conn->streamConn)))));
+ pubnames_literal = PQescapeLiteral(conn->streamConn, pubnames_str,
+ strlen(pubnames_str));
+ if (!pubnames_literal)
+ ereport(ERROR,
+ (errcode(ERRCODE_OUT_OF_MEMORY), /* likely guess */
+ errmsg("could not start WAL streaming: %s",
+ pchomp(PQerrorMessage(conn->streamConn)))));
+ appendStringInfo(&cmd, ", publication_names %s", pubnames_literal);
+ PQfreemem(pubnames_literal);
+ pfree(pubnames_str);

2) These checks are present in CreateReplicationSlot too, can we have a common function to check these for both CreateReplicationSlot and CreateReplicationSnapshot:

+ if (!IsTransactionBlock())
+ ereport(ERROR,
+ (errmsg("%s must be called inside a transaction",
+ "CREATE_REPLICATION_SNAPSHOT ...")));
+
+ if (XactIsoLevel != XACT_REPEATABLE_READ)
+ ereport(ERROR,
+ (errmsg("%s must be called in REPEATABLE READ isolation mode transaction",
+ "CREATE_REPLICATION_SNAPSHOT ...")));
+
+ if (!XactReadOnly)
+ ereport(ERROR,
+ (errmsg("%s must be called in a read only transaction",
+ "CREATE_REPLICATION_SNAPSHOT ...")));
+
+ if (FirstSnapshotSet)
+ ereport(ERROR,
+ (errmsg("%s must be called before any query",
+ "CREATE_REPLICATION_SNAPSHOT ...")));
+
+ if (IsSubTransaction())
+ ereport(ERROR,
+ (errmsg("%s must not be called in a subtransaction",
+ "CREATE_REPLICATION_SNAPSHOT ...")));

3) Probably we can add the function header at this point of time:

+/*
+ * TODO
+ */
+static void
+libpqrcv_slot_snapshot(WalReceiverConn *conn,
+ char *slotname,
+ const WalRcvStreamOptions *options,
+ XLogRecPtr *lsn)

4) Either the relation name or relid should be sufficient here, no need to print both:

StartTransactionCommand();
+ ereport(LOG,
+ (errmsg("%s for subscription \"%s\" has moved to sync table \"%s\" with
relid %u.",
+ get_worker_name(),
+ MySubscription->name,
+ get_rel_name(MyLogicalRepWorker->relid),
+ MyLogicalRepWorker->relid)));
+ CommitTransactionCommand();

5) Why is this check of logicalrep_worker_find required? Will it not be sufficient to pick the relations that are in SUBREL_STATE_INIT state?

+ /*
+ * Pick the table for the next run if it is not already picked up
+ * by another worker.
+ *
+ * Take exclusive lock to prevent any other sync worker from picking
+ * the same table.
+ */
+ LWLockAcquire(LogicalRepWorkerLock, LW_EXCLU
Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication
On Mon, Jul 3, 2023 at 9:42 AM Amit Kapila wrote:
>
> On Wed, Jun 28, 2023 at 12:02 PM Hayato Kuroda (Fujitsu)
> wrote:
> > But in the later patch the tablesync worker tries to reuse the slot during
> > the
> > synchronization, so in this case the application_name should be same as
> > slotname.
>
> Fair enough. I am slightly afraid that if we can't show the benefits
> with later patches then we may need to drop them but at this stage I
> feel we need to investigate why those are not helping?

On thinking about this, I think the primary benefit we were expecting was from saving network round trips for slot drop/create, but now that we anyway need an extra round trip to establish a snapshot, such a benefit was not visible. This is just a theory so we should validate it. Another idea, as discussed before [1], could be to try copying multiple tables in a single transaction. Now, keeping a transaction open for a longer time could have side-effects on the publisher node. So, we probably need to ensure that we don't perform multiple large syncs, and even for smaller tables (and later sequences) perform it only for some threshold number of tables which we can figure out by some tests. Also, the other safety-check could be that anytime we need to perform streaming (sync with apply worker), we won't copy more tables in the same transaction.

Thoughts?

[1] - https://www.postgresql.org/message-id/CAGPVpCRWEVhXa7ovrhuSQofx4to7o22oU9iKtrOgAOtz_%3DY6vg%40mail.gmail.com

--
With Regards,
Amit Kapila.
Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication
On Wed, Jun 28, 2023 at 12:02 PM Hayato Kuroda (Fujitsu) wrote: > > > > I have analyzed how we handle this. Please see attached the patch (0003) > > > which > > > allows reusing connection. > > > > > > > Why did you change the application name during the connection? > > It was because the lifetime of tablesync worker is longer than slots's one and > tablesync worker creates temporary replication slots many times, per the > target > relation. The name of each slots has relid, so I thought that it was not > suitable. > Okay, but let's try to give a unique application name to each tablesync worker for the purpose of pg_stat_activity and synchronous replication (as mentioned in existing comments as well). One idea is to generate a name like pg__sync_ but feel free to suggest if you have any better ideas. > But in the later patch the tablesync worker tries to reuse the slot during the > synchronization, so in this case the application_name should be same as > slotname. > Fair enough. I am slightly afraid that if we can't show the benefits with later patches then we may need to drop them but at this stage I feel we need to investigate why those are not helping? -- With Regards, Amit Kapila.
Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication
On Tue, Jun 27, 2023 at 1:12 PM Hayato Kuroda (Fujitsu) wrote: > > > This actually makes sense. I quickly try to do that without adding any > > new replication message. As you would expect, it did not work. > > I don't really know what's needed to make a connection to last for > > more than one iteration. Need to look into this. Happy to hear any > > suggestions and thoughts. > It is not clear to me what exactly you tried here which didn't work. Can you please explain a bit more? > I have analyzed how we handle this. Please see attached the patch (0003) which > allows reusing connection. > Why did you change the application name during the connection? -- With Regards, Amit Kapila.
RE: [PATCH] Reuse Workers and Replication Slots during Logical Replication
Dear Melih,

Thanks for updating the patch. Following are my comments. Note that some lines exceed 80 characters and some other lines seem too short. Comments about coding conventions were skipped.

0001

01. logicalrep_worker_launch()

```
if (is_parallel_apply_worker)
+ {
snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ParallelApplyWorkerMain");
- else
- snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain");
-
- if (OidIsValid(relid))
snprintf(bgw.bgw_name, BGW_MAXLEN,
-"logical replication worker for subscription %u sync %u", subid, relid);
- else if (is_parallel_apply_worker)
+"logical replication parallel apply worker for subscription %u", subid);
snprintf(bgw.bgw_name, BGW_MAXLEN,
"logical replication parallel apply worker for subscription %u", subid);
```

The latter snprintf(bgw.bgw_name, ...) should be snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication worker").

02. ApplyWorkerMain

```
/*
* Setup callback for syscache so that we know when something changes in
-* the subscription relation state.
+* the subscription relation state. Do this outside the loop to avoid
+* exceeding MAX_SYSCACHE_CALLBACKS
*/
```

I'm not sure this change is really needed. CacheRegisterSyscacheCallback() must be outside the loop to avoid duplicated registration, and that seems trivial.

0002

03. TablesyncWorkerMain()

Regarding the inner loop, the exclusive lock is acquired even if the rstate is SUBREL_STATE_SYNCDONE. Moreover, palloc() and memcpy() for rstate seemed not needed. How about the following?

```
for (;;)
{
List *rstates;
- SubscriptionRelState *rstate;
ListCell *lc;
...
- rstate = (SubscriptionRelState *) palloc(sizeof(SubscriptionRelState));
foreach(lc, rstates)
{
- memcpy(rstate, lfirst(lc), sizeof(SubscriptionRelState));
+ SubscriptionRelState *rstate =
+ (SubscriptionRelState *) lfirst(lc);
+
+ if (rstate->state == SUBREL_STATE_SYNCDONE)
+ continue;
/*
- * Pick the table for the next run if it is not already picked up
- * by another worker.
- *
- * Take exclusive lock to prevent any other sync worker from picking
- * the same table.
- */
+ * Take exclusive lock to prevent any other sync worker from
+ * picking the same table.
+ */
LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
- if (rstate->state != SUBREL_STATE_SYNCDONE &&
- !logicalrep_worker_find(MySubscription->oid, rstate->relid, false))
+
+ /*
+ * Pick the table for the next run if it is not already picked up
+ * by another worker.
+ */
+ if (!logicalrep_worker_find(MySubscription->oid,
+ rstate->relid, false))
```

04. TablesyncWorkerMain

I think rstates should be pfree'd at the end of the outer loop, but it's OK if other parts do not.

05. Response to your post

> I tried to move the logicalrep_worker_wakeup call from clean_sync_worker (end of an iteration) to finish_sync_worker (end of sync worker). I made table sync much slower for some reason, then I reverted that change. Maybe I should look a bit more into the reason why that happened some time.

I want to see the testing method to reproduce the same issue; could you please share it with -hackers?

0003, 0004

I have not checked these yet, but I can say the same as above: I want to see the testing method to reproduce the same issue. Could you please share it with -hackers? My previous post (an approach for reusing the connection) may help the performance.

Best Regards,
Hayato Kuroda
FUJITSU LIMITED
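[Editor's note] The picking logic under review here - skip relations that are already SUBREL_STATE_SYNCDONE, then check under the lock whether another worker has taken the relation - can be illustrated with a small self-contained sketch. This is not the patch's code: the types and `pick_next_table` are hypothetical stand-ins, the `claimed` flag stands in for `logicalrep_worker_find()`, and comments mark where `LWLockAcquire`/`LWLockRelease` would sit in the real worker.

```c
#include <stdbool.h>

/* Hypothetical, simplified stand-ins for the patch's structures. */
typedef enum { STATE_INIT, STATE_SYNCDONE } RelState;

typedef struct
{
    int      relid;
    RelState state;
    bool     claimed;   /* stands in for logicalrep_worker_find() */
} RelEntry;

/*
 * Return the relid of the next table this worker should sync, or -1 if
 * none is left.  SYNCDONE relations are skipped before taking the lock,
 * as the review suggests, so the exclusive lock would only be held while
 * checking and claiming a candidate.
 */
int
pick_next_table(RelEntry *rstates, int n)
{
    for (int i = 0; i < n; i++)
    {
        if (rstates[i].state == STATE_SYNCDONE)
            continue;       /* no lock needed for finished tables */

        /* LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE) goes here */
        if (!rstates[i].claimed)
        {
            rstates[i].claimed = true;  /* this worker takes the table */
            /* LWLockRelease(LogicalRepWorkerLock) */
            return rstates[i].relid;
        }
        /* LWLockRelease(LogicalRepWorkerLock) */
    }
    return -1;
}
```

The separate claimed-by-another-worker check also suggests why a state check alone may not suffice: a relation can still be in a not-yet-done state while another running worker has already picked it up.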
Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication
On Fri, Jun 23, 2023 at 7:03 PM Melih Mutlu wrote:
>
> You can find the updated patchset attached.
> I worked to address the reviews and made some additional changes.
>
> Let me first explain the new patchset.
> 0001: Refactors the logical replication code, mostly worker.c and
> tablesync.c. Although this patch makes it easier to reuse workers, I
> believe that it's useful even by itself without other patches. It does
> not improve performance or anything but aims to increase readability
> and such.
> 0002: This is only to reuse worker processes, everything else stays
> the same (replication slots/origins etc.).
> 0003: Adds a new command for streaming replication protocol to create
> a snapshot by an existing replication slot.
> 0004: Reuses replication slots/origins together with workers.
>
> Even only 0001 and 0002 are enough to improve table sync performance
> at the rates previously shared on this thread. This also means that
> currently 0004 (reusing replication slots/origins) does not improve as
> much as I would expect, even though it does not harm either.
> I just wanted to share what I did so far, while I'm continuing to
> investigate it more to see what I'm missing in patch 0004.

I think the reason why you don't see the benefit of the 0004 patch is that it still pays the cost of disconnect/connect, and we haven't saved much on network transfer costs because of the new snapshot you are creating in patch 0003. Is it possible to avoid disconnect/connect each time the patch needs to reuse the same tablesync worker? Once we do that and save the cost of drop_slot and the associated network round trip, you may see the benefit of the 0003 and 0004 patches.

--
With Regards,
Amit Kapila.
Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication
On Fri, Jun 23, 2023 at 11:50 PM Melih Mutlu wrote: > > > src/backend/replication/logical/worker.c > > > > 10. General -- run_tablesync_worker, TablesyncWorkerMain > > > > IMO these functions would be more appropriately reside in the > > tablesync.c instead of the (common) worker.c. Was there some reason > > why they cannot be put there? > > I'm not really against moving those functions to tablesync.c. But > what's not clear to me is worker.c. Is it the places to put common > functions for all log. rep. workers? Then, what about apply worker? > Should we consider a separate file for apply worker too? IIUC - tablesync.c = for tablesync only - applyparallelworker = for parallel apply worker only - worker.c = for both normal apply worker, plus "common" worker code Regarding making another file (e.g. applyworker.c). It sounds sensible, but I guess you would need to first demonstrate the end result will be much cleaner to get support for such a big refactor. -- Kind Regards, Peter Smith. Fujitsu Australia
Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication
Hi Peter,

Thanks for your reviews. I tried to apply most of them. I just have some comments below on some of them.

Peter Smith wrote on Wed, 14 Jun 2023 at 08:45:
>
> 9. process_syncing_tables_for_sync
>
> @@ -378,7 +387,13 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
> 	 */
> 	replorigin_drop_by_name(originname, true, false);
>
> -	finish_sync_worker();
> +	/*
> +	 * Sync worker is cleaned at this point. It's ready to sync next table,
> +	 * if needed.
> +	 */
> +	SpinLockAcquire(&MyLogicalRepWorker->relmutex);
> +	MyLogicalRepWorker->ready_to_reuse = true;
> +	SpinLockRelease(&MyLogicalRepWorker->relmutex);
>
> 9a.
> I did not quite follow the logic. It says "Sync worker is cleaned at
> this point", but who is doing that? -- more details are needed. But,
> why not just call clean_sync_worker() right here like it used to call
> finish_sync_worker?

I agree that the explanations at places where the worker decides not to continue with the current table were confusing. Even the name of ready_to_reuse was misleading. I renamed it and tried to improve the comments in such places. Can you please check whether those make more sense now?

> ==
> src/backend/replication/logical/worker.c
>
> 10. General -- run_tablesync_worker, TablesyncWorkerMain
>
> IMO these functions would more appropriately reside in
> tablesync.c instead of the (common) worker.c. Was there some reason
> why they cannot be put there?

I'm not really against moving those functions to tablesync.c. But what's not clear to me is worker.c. Is it the place to put common functions for all logical replication workers? Then, what about the apply worker? Should we consider a separate file for the apply worker too?

Thanks,
--
Melih Mutlu
Microsoft
Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication
Hi,

Thanks for your reviews.

Hayato Kuroda (Fujitsu) wrote on Tue, 13 Jun 2023 at 13:06:

> 01. general
>
> Why do tablesync workers have to disconnect from the publisher on every
> iteration? I think the connection initiation overhead cannot be negligible
> in postgres's basic architecture. I have not checked yet, but could we add
> a new replication message like STOP_STREAMING or CLEANUP? Or is the
> engineering for it much larger than the benefit?

This actually makes sense. I quickly tried to do that without adding any new replication message. As you would expect, it did not work. I don't really know what's needed to make a connection last for more than one iteration. I need to look into this. Happy to hear any suggestions and thoughts.

> The sync worker sends a signal to its leader every iteration, but that may
> be too often. Maybe it was added to change the rstate to READY; however, it
> is OK to change it when the next change has come, because
> should_apply_changes_for_rel() returns true even if
> rel->state == SUBREL_STATE_SYNCDONE. I think the notification should be
> done only at the end of the sync worker. What do you think?

I tried to move the logicalrep_worker_wakeup call from clean_sync_worker (end of an iteration) to finish_sync_worker (end of sync worker). It made table sync much slower for some reason, so I reverted that change. Maybe I should look a bit more into why that happened at some point.

Thanks,
--
Melih Mutlu
Microsoft
Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication
Here are some review comments for the patch v2-0001. == Commit message 1. General Better to use consistent terms in this message. Either "relations" or "tables" -- not a mixture of both. ~~~ 2. Before this commit, tablesync workers were capable of syncing only one relation. For each table, a new sync worker was launched and the worker would exit when the worker is done with the current table. ~ SUGGESTION (2nd sentence) For each table, a new sync worker was launched and that worker would exit when done processing the table. ~~~ 3. Now, tablesync workers are not only limited with one relation and can move to another relation in the same subscription. This reduces the overhead of launching a new background worker and exiting from that worker for each relation. ~ SUGGESTION (1st sentence) Now, tablesync workers are not limited to processing only one relation. When done, they can move to processing another relation in the same subscription. ~~~ 4. A new tablesync worker gets launched only if the number of tablesync workers for the subscription does not exceed max_sync_workers_per_subscription. If there is a table needs to be synced, a tablesync worker picks that up and syncs it.The worker continues to picking new tables to sync until there is no table left for synchronization in the subscription. ~ This seems to be missing the point that only "available" workers go looking for more tables to process. Maybe reword something like below: SUGGESTION If there is a table that needs to be synced, an "available" tablesync worker picks up that table and syncs it. Each tablesync worker continues to pick new tables to sync until there are no tables left requiring synchronization. If there was no "available" worker to process the table, then a new tablesync worker will be launched, provided the number of tablesync workers for the subscription does not exceed max_sync_workers_per_subscription. == src/backend/replication/logical/launcher.c 5. 
logicalrep_worker_launch

@@ -460,8 +461,10 @@ retry:
 	if (is_parallel_apply_worker)
 		snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ParallelApplyWorkerMain");
-	else
+	else if (!OidIsValid(relid))
 		snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain");
+	else
+		snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TablesyncWorkerMain");

 	if (OidIsValid(relid))
 		snprintf(bgw.bgw_name, BGW_MAXLEN,

~

5a.
I felt at least these conditions could be rearranged, so you can use OidIsValid(relid) instead of !OidIsValid(relid).

~

5b.
Probably it can all be simplified, if you are happy to do it in one line:

snprintf(bgw.bgw_function_name, BGW_MAXLEN,
         OidIsValid(relid) ? "TablesyncWorkerMain" :
         is_parallel_apply_worker ? "ParallelApplyWorkerMain" :
         "ApplyWorkerMain");

==
src/backend/replication/logical/tablesync.c

6. finish_sync_worker

This function is removed/renamed, but there are still comments in this file referring to 'finish_sync_worker'.

~~~

7. clean_sync_worker

I agree with the comment from Shi-san. There should still be logging somewhere saying that this tablesync worker has completed the processing of the current table.

~~~

8. sync_worker_exit

There is inconsistent function naming for clean_sync_worker versus sync_worker_exit. How about: clean_sync_worker/exit_sync_worker? Or: sync_worker_clean/sync_worker_exit?

~~~

9. process_syncing_tables_for_sync

@@ -378,7 +387,13 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
 	 */
 	replorigin_drop_by_name(originname, true, false);

-	finish_sync_worker();
+	/*
+	 * Sync worker is cleaned at this point. It's ready to sync next table,
+	 * if needed.
+	 */
+	SpinLockAcquire(&MyLogicalRepWorker->relmutex);
+	MyLogicalRepWorker->ready_to_reuse = true;
+	SpinLockRelease(&MyLogicalRepWorker->relmutex);

9a.
I did not quite follow the logic. It says "Sync worker is cleaned at this point", but who is doing that? -- more details are needed. But, why not just call clean_sync_worker() right here like it used to call finish_sync_worker?

~

9b.
Shouldn't this "ready_to_reuse" flag be assigned within the clean_sync_worker() function, since that is the function making it clean for the next re-use? The function comment even says so: "Prepares the synchronization worker for reuse or exit."

==
src/backend/replication/logical/worker.c

10. General -- run_tablesync_worker, TablesyncWorkerMain

IMO these functions would more appropriately reside in tablesync.c instead of the (common) worker.c. Was there some reason why they cannot be put there?

~~~

11. LogicalRepApplyLoop

+			/*
+			 * apply_dispatch() may have gone into apply_handle_commit()
+			 * which can go into process_syncing_tables_for_sync early.
+			 * Before we were able to reuse tablesync workers, that
+			 * process_syncing_tables_for_sync call would exit the worker
+			 * instead of preparing for reuse. Now that tablesync workers
+			 * can be reused and process_syncing_tables_for_sync is not
+			 * responsible for exiting. We need to take care of memory c
RE: [PATCH] Reuse Workers and Replication Slots during Logical Replication
Dear Melih,

Thank you for making the patch! I'm also interested in the patchset. Here are my comments for 0001. Some of the code does not follow our coding conventions, but the comments below do not cover that, because the patches seem to be at an early stage. Moreover, 0003 needs a rebase.

01. general

Why do tablesync workers have to disconnect from the publisher on every iteration? I think the connection initiation overhead cannot be negligible in postgres's basic architecture. I have not checked yet, but could we add a new replication message like STOP_STREAMING or CLEANUP? Or is the engineering for it much larger than the benefit?

02. logicalrep_worker_launch()

```
-	else
+	else if (!OidIsValid(relid))
 		snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain");
+	else
+		snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TablesyncWorkerMain");
```

You changed the entry point of tablesync workers, but bgw_type is still the same. Have you made any decision about that?

03. process_syncing_tables_for_sync()

```
+	/*
+	 * Sync worker is cleaned at this point. It's ready to sync next table,
+	 * if needed.
+	 */
+	SpinLockAcquire(&MyLogicalRepWorker->relmutex);
+	MyLogicalRepWorker->ready_to_reuse = true;
+	SpinLockRelease(&MyLogicalRepWorker->relmutex);
```

Maybe acquiring the lock for modifying ready_to_reuse is not needed, because each sync worker checks only its own attribute, and other processes do not read it.

04. sync_worker_exit()

```
+/*
+ * Exit routine for synchronization worker.
+ */
+void
+pg_attribute_noreturn()
+sync_worker_exit(void)
```

I think we do not have to rename the function from finish_sync_worker().

05. LogicalRepApplyLoop()

```
+	if (MyLogicalRepWorker->ready_to_reuse)
+	{
+		endofstream = true;
+	}
```

We should add a comment here to clarify the reason.

06. stream_build_options()

I think we can set the twophase attribute here.

07. TablesyncWorkerMain()

```
+	ListCell   *lc;
```

This variable should be declared inside the loop.

08. TablesyncWorkerMain()

```
+	/*
+	 * If a relation with INIT state is assigned, clean up the worker for
+	 * the next iteration.
+	 *
+	 * If there is no more work left for this worker, break the loop to
+	 * exit.
+	 */
+	if (MyLogicalRepWorker->relstate == SUBREL_STATE_INIT)
+		clean_sync_worker();
```

The sync worker sends a signal to its leader every iteration, but that may be too often. Maybe it was added to change the rstate to READY; however, it is OK to change it when the next change has come, because should_apply_changes_for_rel() returns true even if rel->state == SUBREL_STATE_SYNCDONE. I think the notification should be done only at the end of the sync worker. What do you think?

Best Regards,
Hayato Kuroda
FUJITSU LIMITED
RE: [PATCH] Reuse Workers and Replication Slots during Logical Replication
On Thu, Jun 1, 2023 6:54 PM Melih Mutlu wrote:
>
> Hi,
>
> I rebased the patch and addressed the following reviews.
>

Thanks for updating the patch. Here are some comments on the 0001 patch.

1.
-			ereport(LOG,
-					(errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished",
-							MySubscription->name,
-							get_rel_name(MyLogicalRepWorker->relid;

Could we move this somewhere else instead of removing it?

2.
+	if (!OidIsValid(originid))
+		originid = replorigin_create(originname);
+	replorigin_session_setup(originid, 0);
+	replorigin_session_origin = originid;
+	*origin_startpos = replorigin_session_get_progress(false);
+	CommitTransactionCommand();
+
+	/* Is the use of a password mandatory? */
+	must_use_password = MySubscription->passwordrequired &&
+		!superuser_arg(MySubscription->owner);
+	LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true,
+											must_use_password,
+											MySubscription->name, &err);

It seems that there is a problem with the refactoring here. See commit e7e7da2f8d5.

3.
+	/* Set this to false for safety, in case we're already reusing the worker */
+	MyLogicalRepWorker->ready_to_reuse = false;
+

I am not sure whether we need to lock when setting it.

4.
+	/*
+	 * Allocate the origin name in long-lived context for error context
+	 * message.
+	 */
+	StartTransactionCommand();
+	ReplicationOriginNameForLogicalRep(MySubscription->oid,
+									   MyLogicalRepWorker->relid,
+									   originname,
+									   originname_size);
+	CommitTransactionCommand();

Do we need the calls to StartTransactionCommand() and CommitTransactionCommand() here? Besides, the comment here is the same as the comment atop set_apply_error_context_origin(); do we need it?

5.
I saw a segmentation fault when debugging. It happened when sync_worker_exit() was called (see the code below in LogicalRepSyncTableStart()).
In the case that this is not the first table the worker synchronizes, clean_sync_worker() has been called before (in TablesyncWorkerMain()), and LogRepWorkerWalRcvConn has been set to NULL. Then, a segmentation fault happened because LogRepWorkerWalRcvConn is a null pointer.

	switch (relstate)
	{
		case SUBREL_STATE_SYNCDONE:
		case SUBREL_STATE_READY:
		case SUBREL_STATE_UNKNOWN:
			sync_worker_exit();	/* doesn't return */
	}

Here is the backtrace.

#0  0x7fc8a8ce4c95 in libpqrcv_disconnect (conn=0x0) at libpqwalreceiver.c:757
#1  0x0092b8c0 in clean_sync_worker () at tablesync.c:150
#2  0x0092b8ed in sync_worker_exit () at tablesync.c:164
#3  0x0092d8f6 in LogicalRepSyncTableStart (origin_startpos=0x7ffd50f30f08) at tablesync.c:1293
#4  0x00934f76 in start_table_sync (origin_startpos=0x7ffd50f30f08, myslotname=0x7ffd50f30e80) at worker.c:4457
#5  0x0093513b in run_tablesync_worker (options=0x7ffd50f30ec0, slotname=0x0, originname=0x7ffd50f30f10 "pg_16394_16395", originname_size=64, origin_startpos=0x7ffd50f30f08) at worker.c:4532
#6  0x00935a3a in TablesyncWorkerMain (main_arg=1) at worker.c:4853
#7  0x008e97f6 in StartBackgroundWorker () at bgworker.c:864
#8  0x008f350b in do_start_bgworker (rw=0x12fc1a0) at postmaster.c:5762
#9  0x008f38b7 in maybe_start_bgworkers () at postmaster.c:5986
#10 0x008f2975 in process_pm_pmsignal () at postmaster.c:5149
#11 0x008ee98a in ServerLoop () at postmaster.c:1770
#12 0x008ee3bb in PostmasterMain (argc=3, argv=0x12c4af0) at postmaster.c:1463
#13 0x007b6d3a in main (argc=3, argv=0x12c4af0) at main.c:198

The steps to reproduce:

Worker1, in TablesyncWorkerMain(), the relstate of the new table to sync (obtained by GetSubscriptionRelations) is SUBREL_STATE_INIT, and in the foreach loop, before the following check (it needs a breakpoint before locking):

	LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);

	if (rstate->state != SUBREL_STATE_SYNCDONE &&
		!logicalrep_worker_find(MySubscription->oid, rstate->relid, false))
	{
		/* Update worker state for the next table */
		MyLogicalRepWorker->relid = rstate->relid;
		MyLogicalRepWorker->relsta
Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication
On Thu, Jun 1, 2023 at 7:22 AM Melih Mutlu wrote: > > Hi Peter, > > Peter Smith , 26 May 2023 Cum, 10:30 tarihinde > şunu yazdı: > > > > On Thu, May 25, 2023 at 6:59 PM Melih Mutlu wrote: > > Yes, I was mostly referring to the same as point 1 below about patch > > 0001. I guess I just found the concept of mixing A) launching TSW (via > > apply worker) with B) reassigning TSW to another relation (by the TSW > > battling with its peers) to be a bit difficult to understand. I > > thought most of the refactoring seemed to arise from choosing to do it > > that way. > > No, the refactoring is not related to the way of assigning a new > table. In fact, the patch did not include such refactoring a couple > versions earlier [1] and was still assigning tables the same way. It > was suggested here [2]. Then, I made the patch 0001 which includes > some refactoring and only reuses the worker and nothing else. Also I > find it more understandable this way, maybe it's a bit subjective. > > I feel that logical replication related files are getting more and > more complex and hard to understand with each change. IMHO, even > without reusing anything, those need some refactoring anyway. But for > this patch, refactoring some places made it simpler to reuse workers > and/or replication slots, regardless of how tables are assigned to > TSW's. If refactoring is wanted anyway (regardless of the chosen "reuse" logic), then will it be better to split off a separate 0001 patch just to get that part out of the way first? -- Kind Regards, Peter Smith. Fujitsu Australia
Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication
Hi Peter,

Peter Smith wrote on Fri, 26 May 2023 at 10:30:
>
> On Thu, May 25, 2023 at 6:59 PM Melih Mutlu wrote:
> Yes, I was mostly referring to the same as point 1 below about patch
> 0001. I guess I just found the concept of mixing A) launching TSW (via
> apply worker) with B) reassigning TSW to another relation (by the TSW
> battling with its peers) to be a bit difficult to understand. I
> thought most of the refactoring seemed to arise from choosing to do it
> that way.

No, the refactoring is not related to the way of assigning a new table. In fact, the patch did not include such refactoring a couple of versions earlier [1] and was still assigning tables the same way. It was suggested here [2]. Then I made patch 0001, which includes some refactoring and only reuses the worker and nothing else. I also find it more understandable this way; maybe that's a bit subjective.

I feel that the logical replication related files are getting more and more complex and harder to understand with each change. IMHO, even without reusing anything, they need some refactoring anyway. But for this patch, refactoring some places made it simpler to reuse workers and/or replication slots, regardless of how tables are assigned to TSWs.

> +1. I think it would be nice to see a POC of both ways for benchmark
> comparison because IMO performance is not the only consideration --
> unless there is an obvious winner, then they need to be judged also by
> the complexity of the logic, the amount of code that needed to be
> refactored, etc.

Will try to do that. But, as I mentioned above, I don't think such a change would reduce the complexity or the number of lines changed.

> But it is difficult to get an overall picture of the behaviour. Mostly
> when benchmarks were posted you hold one variable fixed and show only
> one other varying. It always leaves me wondering -- what about not-empty
> tables, or what about different numbers of tables etc. Is it possible
> to make some script to gather a bigger set of results so we can see
> everything at once? Perhaps then it will become clear there is some
> "sweet spot" where the patch is really good but beyond that it degrades
> (actually, who knows what it might show).

I actually shared benchmarks with different numbers of tables and sizes. But those were all with 2 workers. I guess you want a similar benchmark with different numbers of workers. I will work on this and share it soon.

[1] https://www.postgresql.org/message-id/CAGPVpCQmEE8BygXr%3DHi2N2t2kOE%3DPJwofn9TX0J9J4crjoXarQ%40mail.gmail.com
[2] https://www.postgresql.org/message-id/CAAKRu_YKGyF%2BsvRQqe1th-mG9xLdzneWgh9H1z1DtypBkawkkw%40mail.gmail.com

Thanks,
--
Melih Mutlu
Microsoft
Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication
On Thu, May 25, 2023 at 6:59 PM Melih Mutlu wrote:
>
> Hi,
>
> Peter Smith wrote on Wed, 24 May 2023 at 05:59:
>> Hi, and thanks for the patch! It is an interesting idea.
>>
>> I have not yet fully read this thread, so below are only my first
>> impressions after looking at patch 0001. Sorry if some of these were
>> already discussed earlier.
>>
>> TBH the patch "reuse-workers" logic seemed more complicated than I had
>> imagined it might be.
>
> If you mean patch 0001 by the patch "reuse-workers", most of the complexity
> comes with some refactoring to split apply worker and tablesync worker
> paths. [1]
> If you mean the whole patch set, then I believe it's because reusing
> replication slots also requires having a proper snapshot each time the
> worker moves to a new table. [2]

Yes, I was mostly referring to the same as point 1 below about patch 0001. I guess I just found the concept of mixing A) launching TSW (via apply worker) with B) reassigning TSW to another relation (by the TSW battling with its peers) to be a bit difficult to understand. I thought most of the refactoring seemed to arise from choosing to do it that way.

>> 1.
>> IIUC with patch 0001, each/every tablesync worker (a.k.a. TSW) when it
>> finishes dealing with one table then goes looking to find if there is
>> some relation that it can process next. So now every TSW has a loop
>> where it will fight with every other available TSW over who will get
>> to process the next relation.
>>
>> Somehow this seems all backwards to me. Isn't it strange for the TSW
>> to be the one deciding what relation it would deal with next?
>>
>> IMO it seems more natural to simply return the finished TSW to some
>> kind of "pool" of available workers and the main Apply process can
>> just grab a TSW from that pool instead of launching a brand new one in
>> the existing function process_syncing_tables_for_apply().
Or, maybe >> those "available" workers can be returned to a pool maintained within >> the launcher.c code, which logicalrep_worker_launch() can draw from >> instead of launching a whole new process? >> >> (I need to read the earlier posts to see if these options were already >> discussed and rejected) > > > I think ([3]) relying on a single apply worker for the assignment of several > tablesync workers might bring some overhead, it's possible that some > tablesync workers wait in idle until the apply worker assigns them something. > OTOH yes, the current approach makes tablesync workers race for a new table > to sync. Yes, it might be slower than the 'patched' code because "available" workers might be momentarily idle while they wait to be re-assigned to the next relation. We would need to try it to find out. > TBF both ways might be worth discussing/investigating more, before deciding > which way to go. +1. I think it would be nice to see POC of both ways for benchmark comparison because IMO performance is not the only consideration -- unless there is an obvious winner, then they need to be judged also by the complexity of the logic, the amount of code that needed to be refactored, etc. > >> >> 2. >> AFAIK the thing that identifies a tablesync worker is the fact that >> only TSW will have a 'relid'. >> >> But it feels very awkward to me to have a TSW marked as "available" >> and yet that LogicalRepWorker must still have some OLD relid field >> value lurking (otherwise it will forget that it is a "tablesync" >> worker!). >> >> IMO perhaps it is time now to introduce some enum 'type' to the >> LogicalRepWorker. Then an "in_use" type=TSW would have a valid 'relid' >> whereas an "available" type=TSW would have relid == InvalidOid. > > > Hmm, relid will be immediately updated when the worker moves to a new table. > And the time between finishing sync of a table and finding a new table to > sync should be minimal. 
> I'm not sure how having an old relid for such a small amount of time can
> do any harm.

There is no "harm", but it just didn't feel right to make the LogicalRepWorker transition through some meaningless state ("available" for re-use but still assigned some relid), just because it was easy to do it that way. I think it is more natural for the 'relid' to be valid only when it is valid for the worker and to be InvalidOid when it is not.

---

Maybe this gripe would become more apparent if the implementation used the "free-list" idea, because then you would have a lot of bogus relids assigned to the workers on that list for longer than just a moment.

>> 3.
>> Maybe I am mistaken, but it seems the benchmark results posted are
>> only using quite small/default values for
>> "max_sync_workers_per_subscription", so I wondered how those results
>> are affected by increasing that GUC. I think having only very few
>> workers would cause more sequential processing, so conveniently the
>> effect of the patch avoiding re-launch might be seen in the best
>> possible light. OTOH, using more TSW in
Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication
Hi,

Peter Smith wrote on Wed, 24 May 2023 at 05:59:

> Hi, and thanks for the patch! It is an interesting idea.
>
> I have not yet fully read this thread, so below are only my first
> impressions after looking at patch 0001. Sorry if some of these were
> already discussed earlier.
>
> TBH the patch "reuse-workers" logic seemed more complicated than I had
> imagined it might be.

If you mean patch 0001 by "reuse-workers", most of the complexity comes with some refactoring to split the apply worker and tablesync worker paths. [1] If you mean the whole patch set, then I believe it's because reusing replication slots also requires having a proper snapshot each time the worker moves to a new table. [2]

> 1.
> IIUC with patch 0001, each/every tablesync worker (a.k.a. TSW) when it
> finishes dealing with one table then goes looking to find if there is
> some relation that it can process next. So now every TSW has a loop
> where it will fight with every other available TSW over who will get
> to process the next relation.
>
> Somehow this seems all backwards to me. Isn't it strange for the TSW
> to be the one deciding what relation it would deal with next?
>
> IMO it seems more natural to simply return the finished TSW to some
> kind of "pool" of available workers and the main Apply process can
> just grab a TSW from that pool instead of launching a brand new one in
> the existing function process_syncing_tables_for_apply(). Or, maybe
> those "available" workers can be returned to a pool maintained within
> the launcher.c code, which logicalrep_worker_launch() can draw from
> instead of launching a whole new process?
>
> (I need to read the earlier posts to see if these options were already
> discussed and rejected)

I think ([3]) relying on a single apply worker for the assignment of several tablesync workers might bring some overhead; it's possible that some tablesync workers sit idle until the apply worker assigns them something.
OTOH yes, the current approach makes tablesync workers race for a new table to sync. TBF both ways might be worth discussing/investigating more, before deciding which way to go.

> 2.
> AFAIK the thing that identifies a tablesync worker is the fact that
> only TSW will have a 'relid'.
>
> But it feels very awkward to me to have a TSW marked as "available"
> and yet that LogicalRepWorker must still have some OLD relid field
> value lurking (otherwise it will forget that it is a "tablesync"
> worker!).
>
> IMO perhaps it is time now to introduce some enum 'type' to the
> LogicalRepWorker. Then an "in_use" type=TSW would have a valid 'relid'
> whereas an "available" type=TSW would have relid == InvalidOid.

Hmm, relid will be immediately updated when the worker moves to a new table. And the time between finishing the sync of one table and finding a new table to sync should be minimal. I'm not sure how having an old relid for such a small amount of time can do any harm.

> 3.
> Maybe I am mistaken, but it seems the benchmark results posted are
> only using quite small/default values for
> "max_sync_workers_per_subscription", so I wondered how those results
> are affected by increasing that GUC. I think having only very few
> workers would cause more sequential processing, so conveniently the
> effect of the patch avoiding re-launch might be seen in the best
> possible light. OTOH, using more TSW in the first place might reduce
> the overall tablesync time because the subscriber can do more work in
> parallel. So I'm not quite sure what the goal is here. E.g. if the
> user doesn't care much about how long the tablesync phase takes then
> there is maybe no need for this patch at all. OTOH, I thought if a
> user does care about the subscription startup time, won't those users
> be opting for a much larger "max_sync_workers_per_subscription" in the
> first place? Therefore shouldn't the benchmarking be using a larger
> number too?
Regardless of how many tablesync workers there are, reusing workers will speed things up whenever a worker gets the chance to sync more than one table. Increasing the number of tablesync workers, of course, improves tablesync performance. But if the sync isn't 100% parallel (meaning that the # of sync workers != the # of tables to sync), then reusing workers can bring an additional improvement.

Here are some benchmarks similar to the earlier ones, but with 100 tables and different numbers of workers:

+--------+-------------+-------------+-------------+------------+
|        |  2 workers  |  4 workers  |  6 workers  | 8 workers  |
+--------+-------------+-------------+-------------+------------+
| master | 2579.154 ms | 1383.153 ms | 1001.559 ms | 911.758 ms |
+--------+-------------+-------------+-------------+------------+
| patch  | 1724.230 ms |  853.894 ms |  601.176 ms | 496.395 ms |
+--------+-------------+-------------+-------------+------------+

So yes, increasing the number of workers makes it faster. But reusing workers still improves things further.
Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication
Hi, and thanks for the patch! It is an interesting idea. I have not yet fully read this thread, so below are only my first impressions after looking at patch 0001. Sorry if some of these were already discussed earlier. TBH the patch "reuse-workers" logic seemed more complicated than I had imagined it might be. 1. IIUC with patch 0001, each/every tablesync worker (a.k.a. TSW) when it finishes dealing with one table then goes looking to find if there is some relation that it can process next. So now every TSW has a loop where it will fight with every other available TSW over who will get to process the next relation. Somehow this seems all backwards to me. Isn't it strange for the TSW to be the one deciding what relation it would deal with next? IMO it seems more natural to simply return the finished TSW to some kind of "pool" of available workers and the main Apply process can just grab a TSW from that pool instead of launching a brand new one in the existing function process_syncing_tables_for_apply(). Or, maybe those "available" workers can be returned to a pool maintained within the launcher.c code, which logicalrep_worker_launch() can draw from instead of launching a whole new process? (I need to read the earlier posts to see if these options were already discussed and rejected) ~~ 2. AFAIK the thing that identifies a tablesync worker is the fact that only TSW will have a 'relid'. But it feels very awkward to me to have a TSW marked as "available" and yet that LogicalRepWorker must still have some OLD relid field value lurking (otherwise it will forget that it is a "tablesync" worker!). IMO perhaps it is time now to introduce some enum 'type' to the LogicalRepWorker. Then an "in_use" type=TSW would have a valid 'relid' whereas an "available" type=TSW would have relid == InvalidOid. ~~ 3. 
Maybe I am mistaken, but it seems the benchmark results posted use only quite small/default values for "max_sync_workers_per_subscription", so I wondered how those results are affected by increasing that GUC. I think having only very few workers would cause more sequential processing, so conveniently the effect of the patch avoiding re-launch might be seen in the best possible light. OTOH, using more TSWs in the first place might reduce the overall tablesync time because the subscriber can do more work in parallel.

So I'm not quite sure what the goal is here. E.g. if the user doesn't care much about how long the tablesync phase takes, then there is maybe no need for this patch at all. OTOH, I thought if a user does care about the subscription startup time, won't those users be opting for a much larger "max_sync_workers_per_subscription" in the first place? Therefore shouldn't the benchmarking be using a larger number too?

==

Here are a few other random things noticed while looking at patch 0001:

1. Commit message

1a. typo /sequantially/sequentially/

1b. Saying "killed" and "killing" seemed a bit extreme and implies somebody else is killing the process. But I think mostly tablesync is just ending by a normal proc exit, so maybe reword this a bit.

~~~

2. It seemed odd that some -- clearly tablesync specific -- functions are in worker.c instead of in tablesync.c.

2a. e.g. clean_sync_worker

2b. e.g. sync_worker_exit

~~~

3. process_syncing_tables_for_sync

+ /*
+  * Sync worker is cleaned at this point. It's ready to sync next table,
+  * if needed.
+  */
+ SpinLockAcquire(&MyLogicalRepWorker->relmutex);
+ MyLogicalRepWorker->ready_to_reuse = true;
+ SpinLockRelease(&MyLogicalRepWorker->relmutex);
+ }
+
+ SpinLockRelease(&MyLogicalRepWorker->relmutex);

Isn't there a double release of that mutex happening there?

--
Kind Regards,
Peter Smith.
Fujitsu Australia
Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication
On Sun, 26 Feb 2023 at 19:11, Melanie Plageman wrote:
>
> This is cool! Thanks for working on this.
> I had a chance to review your patchset and I had some thoughts and
> questions.

It looks like this patch got a pretty solid review from Melanie Plageman in February, just before the CF started. It was never set to Waiting on Author, but I think that may be the right state for it.

--
Gregory Stark
As Commitfest Manager
Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication
On Wed, Feb 22, 2023 at 8:04 AM Melih Mutlu wrote:
>
> Hi Wang,
>
> Thanks for reviewing.
> Please see updated patches. [1]

This is cool! Thanks for working on this. I had a chance to review your patchset and I had some thoughts and questions.

I notice that you've added a new user-facing option to make a snapshot. I think functionality to independently make a snapshot for use elsewhere has been discussed in the past for the implementation of different features (e.g. [1] pg_dump, but they ended up using replication slots for this, I think?), but I'm not quite sure I understand all the implications of providing a user-visible create snapshot command. Where can it be used? When can the snapshot be used? In your patch's case, you know that you can use the snapshot you are creating, but I just wonder if any restrictions or caveats need be taken for its general use.

For the worker reuse portion of the code, could it be a separate patch in the set? It could be independently committable and would be easier to review (separate from repl slot reuse).

Given table sync worker reuse, I think it is worth considering a more explicit structure for the table sync worker code now -- i.e. having a TableSyncWorkerMain() function. Though they still do the LogicalRepApplyLoop(), much of what else they do is different from the apply leader.

Apply worker leader does:

ApplyWorkerMain()
  walrcv_startstreaming()
  LogicalRepApplyLoop()
    launch table sync workers
  walrcv_endstreaming()
  proc_exit()

Table sync workers on master do:

ApplyWorkerMain()
  start_table_sync()
    walrcv_create_slot()
    copy_table()
  walrcv_startstreaming()
  start_apply()
    LogicalRepApplyLoop()
  walrcv_endstreaming()
  proc_exit()

Now table sync workers need to loop back and do start_table_sync() again for their new table. You have done this in ApplyWorkerMain(). But I think that this could be a separate main function, since their main loop is effectively totally different now from that of an apply worker leader.
Something like:

TableSyncWorkerMain()
  TableSyncWorkerLoop()
    start_table_sync()
    walrcv_startstreaming()
    LogicalRepApplyLoop()
    walrcv_endstreaming()
    wait_for_new_rel_assignment()
  proc_exit()

You mainly have this structure, but it is a bit hidden, and some of the shared functions that previously may have made sense for table sync workers and apply workers to share don't really make sense to share anymore. The main thing that table sync workers and apply workers share is the logic in LogicalRepApplyLoop() (table sync workers use it when they do catchup), so perhaps we should make the other code separate?

Also on the topic of worker reuse, I was wondering if having workers find their own next table assignment (as you have done in process_syncing_tables_for_sync()) makes sense.

The way the whole system would work now (with your patch applied), as I understand it, the apply leader would loop through the subscription rel states and launch workers up to max_sync_workers_per_subscription for every candidate table needing sync. The apply leader will continue to do this, even though none of those workers would exit unless they die unexpectedly. So, once it reaches max_sync_workers_per_subscription, it won't launch any more workers. When one of these sync workers is finished with a table (it is synced and caught up), it will search through the subscription rel states itself, looking for a candidate table to work on.

It seems it would be common for workers to be looking through the subscription rel states at the same time, and I don't really see how you prevent races in who is claiming a relation to work on. Though you take a shared lock on the LogicalRepWorkerLock, what if, in between logicalrep_worker_find() and updating my MyLogicalRepWorker->relid, someone else also updates their relid to that relid? I don't think you can update LogicalRepWorker->relid with only a shared lock.
I wonder if it is not better to have the apply leader, in process_syncing_tables_for_apply(), first check for an existing worker for the rel, then check for an available worker without an assignment, then launch a worker? Workers could then sleep after finishing their assignment and wait for the leader to give them a new assignment.

Given an exclusive lock on LogicalRepWorkerLock, it may be okay for workers to find their own table assignments from the subscription rel states -- and perhaps this will be much more efficient from a CPU perspective. It just feels a bit weird to have the code doing that buried in process_syncing_tables_for_sync(). It seems like it should at least return out to a main table sync worker loop in which workers loop through finding a table, assigning it to themselves, syncing the table, and catching the table up.

- Melanie

[1] https://www.postgresql.org/message-id/flat/CA%2BU5nMLRjGtpskUkYSzZOEYZ_8OMc02k%2BO6FDi4una3mB4rS1w%40mail.gmail.com#45692f75a1e79d4ce2d4f6a0
Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication
Hi Wang,

Thanks for reviewing. Please see updated patches. [1]

wangw.f...@fujitsu.com, 7 Şub 2023 Sal, 10:28 tarihinde şunu yazdı:

> 1. In the function ApplyWorkerMain.
> I think we need to keep the worker name as "leader apply worker" in the comment
> like the current HEAD.

Done.

> I think in this case we also need to pop the error context stack before
> returning. Otherwise, I think we might use the wrong callback
> (apply error_callback) after we return from this function.

Done.

> 3. About the function UpdateSubscriptionRelReplicationSlot.
> This newly introduced function UpdateSubscriptionRelReplicationSlot does not
> seem to be invoked. Do we need this function?

Removed.

> I think if 'need_full_snapshot' is false, it means we will create a snapshot
> that can read only catalogs. (see SnapBuild->building_full_snapshot)

Fixed.

> ```
> 'use' will use the snapshot for the current transaction executing the command.
> This option must be used in a transaction, and CREATE_REPLICATION_SLOT must be
> the first command run in that transaction.
> ```
>
> So I think in the function CreateDecodingContext, when "need_full_snapshot" is
> true, we seem to need the following check, just like in the function
> CreateInitDecodingContext:
>
> ```
> if (IsTransactionState() &&
>     GetTopTransactionIdIfAny() != InvalidTransactionId)
>     ereport(ERROR,
>             (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
>              errmsg("cannot create logical replication slot in transaction that has performed writes")));
> ```

You're right, to "use" the snapshot it must be the first command in the transaction, and that check happens here [2]. CreateReplicationSnapshot also has a similar check. I think the check you're referring to is needed to actually create a replication slot, and it is performed whether the snapshot will be "used" or "exported". This is not the case for CreateReplicationSnapshot.
> It seems that we also need to invoke the function
> CheckLogicalDecodingRequirements in the new function CreateReplicationSnapshot,
> just like the function CreateReplicationSlot and the function
> StartLogicalReplication.

Added this check.

> 3. The invocation of startup_cb_wrapper in the function CreateDecodingContext.
> I think we should change the third input parameter to true when we invoke the
> function startup_cb_wrapper for CREATE_REPLICATION_SNAPSHOT. BTW, after
> applying patch v10-0002*, these settings will be inconsistent when sync workers
> use "CREATE_REPLICATION_SLOT" and "CREATE_REPLICATION_SNAPSHOT" to take
> snapshots. This input parameter (true) will let us disable streaming and
> two-phase transactions in the function pgoutput_startup. See the last paragraph
> of the commit message for 4648243 for more details.

I'm not sure if "is_init" should be set to true. CreateDecodingContext only creates a context for an already existing logical slot and does not initialize a new one. I think inconsistencies between "CREATE_REPLICATION_SLOT" and "CREATE_REPLICATION_SNAPSHOT" are expected, since one creates a new replication slot and the other does not. CreateDecodingContext is also used in other places as well; I'm not sure how this change would affect those places. I'll look into this more. Please let me know if I'm missing something.

[1] https://www.postgresql.org/message-id/CAGPVpCQmEE8BygXr%3DHi2N2t2kOE%3DPJwofn9TX0J9J4crjoXarQ%40mail.gmail.com
[2] https://github.com/postgres/postgres/blob/master/src/backend/replication/walsender.c#L1108

Thanks,
--
Melih Mutlu
Microsoft
Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication
Hi Shveta,

Thanks for reviewing. Please see attached patches.

shveta malik, 2 Şub 2023 Per, 14:31 tarihinde şunu yazdı:

> On Wed, Feb 1, 2023 at 5:37 PM Melih Mutlu wrote:
>
> for (int64 i = 1; i <= lastusedid; i++)
> {
>     char originname_to_drop[NAMEDATALEN] = {0};
>     snprintf(originname_to_drop, sizeof(originname_to_drop),
>              "pg_%u_%lld", subid, (long long) i);
>     ...
> }
>
> --Is it better to use the function 'ReplicationOriginNameForLogicalRep'
> here instead of sprintf, just to be consistent everywhere to construct
> origin-name?

ReplicationOriginNameForLogicalRep creates a name with the current "lastusedid" and doesn't accept that id as a parameter. Here the patch needs to check all possible ids.

> /* Drop replication origin */
> replorigin_drop_by_name(originname, true, false);
> }
>
> --Are we passing missing_ok as true (second arg) intentionally here in
> replorigin_drop_by_name? Once we fix the issue reported in my earlier
> email (ASSERT), do you think it makes sense to pass missing_ok as false
> here?

Yes, missing_ok is intentional. The user might be concurrently refreshing the sub, or the apply worker might have already dropped the origin at that point. So missing_ok is set to true. This is also how origin drops before the worker exits are handled on HEAD; I only followed the same approach.

> --Do we need to palloc for each relation separately? Shall we do it
> once outside the loop and reuse it? Also pfree is not done for rstate
> here.

Removed palloc from the loop. No need to pfree here since the memory context will be deleted with the next CommitTransactionCommand call.

> Can you please review the above flow (I have given line# along with),
> I think it could be problematic. We alloced prev_slotname, assigned it
> to slotname, freed prev_slotname and used slotname after freeing the
> prev_slotname. Also slotname is allocated some memory too, that is not
> freed.

Right, I used memcpy instead of assigning prev_slotname to slotname.
slotname is returned in the end and pfree'd later. [1]

I also addressed your other reviews that I didn't explicitly mention in this email; I simply applied the changes you pointed out. Also added some more logs. I hope it's more useful now.

[1] https://github.com/postgres/postgres/blob/master/src/backend/replication/logical/worker.c#L4359

Thanks,
--
Melih Mutlu
Microsoft

v8-0001-Add-replication-protocol-cmd-to-create-a-snapsho.patch
Description: Binary data

v11-0002-Reuse-Logical-Replication-Background-worker.patch
Description: Binary data