RE: Perform streaming logical transactions by background workers and parallel apply
Dear Hou,

Thanks for updating the patch! The following are my comments.

===
01. applyparallelworker.c - SIZE_STATS_MESSAGE

```
/*
 * There are three fields in each message received by the parallel apply
 * worker: start_lsn, end_lsn and send_time. Because we have updated these
 * statistics in the leader apply worker, we can ignore these fields in the
 * parallel apply worker (see function LogicalRepApplyLoop).
 */
#define SIZE_STATS_MESSAGE (2 * sizeof(XLogRecPtr) + sizeof(TimestampTz))
```

According to other comment styles, it seems that the first sentence of the comment should describe the datatype and usage, not the detailed reason. For example, about ParallelApplyWorkersList, you said "A list ...". How about adding a message like the following:

"The message size that can be skipped by the parallel apply worker"

~~~
02. applyparallelworker.c - parallel_apply_start_subtrans

```
if (current_xid != top_xid &&
    !list_member_xid(subxactlist, current_xid))
```

A macro TransactionIdEquals is defined in access/transam.h. Should we use it, or is that too trivial?

~~~
03. applyparallelworker.c - LogicalParallelApplyLoop

```
case SHM_MQ_WOULD_BLOCK:
    {
        int rc;

        if (!in_streamed_transaction)
        {
            /*
             * If we didn't get any transactions for a while there might be
             * unconsumed invalidation messages in the queue, consume them
             * now.
             */
            AcceptInvalidationMessages();
            maybe_reread_subscription();
        }

        MemoryContextReset(ApplyMessageContext);
```

Is MemoryContextReset() needed? IIUC no one uses ApplyMessageContext if we reach here.

~~~
04. applyparallelworker.c - HandleParallelApplyMessages

```
else if (res == SHM_MQ_SUCCESS)
{
    StringInfoData msg;

    initStringInfo(&msg);
    appendBinaryStringInfo(&msg, data, nbytes);
    HandleParallelApplyMessage(winfo, &msg);
    pfree(msg.data);
}
```

In LogicalParallelApplyLoop(), appendBinaryStringInfo() is not used; the StringInfoData is initialized directly instead. Why is there a difference? appendBinaryStringInfo() will do repalloc() and memcpy(), so it may be inefficient.

~~~
05.
applyparallelworker.c - parallel_apply_send_data

```
if (result != SHM_MQ_SUCCESS)
    ereport(ERROR,
            (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
             errmsg("could not send data to shared-memory queue")));
```

I checked the enumeration of shm_mq_result, and I felt that shm_mq_send(nowait = false) fails only when the opposite process has exited. How about adding a hint or a detailed message like "lost connection to parallel apply worker"?

===
06. worker.c - nchanges

```
/*
 * The number of changes sent to parallel apply workers during one streaming
 * block.
 */
static uint32 nchanges = 0;
```

I found that the name "nchanges" is already used in apply_spooled_messages(). It works because the local variable always wins when a name collision between local and global variables occurs, but I think it may be confusing.

~~~
07. worker.c - apply_handle_commit_internal

I think we can add an assertion like Assert(replorigin_session_origin_lsn != InvalidXLogRecPtr && replorigin_session_origin != InvalidRepOriginId) to avoid missing replorigin_session_setup. Previously it was set at the entry point and never reset.

~~~
08. worker.c - apply_handle_prepare_internal

Same as above.

~~~
09. worker.c - maybe_reread_subscription

```
/*
 * Exit if any parameter that affects the remote connection was changed.
 * The launcher will start a new worker.
 */
if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0 ||
    strcmp(newsub->name, MySubscription->name) != 0 ||
    strcmp(newsub->slotname, MySubscription->slotname) != 0 ||
    newsub->binary != MySubscription->binary ||
    newsub->stream != MySubscription->stream ||
    strcmp(newsub->origin, MySubscription->origin) != 0 ||
    newsub->owner != MySubscription->owner ||
    !equal(newsub->publications, MySubscription->publications))
{
    ereport(LOG,
            (errmsg("logical replication apply worker for
```
RE: TRAP: FailedAssertion("prev_first_lsn < cur_txn->first_lsn", File: "reorderbuffer.c", Line: 927, PID: 568639)
Dear Sawada-san, Amit, > IIUC Change-2 is required in v16 and HEAD but not mandatory in v15 and > v14. The reason why we need Change-2 is that there is a case where we > mark only subtransactions as containing catalog change while not doing > that for its top-level transaction. In v15 and v14, since we mark both > subtransactions and top-level transaction in > SnapBuildXidSetCatalogChanges() as containing catalog changes, we > don't get the assertion failure at "Assert(!needs_snapshot || > needs_timetravel)". Incidentally, I agreed that Change-2 is needed for HEAD (and v16), not v15 and v14. Best Regards, Hayato Kuroda FUJITSU LIMITED
RE: [Proposal] Add foreign-server health checks infrastructure
Dear Osumi-san,

> I mainly followed the steps there and
> replaced the command "SELECT" for the remote table at 6-9 with "INSERT"
> command.
> Then, after waiting for few seconds, the "COMMIT" succeeded like below output,
> even after the server stop of the worker side.
> Additionally, the last reference "SELECT" which failed above can succeed,
> if I restart the worker server before the "SELECT" command to the remote
> table.
> This means the transaction looks successful but the data isn't there ?
> Could you please have a look at this issue ?

Good catch. This occurred because we disconnected the crashed server. Previously the failed server was disconnected in pgfdw_connection_check(). That was fine as long as the transaction (or statement) was surely canceled, but currently the statement might not be canceled because QueryCancelPending might be cleaned up [1]. If we fail to cancel the statement and reach pgfdw_xact_callback(), the connection is not used because entry->conn is NULL. That's why your "COMMIT" succeeded. However, the backend never sent the "COMMIT" command to the server, so the inserted data vanished on the remote server. I understood that we should not call disconnect_pg_server(entry->conn) even if we detect the disconnection; it should be controlled in the Xact callback. This will be modified in the next version.

Moreover, I noticed that we should enable the timer even if QueryCancelMessage is not NULL. If we do not turn it on here, the checking will never be enabled again.

[1]: https://www.postgresql.org/message-id/TYAPR01MB5866CE34430424A588F60FC2F55D9%40TYAPR01MB5866.jpnprd01.prod.outlook.com

Best Regards,
Hayato Kuroda
FUJITSU LIMITED
RE: [Proposal] Add foreign-server health checks infrastructure
Dear Horiguchi-san,

> Might be on slight different direction, but it looks to me a bit too
> much to use WaitEventSet to check only if a socket is live or not.
>
> A quick search in the tree told me that we could use pqSocketCheck()
> instead, and I think it would be the something that "could potentially
> go into libpq-fe.h" as Önder mentioned, if I understand what he said
> correctly.

Based on your suggestion, I tried to add the following function to fe-misc.c:

```
int
PQconncheck(PGconn *conn)
{
	/* Return -1 if an invalid socket is given */
	if (conn == NULL || PQsocket(conn) == PGINVALID_SOCKET)
		return -1;

	return pqSocketCheck(conn, 1, 1, -1);
}
```

...and replaced pgfdw_connection_check_internal() with PQconncheck(), but it did not work well. To be exact, pqSocketCheck() said the socket was "readable" and "writable" even after the connection had been killed. IIUC, pqSocketCheck() calls pqSocketPoll(), and in pqSocketPoll() we poll() for the POLLIN or POLLOUT events. But according to [1], we must wait for the POLLRDHUP event, so we cannot reuse it as-is. If we really want to move the checking function anyway, we must follow AddWaitEventToSet() and some WaitEventAdjust functions. I'm not sure whether that is really good.

[1]: https://man7.org/linux/man-pages/man2/poll.2.html

Best Regards,
Hayato Kuroda
FUJITSU LIMITED
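To make the observation above concrete, here is a minimal, self-contained demonstration (Linux-only; this is not PostgreSQL code, and the helper name probe_peer_closed is invented for this example). It shows that after the peer closes the connection, the surviving socket still reports POLLIN (EOF counts as "readable"), so a plain readability check cannot distinguish a dead peer; POLLRDHUP is what actually reveals the hangup:

```c
/*
 * Demonstration (Linux-only) that plain readability polling cannot
 * distinguish a dead peer: after the peer closes, POLLIN is still
 * reported, and only POLLRDHUP reveals the hangup.
 */
#define _GNU_SOURCE				/* needed for POLLRDHUP */
#include <poll.h>
#include <sys/socket.h>
#include <unistd.h>

/* Close one end of a socketpair, poll the other end, return its revents. */
short
probe_peer_closed(void)
{
	int			sv[2];
	struct pollfd pfd;

	if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) != 0)
		return -1;
	close(sv[1]);				/* simulate the remote server dying */

	pfd.fd = sv[0];
	pfd.events = POLLIN | POLLRDHUP;
	if (poll(&pfd, 1, 0) < 0)	/* timeout 0: events are already pending */
		return -1;
	close(sv[0]);
	return pfd.revents;
}
```

On Linux this reports both POLLIN and POLLRDHUP for the surviving end, which matches the behaviour described above: pqSocketPoll()'s POLLIN check alone still calls the dead socket "readable".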
RE: [Proposal] Add foreign-server health checks infrastructure
Dear Önder,

Thanks for giving suggestions!

> Still, the reestablish mechanism can be further simplified with
> WL_SOCKET_CLOSED event such as the following (where we should probably
> rename pgfdw_connection_check_internal):

Sounds reasonable. I think it can be included in this patch. I will try it in the next (or a later) version.

> In other words, a variation of pgfdw_connection_check_internal()
> could potentially go into interfaces/libpq/libpq-fe.h
> (backend/libpq/pqcomm.c or src/interfaces/libpq/fe-connect.c).

Hmm, IIUC libpq-related functions and data structures cannot be accessed from the core source, so we cannot move it to pqcomm.c. (This is one motivation for introducing the libpqwalreceiver library: it avoids linking libpq directly.) And functions in libpq/fe-connect.c are included in libpq.so, but latch-related functions like WaitEventSetWait() should not be called from a client application, so that is not appropriate either. In short, there is no good place for the function because it requires both libpq and core functions.

> I still think that it is probably too much work/code to detect the
> mentioned use-case you described on [1]. Each backend constantly
> calling CallCheckingRemoteServersCallbacks() for this purpose doesn't sound
> the optimal way to approach the "check whether server down" problem. You
> typically try to decide whether a server is down by establishing a
> connection (or ping etc), not going over all the existing connections.

Yes, the approach of establishing a new connection is very simple, but I think it has some holes. For example, if the DNS server or some routing software is stopped, we will fail to connect to foreign servers. In your approach we would regard that case as "failed" and try to invalidate the server even though the existing connection can still be used. Another case: if a server goes down and a failover has occurred, we will succeed in connecting to the foreign server.
We may regard that case as a "success", but we cannot COMMIT the transaction.

> As far as I can think of, it should probably be a single background task
> checking whether the server is down. If so, sending an invalidation message
> to all the backends such that related backends could act on the
> invalidation and throw an error. This is to cover the use-case you
> described on [1].

Indeed your approach covers the use case I mentioned, but I'm not sure whether it is really good. In your approach, a single background worker process manages all foreign servers. That may be OK if there are a few servers, but if there are hundreds of servers, the interval between checks will become longer. Currently, each FDW can decide per backend whether to do health checks. For example, we can skip health checks if the foreign server is not used right now. A background worker cannot control things that way. Moreover, the methods to connect to foreign servers and check their health differ per FDW. In the case of mysql_fdw [1], we must call mysql_init() and mysql_real_connect(). For file_fdw, we do not have to connect at all, but developers may want to calculate a checksum and compare it. Therefore, we must provide callback functions anyway. Based on the above, I do not agree with introducing a new background worker to do the health checks.

> This is of course how I would approach this problem. I think some other
> perspectives on this would be very useful to hear.

Yes, we can hear other opinions :-).

[1]: https://github.com/EnterpriseDB/mysql_fdw

Best Regards,
Hayato Kuroda
FUJITSU LIMITED
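To make the per-FDW callback idea concrete, here is a hedged, self-contained sketch. All names are invented for illustration (they deliberately echo the CallCheckingRemoteServersCallbacks() name mentioned above but do not reproduce the actual patch): each FDW registers its own health-check routine, since how to "ping" a server differs between, say, postgres_fdw and mysql_fdw, and the core merely iterates over whatever was registered.

```c
/*
 * Hypothetical sketch of per-FDW health-check callbacks. All names are
 * invented for illustration and do not match the actual patch.
 */
#include <stdbool.h>

typedef bool (*RemoteServerCheckCallback) (void);

#define MAX_CHECK_CALLBACKS 8
static RemoteServerCheckCallback check_callbacks[MAX_CHECK_CALLBACKS];
static int	n_check_callbacks = 0;

static void
RegisterCheckingRemoteServersCallback(RemoteServerCheckCallback cb)
{
	if (n_check_callbacks < MAX_CHECK_CALLBACKS)
		check_callbacks[n_check_callbacks++] = cb;
}

/* Call every registered checker; return how many reported "healthy". */
static int
CallCheckingRemoteServersCallbacks(void)
{
	int			healthy = 0;

	for (int i = 0; i < n_check_callbacks; i++)
		if (check_callbacks[i] ())
			healthy++;
	return healthy;
}

/* Stand-ins for FDW-specific checkers (one healthy, one dead server). */
static bool check_server_a(void) { return true; }
static bool check_server_b(void) { return false; }

/* Register two fake FDW checkers and run one checking round. */
int
demo_run_checks(void)
{
	RegisterCheckingRemoteServersCallback(check_server_a);
	RegisterCheckingRemoteServersCallback(check_server_b);
	return CallCheckingRemoteServersCallbacks();
}
```

The point of the sketch is only the shape of the API: the checking logic lives in the FDW, and the core stays agnostic about how a given server type is probed.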
RE: Add missing test to test_decoding/meson.build
Dear Michael, > Thanks, applied. This was an oversight of 7f13ac8, and the CI accepts > the test. I confirmed your commit. Great thanks! Best Regards, Hayato Kuroda FUJITSU LIMITED
Add missing test to test_decoding/meson.build
Hi hackers,

I found that the test catalog_change_snapshot is missing from the test_decoding/meson.build file. PSA the patch to fix it.

Best Regards,
Hayato Kuroda
FUJITSU LIMITED

0001-add-missing-test-to-test_decoding-meson.build.patch
RE: [PATCH] Use indexes on the subscriber when REPLICA IDENTITY is full on the publisher
Dear Önder,

Thanks for updating the patch! I think what you are saying is reasonable; I have no further comments for now. Thanks for updating so quickly.

Best Regards,
Hayato Kuroda
FUJITSU LIMITED
RE: test_decoding assertion failure for the loss of top-sub transaction relationship
Dear Sawada-san,

> FYI, as I just replied to the related thread[1], the assertion failure
> in REL14 and REL15 can be fixed by the patch proposed there. So I'd
> like to see how the discussion goes. Regardless of this proposed fix,
> the patch proposed by Kuroda-san is required for HEAD, REL14, and
> REL15, in order to fix the assertion failure in SnapBuildCommitTxn().

I understand that my patches for REL14 and REL15 might not be needed. I will check the thread later. Thanks!

Best Regards,
Hayato Kuroda
FUJITSU LIMITED
RE: test_decoding assertion failure for the loss of top-sub transaction relationship
Dear Sawada-san,

Thank you for reviewing the HEAD patch! PSA the v3 patch.

> +# Test that we can force the top transaction to do timetravel when one of sub
> +# transactions needs that. This is necessary when we restart decoding
> from RUNNING_XACT
> +# without the wal to associate subtransaction to its top transaction.
>
> I don't think the second sentence is necessary.
>
> ---
> The last decoding
> +# starts from the first checkpoint and NEW_CID of "s0_truncate"
> doesn't mark the top
> +# transaction as catalog modifying transaction. In this scenario, the
> enforcement sets
> +# needs_timetravel to true even if the top transaction is regarded as
> that it does not
> +# have catalog changes and thus the decoding works without a
> contradition that one
> +# subtransaction needed timetravel while its top transaction didn't.
>
> I don't understand the last sentence, probably it's a long sentence.
>
> How about the following description?
>
> # Test that we can handle the case where only subtransaction is marked
> as containing
> # catalog changes. The last decoding starts from NEW_CID generated by
> "s0_truncate" and
> # marks only the subtransaction as containing catalog changes but we
> don't create the
> # association between top-level transaction and subtransaction yet.
> When decoding the
> # commit record of the top-level transaction, we must force the
> top-level transaction
> # to do timetravel since one of its subtransactions is marked as
> containing catalog changes.

Seems good; I replaced all of the comments with yours.

> + elog(DEBUG2, "forced transaction %u to do timetravel due to one of
> its subtransaction",
> + xid);
> + needs_timetravel = true;
>
> I think "one of its subtransaction" should be "one of its subtransactions".

Fixed.

Best Regards,
Hayato Kuroda
FUJITSU LIMITED

HEAD-v3-0001-Fix-assertion-failure-during-logical-decoding.patch
RE: [PATCH] Use indexes on the subscriber when REPLICA IDENTITY is full on the publisher
Dear Önder,

Thank you for updating the patch!

> It is not about CREATE INDEX being async. It is about pg_stat_all_indexes
> being async. If we do not wait, the tests become flaky, because sometimes
> the update has not been reflected in the view immediately.

Makes sense; I forgot how the stats collector works...

The following are comments for v16, only for the test code.

~~~
01. 032_subscribe_use_index.pl - SUBSCRIPTION CAN UPDATE THE INDEX IT USES AFTER ANALYZE

```
# show that index_b is not used
$node_subscriber->poll_query_until(
	'postgres', q{select idx_scan=0 from pg_stat_all_indexes where indexrelname = 'index_b';}
) or die "Timed out while waiting for check subscriber tap_sub_rep_full updates two rows via index scan with index on high cardinality column-2";
```

poll_query_until() still remains here; it should be replaced with is().

~~~
02. 032_subscribe_use_index.pl - SUBSCRIPTION BEHAVIOR WITH ENABLE_INDEXSCAN

```
# show that the unique index on replica identity is used even when enable_indexscan=false
$result = $node_subscriber->safe_psql('postgres',
	"select idx_scan from pg_stat_all_indexes where indexrelname = 'test_replica_id_full_idx'");
is($result, qq(0), 'ensure subscriber has not used index with enable_indexscan=false');
```

Is the comment wrong? The index test_replica_id_full_idx is not used here.
03. 032_subscribe_use_index.pl - SUBSCRIPTION BEHAVIOR WITH ENABLE_INDEXSCAN

```
$node_publisher->safe_psql('postgres',
	"ALTER TABLE test_replica_id_full REPLICA IDENTITY USING INDEX test_replica_id_full_unique;");
```

I was not sure why ALTER TABLE ... REPLICA IDENTITY USING INDEX is done on the publisher side. IIUC this feature works when REPLICA IDENTITY FULL is specified on the publisher, so it should not be altered here. If so, the index does not have to be defined on the publisher either.

~~~
04. 032_subscribe_use_index.pl - SUBSCRIPTION BEHAVIOR WITH ENABLE_INDEXSCAN

```
$node_subscriber->poll_query_until(
	'postgres', q{select (idx_scan=1) from pg_stat_all_indexes where indexrelname = 'test_replica_id_full_unique'}
) or die "Timed out while waiting ensuring subscriber used unique index as replica identity even with enable_indexscan=false'";
```

The comment from 03 applies here as well.

Best Regards,
Hayato Kuroda
FUJITSU LIMITED
RE: [PATCH] Use indexes on the subscriber when REPLICA IDENTITY is full on the publisher
Dear Önder,

Thanks for updating the patch! I checked it and it is almost good. The following are just cosmetic comments.

===
01. relation.c - GetCheapestReplicaIdentityFullPath

```
 * The reason that the planner would not pick partial indexes and indexes
 * with only expressions based on the way currently baserestrictinfos are
 * formed (e.g., col_1 = $1 ... AND col_N = $2).
```

Is "col_N = $2" a typo? I think it should be "col_N = $N" or "attr1 = $1 ... AND attrN = $N".

===
02. 032_subscribe_use_index.pl

If a table has a primary key on the subscriber, it will be used even if enable_indexscan is false (legacy behavior). Should we test that?

~~~
03. 032_subscribe_use_index.pl - SUBSCRIPTION RE-CALCULATES INDEX AFTER CREATE/DROP INDEX

This test is not trivial, so could you write down the motivation?

~~~
04. 032_subscribe_use_index.pl - SUBSCRIPTION RE-CALCULATES INDEX AFTER CREATE/DROP INDEX

```
# wait until the index is created
$node_subscriber->poll_query_until(
	'postgres', q{select count(*)=1 from pg_stat_all_indexes where indexrelname = 'test_replica_id_full_idx';}
) or die "Timed out while waiting for check subscriber tap_sub_rep_full_0 updates one row via index";
```

CREATE INDEX is synchronous, right? If so we don't have to wait here. ...And the message in the die case may be wrong. (There are some cases like this.)

~~~
05. 032_subscribe_use_index.pl - SUBSCRIPTION USES INDEX UPDATEs MULTIPLE ROWS

```
# Testcase start: SUBSCRIPTION USES INDEX UPDATEs MULTIPLE ROWS
#
# Basic test where the subscriber uses index
# and touches 50 rows with UPDATE
```

"touches 50 rows with UPDATE" -> "updates 50 rows", per other tests.

~~~
06. 032_subscribe_use_index.pl - SUBSCRIPTION CAN UPDATE THE INDEX IT USES AFTER ANALYZE

This test is not trivial, so could you write down the motivation? (Same as the re-calculate test.)

~~~
07.
032_subscribe_use_index.pl - SUBSCRIPTION CAN UPDATE THE INDEX IT USES AFTER ANALYZE

```
# show that index_b is not used
$node_subscriber->poll_query_until(
	'postgres', q{select idx_scan=0 from pg_stat_all_indexes where indexrelname = 'index_b';}
) or die "Timed out while waiting for check subscriber tap_sub_rep_full updates two rows via index scan with index on high cardinality column-2";
```

I think we don't have to wait here; is() should be used instead. poll_query_until() should be used only when idx_scan>0 is checked. (There are some cases like this.)

~~~
08. 032_subscribe_use_index.pl - SUBSCRIPTION USES INDEX ON PARTITIONED TABLES

```
# make sure that the subscriber has the correct data
$node_subscriber->poll_query_until(
	'postgres', q{select sum(user_id+value_1+value_2)=550070 AND count(DISTINCT(user_id,value_1, value_2))=981 from users_table_part;}
) or die "ensure subscriber has the correct data at the end of the test";
```

I think we can replace this with wait_for_catchup() and is()... Moreover, we don't have to wait here because in the line above we already wait until the index is used on the subscriber. (There are some cases like this.)

Best Regards,
Hayato Kuroda
FUJITSU LIMITED
RE: Perform streaming logical transactions by background workers and parallel apply
Dear Hou,

> Thanks for the suggestion.
>
> I tried to add a WaitLatch, but it seems affect the performance
> because the Latch might not be set when leader send some
> message to parallel apply worker which means it will wait until
> timeout.

Yes, currently the leader does not notify anything. To handle that, the leader must set a latch in parallel_apply_send_data(). It can be done if the leader accesses winfo->shared->logicalrep_worker_slot_no and sets the latch for LogicalRepCtxStruct->worker[slot_no].

Best Regards,
Hayato Kuroda
FUJITSU LIMITED
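A rough, untested sketch of that idea (field and struct names follow the patch under discussion and may differ from the committed version; this is not compilable as-is):

```c
/*
 * In parallel_apply_send_data(), after a successful shm_mq_send():
 * wake the parallel apply worker via the latch in its worker slot.
 * Names here are illustrative, taken from the discussion above.
 */
int			slot_no = winfo->shared->logicalrep_worker_slot_no;
LogicalRepWorker *worker = &LogicalRepCtx->workers[slot_no];

if (worker->proc != NULL)
	SetLatch(&worker->proc->procLatch);
```

With this in place, the worker's WaitLatch() in the SHM_MQ_WOULD_BLOCK path would return promptly instead of sleeping until the timeout.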
RE: Perform streaming logical transactions by background workers and parallel apply
Dear Hou,

I have comments for v35-0001.

01. catalog.sgml

```
+       Controls how to handle the streaming of in-progress transactions:
+       f = disallow streaming of in-progress transactions,
+       t = spill the changes of in-progress transactions to
+       disk and apply at once after the transaction is committed on the
+       publisher,
+       p = apply changes directly using a parallel apply
+       worker if available (same as 't' if no worker is available)
```

I'm not sure why 't' means "spill the changes to disk". Is it a compatibility issue?

~~~
02. applyparallelworker.c - parallel_apply_stream_abort

The argument abort_data is not modified in the function. Maybe a "const" modifier should be added. (Other functions should also be checked...)

~~~
03. applyparallelworker.c - parallel_apply_find_worker

```
+       ParallelApplyWorkerEntry *entry = NULL;
```

This does not have to be initialized here.

~~~
04. applyparallelworker.c - HandleParallelApplyMessages

```
+       static MemoryContext hpm_context = NULL;
```

I think "hpm" means "handle parallel message", so it should be "hpam".

~~~
05. launcher.c - logicalrep_worker_launch()

```
if (is_subworker)
	snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication parallel worker");
else
	snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication worker");
```

I'm not sure why there are only two bgw_types even though there are three types of apply workers. Is it for compatibility?

~~~
06. launcher.c - logicalrep_worker_stop_by_slot

An assertion like Assert(slot_no >= 0 && slot_no < max_logical_replication_workers) should be added at the top of this function.

~~~
07. launcher.c - logicalrep_worker_stop_internal

```
+/*
+ * Workhorse for logicalrep_worker_stop(), logicalrep_worker_detach() and
+ * logicalrep_worker_stop_by_slot(). Stop the worker and wait for it to die.
+ */
+static void
+logicalrep_worker_stop_internal(LogicalRepWorker *worker)
```

I think logicalrep_worker_stop_internal() may not be a "workhorse" for logicalrep_worker_detach().
In that function, the internal function is called only for parallel apply workers, and it is not the main part of the detach work.

~~~
08. worker.c - handle_streamed_transaction()

```
+       TransactionId current_xid = InvalidTransactionId;
```

This initialization is not needed: the variable is not used in non-streaming mode, and otherwise it is assigned before use.

~~~
09. worker.c - handle_streamed_transaction()

```
+               case TRANS_PARALLEL_APPLY:
+                       /* Define a savepoint for a subxact if needed. */
+                       parallel_apply_start_subtrans(current_xid, stream_xid);
+                       return false;
```

Based on the other case blocks, Assert(am_parallel_apply_worker()) may be added at the top of this part. The same suggestion applies to other switch-case statements.

~~~
10. worker.c - apply_handle_stream_start

```
+ *
+ * XXX We can avoid sending pair of the START/STOP messages to the parallel
+ * worker because unlike apply worker it will process only one
+ * transaction-at-a-time. However, it is not clear whether that is worth the
+ * effort because it is sent after logical_decoding_work_mem changes.
```

I can understand that the START message is not needed, but is STOP really removable? If the leader does not send STOP to its child, doesn't it lose the chance to change the worker state to IDLE_IN_TRANSACTION?

~~~
11. worker.c - apply_handle_stream_start

Currently the number of received chunks is not counted, but it could be if a variable "nchunks" is defined and incremented in apply_handle_stream_start(). This info may be useful to determine an appropriate logical_decoding_work_mem for a workload. What do you think?

~~~
12. worker.c - get_transaction_apply_action

{} are not needed.

Best Regards,
Hayato Kuroda
FUJITSU LIMITED
RE: Perform streaming logical transactions by background workers and parallel apply
Dear Amit,

> Can't we use WaitLatch in the case of SHM_MQ_WOULD_BLOCK as we are
> using it for the same case at some other place in the code? We can use
> the same nap time as we are using in the leader apply worker.

I'm not sure whether such a short nap time is needed. Unlike the leader apply worker, parallel apply workers do not have a timeout like wal_receiver_timeout, so they do not have to check so frequently or send feedback to the publisher. But basically I agree that we can use the same logic as the leader.

Best Regards,
Hayato Kuroda
FUJITSU LIMITED
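For reference, the SHM_MQ_WOULD_BLOCK branch could nap roughly like the following untested sketch (the 1000 ms nap and the wait-event name are illustrative stand-ins, not values from the patch; a wake-up also requires the leader to set the worker's latch when it sends data):

```c
case SHM_MQ_WOULD_BLOCK:
	{
		int			rc;

		/* Sleep until the latch is set, or at most 1000 ms. */
		rc = WaitLatch(MyLatch,
					   WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
					   1000L, WAIT_EVENT_LOGICAL_PARALLEL_APPLY_MAIN);
		if (rc & WL_LATCH_SET)
			ResetLatch(MyLatch);
		break;
	}
```

Whether the timeout is needed at all is exactly the open question above: without a wal_receiver_timeout-like deadline, the worker could in principle sleep on the latch alone.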
RE: [PATCH] Use indexes on the subscriber when REPLICA IDENTITY is full on the publisher
Dear Önder,

Thank you for updating the patch! First, I reply to your comments.

> My thinking on those functions is that they should probably stay
> in src/backend/replication/logical/relation.c. My main motivation is that
> those functions are so much tailored to the purposes of this file that I
> cannot see any use-case for these functions in any other context.

I was not sure what it should be, but I agree that the functions will not be used from other parts.

> Hmm, I cannot follow this comment. Can you please clarify?

In your patch:

```
+	/* Simple case, we already have a primary key or a replica identity index */
+	idxoid = GetRelationIdentityOrPK(localrel);
+	if (OidIsValid(idxoid))
+		return idxoid;
+
+	/*
+	 * If index scans are disabled, use a sequential scan.
+	 *
+	 * Note that we still allowed index scans above when there is a primary
+	 * key or unique index replica identity, but that is the legacy behaviour
+	 * (even when enable_indexscan is false), so we hesitate to move this
+	 * enable_indexscan check to be done earlier in this function.
+	 */
```

And the paragraph "Note that we..." should be above GetRelationIdentityOrPK(). Future readers will read the function from top to bottom, and when they read around GetRelationIdentityOrPK() they may be confused.

> So, I think it is better to have specific names, no?

OK.

> The inlined comment in the function has a similar comment. Is that clear
> enough?
> /* * Generate restrictions for all columns in the form of col_1 = $1 AND *
> col_2 = $2 ... */

Actually I missed it, but I still think that the whole emulated SQL should be spelled out.

> Though, I agree that we can improve the code a bit. I now
> use targetrelkind and dropped localrelid to check whether the target is a
> partitioned table. Is this better?

Great improvement. Genius!

> Well, I'm not sure if it is worth the complexity.
> There are only 4 usages
> of the same table, and these are all pretty simple statements, and all
> other tests seem to have a similar pattern. I have not seen any tests where
> these simple statements are done in a function even if they are repeated.
> I'd rather keep it so that this doesn't lead to other style discussions?

If other tests do not factor out such parts, it's OK. My motivation for these comments was to reduce the number of lines in the test code.

> Oh, I didn't know about this, thanks!

Now the meson test system runs your test. OK.

And the following are my comments for v14. They are mainly about comments.

===
01. relation.c - logicalrep_rel_open

```
+	/*
+	 * Finding a usable index is an infrequent task. It occurs when an
+	 * operation is first performed on the relation, or after invalidation
+	 * of the relation cache entry (e.g., such as ANALYZE).
+	 */
+	entry->usableIndexOid = FindLogicalRepUsableIndex(entry->localrel, remoterel);
```

I thought you could mention CREATE INDEX in the comment. According to your analysis [1], the relation cache will be invalidated if users run CREATE INDEX. At that time the hash entry will be removed (logicalrep_relmap_invalidate_cb) and the "usable" index will be checked again.

~~~
02. relation.c - logicalrep_partition_open

```
+	/*
+	 * Finding a usable index is an infrequent task. It occurs when an
+	 * operation is first performed on the relation, or after invalidation of
+	 * the relation cache entry (e.g., such as ANALYZE).
+	 */
+	entry->usableIndexOid = FindLogicalRepUsableIndex(partrel, remoterel);
```

Same as above.

~~~
03. relation.c - GetIndexOidFromPath

```
+	if (path->pathtype == T_IndexScan || path->pathtype == T_IndexOnlyScan)
+	{
+		IndexPath  *index_sc = (IndexPath *) path;
+
+		return index_sc->indexinfo->indexoid;
+	}
```

I thought Assert(OidIsValid(indexoid)) might be added here. Or is that quite trivial?

~~~
04. relation.c - IndexOnlyOnExpression

This function just returns "yes" or "no", so its name should start with "Has" or "Is".
~~~
05. relation.c - SuitablePathsForRepIdentFull

```
+/*
+ * Iterates over the input path list and returns another
+ * path list that includes index [only] scans where paths
+ * with non-btree indexes, partial indexes or
+ * indexes on only expressions have been removed.
+ */
```

These comment lines seem to be wrapped at around 60 columns. Could you expand them to around 80?

~~~
06. relation.c - SuitablePathsForRepIdentFull

```
+	Relation	indexRelation;
+	IndexInfo  *indexInfo;
+	bool		is_btree;
+	bool		is_partial;
+	bool		is_only_on_expression;
+
+	indexRelation = index_open(idxoid, AccessShareLock);
+	indexInfo = BuildIndexInfo(indexR
```
RE: [Proposal] Add foreign-server health checks infrastructure
Dear Önder,

> As far as I can see this patch is mostly useful for detecting the failures
> on the initial remote command. This is especially common when the remote
> server does a failover/switchover and postgres_fdw uses a cached connection
> to access to the remote server.

Sounds reasonable. Do you mean that we could add an additional GUC like "postgres_fdw.initial_check", wait for WL_SOCKET_CLOSED if the connection is found in the hash table, and reconnect if it has been closed, right?

> I think any extension that deals with multiple Postgres nodes can benefit
> from such logic. In fact, the reason I realized this patch is that on the
> Citus extension, we are trying to solve a similar problem [1], [2].
> Thinking even more, I think any extension that uses libpq and WaitEventSets
> can benefit from such a function.

OK, I agree it may be useful, but where should this function live? It cannot be used from applications, so it should not be in the interfaces/libpq directory. So backend/libpq/pqcomm.c?

> I think it also depends on where you decide to put
> pgfdw_connection_check_internal(). If you prefer the postgres_fdw side,
> could we maybe use ConnCacheEntry in contrib/postgres_fdw/connection.c?
> But if you decide to put it into the Postgres side, the API
> for pgfdw_connection_check_internal() -- or equivalent function -- could be
> discussed. Do we pass a WaitEventSet and if it is NULL create a new one,
> else use what is passed to the function? Not sure, maybe you can come up
> with a better API.

Thank you for describing it in more detail. I see what you mean.

Best Regards,
Hayato Kuroda
FUJITSU LIMITED
RE: [Proposal] Add foreign-server health checks infrastructure
Dear Önder, Thank you for being interested in my patch! Your suggestions will be included in the next version. > In other words, what is the trade-off for calling > pgfdw_connection_check_internal() inside GetConnection() when we are about > to use a "cached" connection? I think that might simplify the patch as well If the checking function is called not periodically but from GetConnection(), the health of foreign servers will be checked only when remote connections are used. So the issue with the following workload described in [1] cannot be handled. BEGIN --- remote operations--- local operations --- COMMIT But, yes, I completely agree that it could simplify the code because we could remove the timer part. This is the second plan for this patch; I may move to this approach if it is still useful. > Can we have this function/logic on Postgres core, so that other extensions > can also use? I was not sure about any use-case, but I think it is possible because it does quite general things. Is there any good motivation to do that? > What if PQsocket(conn) returns -1? Maybe we move certain connection state > checks into pgfdw_connection_check_internal() such that it is more generic? > I can think of checks like: conn!=NULL, PQsocket(conn) != PGINVALID_SOCKET, > PQstatus == CONNECTION_OK ereport(ERROR) will be thrown if PQsocket(conn) returns -1. All the cases you mentioned should be handled here. I will modify it. > Do you see any performance implications of creating/freeing waitEventSets > per check? I wonder if we can somehow re-use the same waitEventSet by > modifyWaitEvent? I guess no, but still, if this check causes a performance > implication, can we somehow cache 1 waitEventSet per connection? I have not tested it yet, but I agree this may cause a performance decrease. In the next version I will re-use the event set anyway, and it must be considered later. Actually, I'm not sure about your suggestion: do you mean that we can add a hash table that associates each PGconn with a WaitEventSet, right? 
[1]: https://www.postgresql.org/message-id/TYAPR01MB58662809E678253B90E82CE5F5889%40TYAPR01MB5866.jpnprd01.prod.outlook.com Best Regards, Hayato Kuroda FUJITSU LIMITED
RE: [Proposal] Add foreign-server health checks infrastructure
Dear Andres, > This seems to reliably fail on windows. See Thanks for reporting. Actually, this feature cannot be used on Windows machines. To check the status of each socket that connects to the foreign server, the socket event WL_SOCKET_CLOSED is used. The event is only supported on some OSes, and Windows is not among them. That part must be skipped if the system cannot use the event, but I was not sure how to do that... Best Regards, Hayato Kuroda FUJITSU LIMITED
RE: Question: test "aggregates" failed in 32-bit machine
Dear Tom, > Hmm, we're not seeing any such failures in the buildfarm's 32-bit > animals, so there must be some additional condition needed to make > it happen. Can you be more specific? Hmm, I was not sure about any additional conditions, sorry. I could reproduce it with the following steps: $ git clone https://github.com/postgres/postgres.git $ cd postgres $ ./configure --enable-cassert --enable-debug $ make -j2 $ make check LANG=C -> aggregates ... FAILED 3562 ms The hypervisor of the virtual machine is "VMware vSphere 7.0". I also picked up some more information about the machine. Could you find something? ``` pg_config]$ ./pg_config ... CONFIGURE = '--enable-cassert' '--enable-debug' CC = gcc -std=gnu99 CPPFLAGS = -D_GNU_SOURCE CFLAGS = -Wall -Wmissing-prototypes -Wpointer-arith -Wdeclaration-after-statement -Werror=vla -Wendif-labels -Wmissing-format-attribute -Wformat-security -fno-strict-aliasing -fwrapv -g -O2 CFLAGS_SL = -fPIC LDFLAGS = -Wl,--as-needed -Wl,-rpath,'/usr/local/pgsql/lib',--enable-new-dtags LDFLAGS_EX = LDFLAGS_SL = LIBS = -lpgcommon -lpgport -lz -lreadline -lrt -ldl -lm VERSION = PostgreSQL 16devel $ locale LANG=C ... 
$ arch i686 $cat /proc/cpuinfo processor : 0 vendor_id : GenuineIntel cpu family : 6 model : 85 model name : Intel(R) Xeon(R) Platinum 8260 CPU @ 2.40GHz stepping: 7 microcode : 83898371 cpu MHz : 2394.374 cache size : 36608 KB physical id : 0 siblings: 1 core id : 0 cpu cores : 1 apicid : 0 initial apicid : 0 fdiv_bug: no hlt_bug : no f00f_bug: no coma_bug: no fpu : yes fpu_exception : yes cpuid level : 22 wp : yes flags : fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clflush mmx fxsr sse sse2 ss nx pdpe1gb rdtscp lm constant_tsc arch_perfmon xtopology tsc_reliable nonstop_tsc unfair_spinlock eagerfpu pni pclmulqdq ssse3 fma cx16 pcid sse4_1 sse4_2 x2apic movbe popcnt tsc_deadline_timer aes xsave avx f16c rdrand hypervisor lahf_lm abm 3dnowprefetch arat xsaveopt ssbd ibrs ibpb stibp fsgsbase bmi1 avx2 smep bmi2 invpcid avx512f rdseed adx avx512cd md_clear flush_l1d arch_capabilities bogomips: 4788.74 clflush size: 64 cache_alignment : 64 address sizes : 43 bits physical, 48 bits virtual power management: processor : 1 vendor_id : GenuineIntel cpu family : 6 model : 85 model name : Intel(R) Xeon(R) Platinum 8260 CPU @ 2.40GHz stepping: 7 microcode : 83898371 cpu MHz : 2394.374 cache size : 36608 KB physical id : 2 siblings: 1 core id : 0 cpu cores : 1 apicid : 2 initial apicid : 2 fdiv_bug: no hlt_bug : no f00f_bug: no coma_bug: no fpu : yes fpu_exception : yes cpuid level : 22 wp : yes flags : fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clflush mmx fxsr sse sse2 ss nx pdpe1gb rdtscp lm constant_tsc arch_perfmon xtopology tsc_reliable nonstop_tsc unfair_spinlock eagerfpu pni pclmulqdq ssse3 fma cx16 pcid sse4_1 sse4_2 x2apic movbe popcnt tsc_deadline_timer aes xsave avx f16c rdrand hypervisor lahf_lm abm 3dnowprefetch arat xsaveopt ssbd ibrs ibpb stibp fsgsbase bmi1 avx2 smep bmi2 invpcid avx512f rdseed adx avx512cd md_clear flush_l1d arch_capabilities bogomips: 4788.74 clflush size: 64 cache_alignment : 64 
address sizes : 43 bits physical, 48 bits virtual power management ``` Best Regards, Hayato Kuroda FUJITSU LIMITED
RE: Perform streaming logical transactions by background workers and parallel apply
Dear Hou, Thanks for updating the patch. I will review it soon, but first I reply to your comment. > > 04. applyparallelworker.c - LogicalParallelApplyLoop() > > > > ``` > > + shmq_res = shm_mq_receive(mqh, &len, &data, false); > > ... > > + if (ConfigReloadPending) > > + { > > + ConfigReloadPending = false; > > + ProcessConfigFile(PGC_SIGHUP); > > + } > > ``` > > > > > > Here the parallel apply worker waits to receive messages and after > > dispatching > > it ProcessConfigFile() is called. > > It means that .conf will be not read until the parallel apply worker > > receives new > > messages and apply them. > > > > It may be problematic when users change log_min_message to debugXXX for > > debugging but the streamed transaction rarely come. > > They expected that detailed description appears on the log from next > > streaming chunk, but it does not. > > > > This does not occur in leader worker when it waits messages from publisher, > > because it uses libpqrcv_receive(), which works asynchronously. > > > > I 'm not sure whether it should be documented that the evaluation of GUCs > > may > > be delayed, how do you think? > > I changed the shm_mq_receive to asynchronous mode which is also consistent > with > what we did for Gather node when reading data from parallel query workers. I checked your implementation, but it seems that the parallel apply worker will not sleep even if there are no messages or signals. It might be very inefficient. In the gather node - gather_readnext(), the same approach is used, but I think there is a premise there that the wait time is short because it is related to only one gather node. For a parallel apply worker, however, we cannot predict the wait time because it depends on the streamed transactions. If such transactions rarely come, parallel apply workers may consume a lot of CPU time. I think we should wait for a short time, or until the leader notifies us, if shmq_res == SHM_MQ_WOULD_BLOCK. What do you think? Best Regards, Hayato Kuroda FUJITSU LIMITED
Question: test "aggregates" failed in 32-bit machine
Hi hackers, While running `make check LANG=C` on a 32-bit virtual machine, I found that it failed at "aggregates". PSA the a1b3bca1_regression.diffs. IIUC that part was added by db0d67db. I checked out the source, tested, and got the same result. PSA the db0d67db_regression.diffs. I'm not sure about it, but is this expected behavior? I know that we do not have to consider "row" ordering. The following shows the environment. Please tell me if any other information is needed. OS: RHEL 6.10 server Arch: i686 Gcc: 4.4.7 $ uname -a Linux VMX 2.6.32-754.41.2.el6.i686 #1 SMP Sat Jul 10 04:21:20 EDT 2021 i686 i686 i386 GNU/Linux Configure option: --enable-cassert --enable-debug --enable-tap-tests Best Regards, Hayato Kuroda FUJITSU LIMITED a1b3bca1_regression.diffs Description: a1b3bca1_regression.diffs db0d67db_regression.diffs Description: db0d67db_regression.diffs
RE: [PATCH] Use indexes on the subscriber when REPLICA IDENTITY is full on the publisher
Dear Önder: Thank you for updating the patch! Your documentation seems OK, and I could not find any other places where it should be added. The following are my comments. 01 relation.c - general Many files are newly included. I was not sure, but perhaps some of the planner-related code could be moved to src/backend/optimizer/plan. What do you and others think? 02 relation.c - FindLogicalRepUsableIndex ``` +/* + * Returns an index oid if we can use an index for the apply side. If not, + * returns InvalidOid. + */ +static Oid +FindLogicalRepUsableIndex(Relation localrel, LogicalRepRelation *remoterel) ``` I grepped the files, but I could not find the word "apply side". How about "subscriber" instead? 03 relation.c - FindLogicalRepUsableIndex ``` + /* Simple case, we already have an identity or pkey */ + idxoid = GetRelationIdentityOrPK(localrel); + if (OidIsValid(idxoid)) + return idxoid; + + /* +* If index scans are disabled, use a sequential scan. +* +* Note that we still allowed index scans above when there is a primary +* key or unique index replica identity, but that is the legacy behaviour +* (even when enable_indexscan is false), so we hesitate to move this +* enable_indexscan check to be done earlier in this function. +*/ + if (!enable_indexscan) + return InvalidOid; ``` a. I think "identity or pkey" should be "replica identity key or primary key" or "RI or PK" b. The latter part should be placed around GetRelationIdentityOrPK. 04 relation.c - FindUsableIndexForReplicaIdentityFull ``` + MemoryContext usableIndexContext; ... + usableIndexContext = AllocSetContextCreate(CurrentMemoryContext, + "usableIndexContext", + ALLOCSET_DEFAULT_SIZES); ``` I grepped other sources, and found that names like "tmpcxt" are used for temporary MemoryContexts. 
05 relation.c - SuitablePathsForRepIdentFull ``` + indexRelation = index_open(idxoid, AccessShareLock); + indexInfo = BuildIndexInfo(indexRelation); + is_btree = (indexInfo->ii_Am == BTREE_AM_OID); + is_partial = (indexInfo->ii_Predicate != NIL); + is_only_on_expression = IndexOnlyOnExpression(indexInfo); + index_close(indexRelation, NoLock); ``` Why is the index closed with NoLock? AccessShareLock was acquired, so shouldn't the same lock be released? 06 relation.c - GetCheapestReplicaIdentityFullPath IIUC a query like "SELECT tbl WHERE attr1 = $1 AND attr2 = $2 ... AND attrN = $N" is emulated, right? You could write that explicitly as a comment. 07 relation.c - GetCheapestReplicaIdentityFullPath ``` + Path *path = (Path *) lfirst(lc); + Oid idxoid = GetIndexOidFromPath(path); + + if (!OidIsValid(idxoid)) + { + /* Unrelated Path, skip */ + suitableIndexList = lappend(suitableIndexList, path); + } ``` This part was not clear to me. IIUC in this function we want to extract the "good" scan paths, and based on those the cheapest one is chosen. GetIndexOidFromPath() seems to return InvalidOid when the input path is not an index scan, so why is such a path appended to the suitable list? === 08 worker.c - usable_indexoid_internal I think this is not an "internal" function; such a name should be used for pairs like "apply_handle_commit" - "apply_handle_commit_internal", or "apply_handle_insert" - "apply_handle_insert_internal". How about "get_usable_index" or something? 09 worker.c - usable_indexoid_internal ``` + Oid targetrelid = targetResultRelInfo->ri_RelationDesc->rd_rel->oid; + Oid localrelid = relinfo->ri_RelationDesc->rd_id; + + if (targetrelid != localrelid) ``` I think these lines are very confusing. IIUC targetrelid corresponds to the "parent", and localrelid corresponds to the "child", right? How about changing the names to "partitionedoid" and "leadoid" or something? 
=== 10 032_subscribe_use_index.pl ``` # create tables pub and sub $node_publisher->safe_psql('postgres', "CREATE TABLE test_replica_id_full (x int)"); $node_publisher->safe_psql('postgres', "ALTER TABLE test_replica_id_full REPLICA IDENTITY FULL;"); $node_subscriber->safe_psql('postgres', "CREATE TABLE test_replica_id_full (x int)"); $node_subscriber->safe_psql('postgres', "CREATE INDEX test_replica_id_full_idx ON test_replica_id_full(x)"); ``` In many places the same table is defined, altered to "REPLICA IDENTITY FULL", and an index is created. Could you combine them into a function? 11 032_subscribe_use_index.pl ``` # wait until the index is used on the subscri
RE: [small patch] Change datatype of ParallelMessagePending from "volatile bool" to "volatile sig_atomic_t"
Dear Michael, > Yeah, at least as of the cancel callback psql_cancel_callback() that > handle_sigint() would call on SIGINT as this is set by psql. So it > does not seem right to use a boolean rather than a sig_atomic_t in > this case, as well. PSA the fixed patch. Note that PromptInterruptContext.enabled was also fixed because it is assigned from sigint_interrupt_enabled. Best Regards, Hayato Kuroda FUJITSU LIMITED 0001-Mark-sigint_interrupt_enabled-as-sig_atomic_t.patch Description: 0001-Mark-sigint_interrupt_enabled-as-sig_atomic_t.patch
RE: Perform streaming logical transactions by background workers and parallel apply
Dear Wang, The following are my comments for your patchset. 0001 01. launcher.c - logicalrep_worker_stop_internal() ``` + + Assert(LWLockHeldByMe(LogicalRepWorkerLock)); + ``` I think it should be Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, LW_SHARED)) because the lock is released once and acquired again as LW_SHARED. If a future function acquires the lock as LW_EXCLUSIVE and calls logicalrep_worker_stop_internal(), its lock may become weaker after the call. 02. launcher.c - apply_handle_stream_start() ``` + /* +* Notify handle methods we're processing a remote in-progress +* transaction. +*/ + in_streamed_transaction = true; - MyLogicalRepWorker->stream_fileset = palloc(sizeof(FileSet)); - FileSetInit(MyLogicalRepWorker->stream_fileset); + /* +* Start a transaction on stream start, this transaction will be +* committed on the stream stop unless it is a tablesync worker in +* which case it will be committed after processing all the +* messages. We need the transaction for handling the buffile, +* used for serializing the streaming data and subxact info. +*/ + begin_replication_step(); ``` Previously in_streamed_transaction was set after begin_replication_step(), but the ordering has been changed. Maybe we don't have to modify it if there is no particular reason. 03. launcher.c - apply_handle_stream_stop() ``` + /* Commit the per-stream transaction */ + CommitTransactionCommand(); + + /* Reset per-stream context */ + MemoryContextReset(LogicalStreamingContext); + + pgstat_report_activity(STATE_IDLE, NULL); + + in_streamed_transaction = false; ``` Previously in_streamed_transaction was set after MemoryContextReset(), but the ordering has been changed. Maybe we don't have to modify it if there is no particular reason. 04. applyparallelworker.c - LogicalParallelApplyLoop() ``` + shmq_res = shm_mq_receive(mqh, &len, &data, false); ... 
+ if (ConfigReloadPending) + { + ConfigReloadPending = false; + ProcessConfigFile(PGC_SIGHUP); + } ``` Here the parallel apply worker waits to receive messages, and ProcessConfigFile() is called only after dispatching them. It means that postgresql.conf will not be read until the parallel apply worker receives new messages and applies them. It may be problematic when users change log_min_messages to debugXXX for debugging but streamed transactions rarely come. They would expect detailed descriptions to appear in the log from the next streaming chunk, but they do not. This does not occur in the leader worker when it waits for messages from the publisher, because it uses libpqrcv_receive(), which works asynchronously. I'm not sure whether it should be documented that the evaluation of GUCs may be delayed; what do you think? === 0004 05. logical-replication.sgml ``` ... In that case, it may be necessary to change the streaming mode to on or off and cause the same conflicts again so the finish LSN of the failed transaction will be written to the server log. ... ``` The above sentence is added by 0001, but it is not modified by 0004. Such transactions will be retried in streaming=on mode, so some description related to that should be added. Best Regards, Hayato Kuroda FUJITSU LIMITED
RE: [small patch] Change datatype of ParallelMessagePending from "volatile bool" to "volatile sig_atomic_t"
Dear Michael, > Done this one. I have scanned the code, but did not notice a similar > mistake. I found your commit, thanks! > It is worth noting that we have only one remaining "volatile > bool" in the headers now. Maybe you meant sigint_interrupt_enabled, and it also seems to be modified in the signal handler. But I think no race condition can occur here, because if the value is set when the handler fires, the long jump will also happen. Of course, it's OK to mark the variable as sig_atomic_t too if there is no problem. Best Regards, Hayato Kuroda FUJITSU LIMITED
RE: Perform streaming logical transactions by background workers and parallel apply
Dear Wang, Thanks for updating the patch! ...but cfbot says that it cannot be built [1]. I think a header such as miscadmin.h should be included. [1]: https://cirrus-ci.com/task/5909508684775424 Best Regards, Hayato Kuroda FUJITSU LIMITED
[small patch] Change datatype of ParallelMessagePending from "volatile bool" to "volatile sig_atomic_t"
Hi hackers, While reviewing [1], Amit and I noticed that the flag ParallelMessagePending is defined as "volatile bool", while other flags set by signal handlers are defined as "volatile sig_atomic_t". The datatype is defined in standard C, which says that variables referred to by signal handlers should be "volatile sig_atomic_t". (Please see my observation [2].) This may not be strictly needed because no failures have been reported, but I thought their datatypes should be the same, so I attached a small patch. What do you think? [1]: https://commitfest.postgresql.org/39/3621/ [2]: https://www.postgresql.org/message-id/TYAPR01MB5866C056BB9F81A42B85D20BF54E9%40TYAPR01MB5866.jpnprd01.prod.outlook.com Best Regards, Hayato Kuroda FUJITSU LIMITED 0001-Change-datatype-of-ParallelMessagePending-to-keep-co.patch Description: 0001-Change-datatype-of-ParallelMessagePending-to-keep-co.patch
RE: Perform streaming logical transactions by background workers and parallel apply
Hi Amit, > > I checked other flags that are set by signal handlers, their datatype > > seemed to > be sig_atomic_t. > > Is there any reasons that you use normal bool? It should be changed if not. > > > > It follows the logic similar to ParallelMessagePending. Do you see any > problem with it? Hmm, one consideration is: what will happen if the signal handler HandleParallelApplyMessageInterrupt() fires during "ParallelApplyMessagePending = false;"? IIUC sig_atomic_t is needed to avoid concurrent writes to the same data. According to the C99 standard (I grepped a draft version [1]), the behavior seems to be undefined if the signal handler refers to data that is not declared "volatile sig_atomic_t". ...But I'm not sure whether this is really problematic in the current system, sorry... ``` If the signal occurs other than as the result of calling the abort or raise function, the behavior is undefined if the signal handler refers to any object with static storage duration other than by assigning a value to an object declared as volatile sig_atomic_t, or the signal handler calls any function in the standard library other than the abort function, the _Exit function, or the signal function with the first argument equal to the signal number corresponding to the signal that caused the invocation of the handler. ``` > > a. > > I was not sure when the cell should be cleaned. Currently we clean up > ParallelApplyWorkersList() only in the parallel_apply_start_worker(), > > but we have chances to remove such a cell like HandleParallelApplyMessages() > or HandleParallelApplyMessage(). How do you think? > > > > Note that HandleParallelApply* are invoked during interrupt handling, > so it may not be advisable to remove it there. > > > > > 12. 
ConfigureNamesInt > > > > ``` > > + { > > + {"max_parallel_apply_workers_per_subscription", > > + PGC_SIGHUP, > > + REPLICATION_SUBSCRIBERS, > > + gettext_noop("Maximum number of parallel apply > workers per subscription."), > > + NULL, > > + }, > > + &max_parallel_apply_workers_per_subscription, > > + 2, 0, MAX_BACKENDS, > > + NULL, NULL, NULL > > + }, > > ``` > > > > This parameter can be changed by pg_ctl reload, so the following corner case > may be occurred. > > Should we add a assign hook to handle this? Or, can we ignore it? > > > > I think we can ignore this as it will eventually start respecting the > threshold. Both of your points sound reasonable to me. [1]: https://www.open-std.org/JTC1/SC22/WG14/www/docs/n1256.pdf Best Regards, Hayato Kuroda FUJITSU LIMITED
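The quoted C99 rule can be demonstrated with a minimal, self-contained program using plain POSIX signals; this is not the actual worker code, and the names below are hypothetical:

```c
#include <assert.h>
#include <signal.h>

/* Only a volatile sig_atomic_t is guaranteed safe to assign from a handler. */
static volatile sig_atomic_t message_pending = 0;

static void
handle_usr1(int signo)
{
	(void) signo;
	message_pending = 1;		/* the only portable write a handler may make */
}

/* Simulates the main loop checking and clearing the flag. */
int
consume_pending(void)
{
	if (message_pending)
	{
		message_pending = 0;
		return 1;
	}
	return 0;
}
```

With a plain bool, the write in the main loop ("flag = false;") could in principle be interrupted mid-store by the handler's write, which is exactly the undefined behavior the standard describes; sig_atomic_t guarantees the store is indivisible.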
RE: Perform streaming logical transactions by background workers and parallel apply
Dear Wang, Thanks for updating the patch! The following are my comments for v33-0001. === libpqwalreceiver.c 01. inclusion ``` +#include "catalog/pg_subscription.h" ``` We don't have to include it because the analysis of the parameters is done at the caller. === launcher.c 02. logicalrep_worker_launch() ``` + /* +* Return silently if the number of parallel apply workers reached the +* limit per subscription. +*/ + if (is_subworker && nparallelapplyworkers >= max_parallel_apply_workers_per_subscription) ``` a. I felt that it might be kind to output some debug messages. b. The if statement seems to be more than 80 characters long. You could break the line around "nparallelapplyworkers >= ...". === applyparallelworker.c 03. declaration ``` +/* + * Is there a message pending in parallel apply worker which we need to + * receive? + */ +volatile bool ParallelApplyMessagePending = false; ``` I checked other flags that are set by signal handlers, and their datatype seems to be sig_atomic_t. Is there any reason to use a plain bool? If not, it should be changed. 04. HandleParallelApplyMessages() ``` + if (winfo->error_mq_handle == NULL) + continue; ``` a. I was not sure when the cell should be cleaned. Currently we clean up ParallelApplyWorkersList only in parallel_apply_start_worker(), but we also have chances to remove such a cell in HandleParallelApplyMessages() or HandleParallelApplyMessage(). What do you think? b. Comments should be added even if we keep this, like "exited worker, skipped". ``` + else + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), +errmsg("lost connection to the leader apply worker"))); ``` c. This function is called on the leader apply worker, so the hint should be "lost connection to the parallel apply worker". 05. parallel_apply_setup_worker() ``` + if (launched) + { + ParallelApplyWorkersList = lappend(ParallelApplyWorkersList, winfo); + } ``` {} should be removed. 06. 
parallel_apply_wait_for_xact_finish() ``` + /* If any workers have died, we have failed. */ ``` This function checks only a single parallel apply worker, so should the comment be "If the worker has died..."? === worker.c 07. handle_streamed_transaction() ``` + * For non-streamed transactions, returns false; ``` "returns false;" -> "returns false" apply_handle_commit_prepared(), apply_handle_abort_prepared() These functions are not expected to be called by a parallel apply worker, so I think Assert() should be added. 08. UpdateWorkerStats() ``` -static void +void UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply) ``` This function is called only in worker.c, so it should be static. 09. subscription_change_cb() ``` -static void +void subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue) ``` This function is called only in worker.c, so it should be static. 10. InitializeApplyWorker() ``` +/* + * Initialize the database connection, in-memory subscription and necessary + * config options. + */ void -ApplyWorkerMain(Datum main_arg) +InitializeApplyWorker(void) ``` A comment should be added noting that this is a common part of the leader and parallel apply workers. === logicalrepworker.h 11. declaration ``` extern PGDLLIMPORT volatile bool ParallelApplyMessagePending; ``` Please refer to the above comment. === guc_tables.c 12. ConfigureNamesInt ``` + { + {"max_parallel_apply_workers_per_subscription", + PGC_SIGHUP, + REPLICATION_SUBSCRIBERS, + gettext_noop("Maximum number of parallel apply workers per subscription."), + NULL, + }, + &max_parallel_apply_workers_per_subscription, + 2, 0, MAX_BACKENDS, + NULL, NULL, NULL + }, ``` This parameter can be changed by pg_ctl reload, so the following corner case may occur. Should we add an assign hook to handle this? Or can we ignore it? 1. set max_parallel_apply_workers_per_subscription to 4. 2. start replicating two streaming transactions. 3. commit transactions === Two parallel workers will remain === 4. 
change max_parallel_apply_workers_per_subscription to 3 5. We expect that only one worker remains, but two parallel workers remain. They will not be stopped until another streamed transaction is started and committed. Best Regards, Hayato Kuroda FUJITSU LIMITED
RE: [Proposal] Add foreign-server health checks infrastructure
Dear Fujii-san, Thanks for checking! > These failed to be applied to the master branch cleanly. Could you update > them? PSA the rebased patches. I reviewed them myself, and they contain some additional changes, e.g., moving GUC-related code to option.c. > + this option relies on kernel events exposed by Linux, macOS, > > s/this/This Fixed. > > + GUC_check_errdetail("pgfdw_health_check_interval must be set > to 0 on this platform"); > > The actual parameter name "postgres_fdw.health_check_interval" > should be used for the message instead of internal variable name. Fixed. > This registered signal handler does lots of things. But that's not acceptable > and they should be performed outside signal handler. No? I modified it like v09 and earlier versions, which had a mechanism for registering CheckingRemoteServersCallback. It had been removed because we wanted to keep the core simpler, but IIUC it is needed if the signal handler just sets some flags. The core side does not consider the current status of the transaction and the running query, for simplicity. Best Regards, Hayato Kuroda FUJITSU LIMITED v15-0001-Add-an-infrastracture-for-checking-remote-server.patch Description: v15-0001-Add-an-infrastracture-for-checking-remote-server.patch v15-0002-postgres_fdw-Implement-health-check-feature.patch Description: v15-0002-postgres_fdw-Implement-health-check-feature.patch v15-0003-add-doc.patch Description: v15-0003-add-doc.patch v15-0004-add-test.patch Description: v15-0004-add-test.patch
RE: [PATCH] Use indexes on the subscriber when REPLICA IDENTITY is full on the publisher
> One last thing - do you think there is any need to mention this > behaviour in the pgdocs, or is OK just to be a hidden performance > improvement? FYI, here is my opinion. We have the following sentence in logical-replication.sgml: ``` ... If the table does not have any suitable key, then it can be set to replica identity full, which means the entire row becomes the key. This, however, is very inefficient and should only be used as a fallback if no other solution is possible. ... ``` Here the words "very inefficient" may mean that sequential scans will be executed every time. I think some description could be added around here. Best Regards, Hayato Kuroda FUJITSU LIMITED
RE: [PATCH] Use indexes on the subscriber when REPLICA IDENTITY is full on the publisher
Dear Önder, Thanks for updating the patch! I will check it later. For now I just reply to your comments. > Hmm, I couldn't realize this comment earlier. So you suggest "slow" here > refers to the additional function call "GetRelationIdentityOrPK"? If so, yes > I'll update that. Yes, I meant to say that, because the functions will be called like: GetRelationIdentityOrPK() -> RelationGetPrimaryKeyIndex() -> RelationGetIndexList() -> .. and according to the comments, the last one seems to do the heavy lifting. > Makes sense, simplified the function. Though, it is always hard to pick good > names for these kinds of helper functions. I picked > GenerateDummySelectPlannerInfoForRelation(), does that sound good to you as > well? I could not find any better naming than yours. Best Regards, Hayato Kuroda FUJITSU LIMITED
RE: [PATCH] Use indexes on the subscriber when REPLICA IDENTITY is full on the publisher
Dear Önder, Thank you for proposing a good feature. I'm also interested in the patch, so I have started reviewing it. The following are my initial comments. === For execRelation.c 01. RelationFindReplTupleByIndex() ``` /* Start an index scan. */ InitDirtySnapshot(snap); - scan = index_beginscan(rel, idxrel, &snap, - IndexRelationGetNumberOfKeyAttributes(idxrel), - 0); /* Build scan key. */ - build_replindex_scan_key(skey, rel, idxrel, searchslot); + scankey_attoff = build_replindex_scan_key(skey, rel, idxrel, searchslot); + scan = index_beginscan(rel, idxrel, &snap, scankey_attoff, 0); ``` I think "/* Start an index scan. */" should be placed just above index_beginscan(). === For worker.c 02. usable_indexoid_internal() ``` + * Note that if the corresponding relmapentry has InvalidOid usableIndexOid, + * the function returns InvalidOid. + */ +static Oid +usable_indexoid_internal(ApplyExecutionData *edata, ResultRelInfo *relinfo) ``` "InvalidOid usableIndexOid" should be "invalid usableIndexOid," 03. check_relation_updatable() ``` * We are in error mode so it's fine this is somewhat slow. It's better to * give user correct error. */ - if (OidIsValid(GetRelationIdentityOrPK(rel->localrel))) + if (OidIsValid(rel->usableIndexOid)) { ``` Shouldn't we change the above comment accordingly? The check is no longer slow. === For relation.c 04. GetCheapestReplicaIdentityFullPath() ``` +static Path * +GetCheapestReplicaIdentityFullPath(Relation localrel) +{ + PlannerInfo *root; + Query *query; + PlannerGlobal *glob; + RangeTblEntry *rte; + RelOptInfo *rel; + int attno; + RangeTblRef *rt; + List *joinList; + Path *seqScanPath; ``` I think the part that constructs the dummy planner state could be moved to another function, because that part is not meaningful here. Especially lines 824-846 can. === For 032_subscribe_use_index.pl 05. 
general ``` +# insert some initial data within the range 0-1000 +$node_publisher->safe_psql('postgres', + "INSERT INTO test_replica_id_full SELECT i%20 FROM generate_series(0,1000)i;" +); ``` It seems that the actual range of the initial data is [0, 19]. The same mistake candidates are found in many places. 06. general ``` +# updates 1000 rows +$node_publisher->safe_psql('postgres', + "UPDATE test_replica_id_full SET x = x + 1 WHERE x = 15;"); ``` Only 50 tuples are modified here. The same mistake candidates are found in many places. 07. general ``` +# we check if the index is used or not +$node_subscriber->poll_query_until( + 'postgres', q{select (idx_scan = 200) from pg_stat_all_indexes where indexrelname = 'test_replica_id_full_idx';} +) or die "Timed out while waiting for check subscriber tap_sub_rep_full_3 updates 200 rows via index"; ``` The query will be executed until the index scan has happened, but that is not mentioned in the comment. How about changing it to "we wait until the index is used on the subscriber side" or something? The same comment applies in many places. 08. test related with ANALYZE ``` +# Testcase start: SUBSCRIPTION CAN UPDATE THE INDEX IT USES AFTER ANALYZE - PARTITIONED TABLE +# ``` "Testcase start:" should be "Testcase end:" here. 09. general In some tests the results are confirmed, but in other tests they are not. I think you should verify the results in every case unless there is a particular reason not to. Best Regards, Hayato Kuroda FUJITSU LIMITED
RE: logical replication restrictions
Hi, Sorry for the noise, but I found another bug. When 032_apply_delay.pl is modified as follows, the test always fails even if my patch is applied. ``` # Disable subscription. worker should die immediately. -$node_subscriber->safe_psql('postgres', - "ALTER SUBSCRIPTION tap_sub DISABLE" +$node_subscriber->safe_psql('postgres', q{ +BEGIN; +ALTER SUBSCRIPTION tap_sub DISABLE; +SELECT pg_sleep(1); +COMMIT; +} ); ``` The point of failure is the same as I reported previously. ``` ... 2022-09-14 12:00:48.891 UTC [11330] 032_apply_delay.pl LOG: statement: ALTER SUBSCRIPTION tap_sub SET (min_apply_delay = 8646) 2022-09-14 12:00:48.910 UTC [11226] DEBUG: sending feedback (force 0) to recv 0/1690220, write 0/1690220, flush 0/1690220 2022-09-14 12:00:48.937 UTC [11208] DEBUG: server process (PID 11328) exited with exit code 0 2022-09-14 12:00:48.950 UTC [11226] DEBUG: logical replication apply delay: 86459996 ms 2022-09-14 12:00:48.950 UTC [11226] CONTEXT: processing remote data for replication origin "pg_16393" during "BEGIN" in transaction 734 finished at 0/16902A8 2022-09-14 12:00:48.979 UTC [11208] DEBUG: forked new backend, pid=11334 socket=6 2022-09-14 12:00:49.007 UTC [11334] 032_apply_delay.pl LOG: statement: BEGIN; 2022-09-14 12:00:49.008 UTC [11334] 032_apply_delay.pl LOG: statement: ALTER SUBSCRIPTION tap_sub DISABLE; 2022-09-14 12:00:49.009 UTC [11334] 032_apply_delay.pl LOG: statement: SELECT pg_sleep(1); 2022-09-14 12:00:49.009 UTC [11226] DEBUG: check status of MySubscription 2022-09-14 12:00:49.009 UTC [11226] CONTEXT: processing remote data for replication origin "pg_16393" during "BEGIN" in transaction 734 finished at 0/16902A8 2022-09-14 12:00:49.009 UTC [11226] DEBUG: logical replication apply delay: 86459937 ms 2022-09-14 12:00:49.009 UTC [11226] CONTEXT: processing remote data for replication origin "pg_16393" during "BEGIN" in transaction 734 finished at 0/16902A8 ... 
``` I think this may be caused by the woken worker reading catalogs that have not been modified yet. In AlterSubscription(), the backend kicks the apply worker ASAP, but it should be done at the end of the transaction, like ApplyLauncherWakeupAtCommit() and AtEOXact_ApplyLauncher(). ``` + /* +* If this subscription has been disabled and it has an apply +* delay set, wake up the logical replication worker to finish +* it as soon as possible. +*/ + if (!opts.enabled && sub->applydelay > 0) + logicalrep_worker_wakeup(sub->oid, InvalidOid); + ``` What do you think? Best Regards, Hayato Kuroda FUJITSU LIMITED
RE: Perform streaming logical transactions by background workers and parallel apply
Hi, > > 01. filename > > The word-ordering of filename seems not good > > because you defined the new worker as "parallel apply worker". > > > > I think in the future we may have more files for apply work (like > applyddl.c for DDL apply work), so it seems okay to name all apply > related files in a similar way. > > That flag is set only when an apply worker spill the transaction to the > > disk. > > How about "in_streamed_transaction" -> "in_spilled_transaction"? > > > > Isn't this an existing variable? If so, it doesn't seem like a good > idea to change the name unless we are changing its meaning. What both of you said is reasonable. They do not have to be modified. Best Regards, Hayato Kuroda FUJITSU LIMITED
RE: Perform streaming logical transactions by background workers and parallel apply
Dear Hou-san, > I will dig your patch more, but I send partially to keep the activity of the > thread. More minor comments about v28. === About 0002 For 015_stream.pl 14. check_parallel_log ``` +# Check the log that the streamed transaction was completed successfully +# reported by parallel apply worker. +sub check_parallel_log +{ + my ($node_subscriber, $offset, $is_parallel)= @_; + my $parallel_message = 'finished processing the transaction finish command'; + + if ($is_parallel) + { + $node_subscriber->wait_for_log(qr/$parallel_message/, $offset); + } +} ``` I think check_parallel_log() should be called only when streaming = 'parallel', and then the if-statement is not needed. === For 016_stream_subxact.pl 15. test_streaming ``` + INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 500) s(i); ``` "3" should be "3". === About 0003 For applyparallelworker.c 16. parallel_apply_relation_check() ``` + if (rel->parallel_apply_safe == PARALLEL_APPLY_SAFETY_UNKNOWN) + logicalrep_rel_mark_parallel_apply(rel); ``` It was not clear to me when logicalrep_rel_mark_parallel_apply() is called here. IIUC parallel_apply_relation_check() is called when the parallel apply worker handles changes, but before that the relation is opened via logicalrep_rel_open() and parallel_apply_safe is set there. If this guards against some protocol violation, we may use Assert(). === For create_subscription.sgml 17. The restriction about foreign keys does not seem to be documented. === About 0004 For 015_stream.pl 18. check_parallel_log I heard that the removal had been reverted, but in the patch check_parallel_log() is removed again... :-( === About throughout I checked the test coverage via `make coverage`. For applyparallelworker.c and worker.c, the function coverage is 100% for both, and the line coverages are 86.2% and 94.5%. Generally it's good. But I read the report and the following parts seem not to be tested. 
In parallel_apply_start_worker(): ``` if (tmp_winfo->error_mq_handle == NULL) { /* * Release the worker information and try next one if the parallel * apply worker exited cleanly. */ ParallelApplyWorkersList = foreach_delete_current(ParallelApplyWorkersList, lc); shm_mq_detach(tmp_winfo->mq_handle); dsm_detach(tmp_winfo->dsm_seg); pfree(tmp_winfo); } ``` In HandleParallelApplyMessage(): ``` case 'X': /* Terminate, indicating clean exit */ { shm_mq_detach(winfo->error_mq_handle); winfo->error_mq_handle = NULL; break; } ``` Does this mean that we do not test the termination of the parallel apply worker? If so, I think it should be tested. Best Regards, Hayato Kuroda FUJITSU LIMITED
RE: Perform streaming logical transactions by background workers and parallel apply
Dear Hou-san, Thank you for updating the patch! The following are my comments for v28-0001. I will dig into your patch more, but I am sending these partially to keep up the activity of the thread. === For applyparallelworker.c 01. filename The word ordering of the filename does not seem good, because you defined the new worker as "parallel apply worker". 02. global variable ``` +/* Parallel apply workers hash table (initialized on first use). */ +static HTAB *ParallelApplyWorkersHash = NULL; + +/* + * List that stores the information of parallel apply workers that were + * started. Newly added worker information will be removed from the list at the + * end of the transaction when there are enough workers in the pool. Besides, + * exited workers will be removed from the list after being detected. + */ +static List *ParallelApplyWorkersList = NIL; ``` Could you add descriptions about the difference between the list and the hash table? IIUC the hash stores the parallel workers that are assigned to transactions, and the list stores all alive ones. 03. parallel_apply_find_worker ``` + /* Return the cached parallel apply worker if valid. */ + if (stream_apply_worker != NULL) + return stream_apply_worker; ``` This is just a question - why are the given xid and the xid assigned to the worker not checked here? Is there a chance of finding the wrong worker? 04. parallel_apply_start_worker ``` +/* + * Start a parallel apply worker that will be used for the specified xid. + * + * If a parallel apply worker is not in use then re-use it, otherwise start a + * fresh one. Cache the worker information in ParallelApplyWorkersHash keyed by + * the specified xid. + */ +void +parallel_apply_start_worker(TransactionId xid) ``` "parallel_apply_start_worker" should be "start_parallel_apply_worker", I think. 05. 
parallel_apply_stream_abort ``` for (i = list_length(subxactlist) - 1; i >= 0; i--) { xid = list_nth_xid(subxactlist, i); if (xid == subxid) { found = true; break; } } ``` Please do not reuse xid; declare and use another variable in the else block or something. 06. parallel_apply_free_worker ``` + if (napplyworkers > (max_parallel_apply_workers_per_subscription / 2)) + { ``` Please add a comment like "Do we have enough workers in the pool?" or something. === For worker.c 07. general In many places an if-else statement is used for apply_action, but I think they should be rewritten as switch-case statements. 08. global variable ``` -static bool in_streamed_transaction = false; +bool in_streamed_transaction = false; ``` a. It seems that in_streamed_transaction is used only in worker.c, so we can change it to a static variable. b. That flag is set only when an apply worker spills the transaction to disk. How about "in_streamed_transaction" -> "in_spilled_transaction"? 09. apply_handle_stream_prepare ``` - elog(DEBUG1, "received prepare for streamed transaction %u", prepare_data.xid); ``` I think this debug message is still useful. 10. apply_handle_stream_stop ``` + if (apply_action == TA_APPLY_IN_PARALLEL_WORKER) + { + pgstat_report_activity(STATE_IDLEINTRANSACTION, NULL); + } + else if (apply_action == TA_SEND_TO_PARALLEL_WORKER) + { ``` The ordering of STREAM {STOP, START} is checked only when an apply worker spills the transaction to disk (this is done via in_streamed_transaction). I think checks should be added here too, like if (!stream_apply_worker) or something. 11. apply_handle_stream_abort ``` + if (in_streamed_transaction) + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), +errmsg_internal("STREAM ABORT message without STREAM STOP"))); ``` I think a check using stream_apply_worker should be added. 12. apply_handle_stream_commit a. 
``` if (in_streamed_transaction) ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg_internal("STREAM COMMIT message without STREAM STOP"))); ``` I think a check using stream_apply_worker should be added. b. ``` - elog(DEBUG1, "received commit for streamed transaction %u", xid); ``` I think this debug message is still useful. === For launcher.c 13. logicalrep_worker_stop_by_slot ``` + LogicalRepWorker *worker = &LogicalRepCtx->workers[slot_no]; + + LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); + + /* Return if the generation doesn't match or the worker is not alive. */ + if (worker->generation != generation || + worker->proc == NULL) + return; + ``` a. LWLockAcquire(LogicalRepWorkerLock) is needed before reading the slots. b. LWLockRelease(LogicalRepWorker
RE: test_decoding assertion failure for the loss of top-sub transaction relationship
Dear Amit, Thanks for the comments! > Did you get this new assertion failure after you applied the patch for > the first failure? Because otherwise, how can you reach it with the > same test case? The first failure occurs only on HEAD, so I did not apply the first patch to REL14 and REL15. This difference arises because the commit [Fix catalog lookup...] in REL15 (272248a) and older differs from the one on HEAD. In older versions SnapBuildXidSetCatalogChanges() has been added. In that function a transaction will be marked as containing catalog changes if the transaction is in InitialRunningXacts, and after that the relationship between sub and top transactions is assigned based on parsed->subxact. The marking avoids the first failure, but the assignment triggers the new failure. > About patch: > else if (sub_needs_timetravel) > { > - /* track toplevel txn as well, subxact alone isn't meaningful */ > + elog(DEBUG2, "forced transaction %u to do timetravel due to one of > its subtransaction", > + xid); > + needs_timetravel = true; > SnapBuildAddCommittedTxn(builder, xid); > > Why did you remove the above comment? I think it still makes sense to retain > it. Fixed. Best Regards, Hayato Kuroda FUJITSU LIMITED HEAD-v2-0001-Fix-assertion-failure-during-logical-decoding.patch Description: HEAD-v2-0001-Fix-assertion-failure-during-logical-decoding.patch
RE: test_decoding assertion failure for the loss of top-sub transaction relationship
> I was not sure what's the proper way to fix it. > The solution I've thought at first was transporting all invalidations from > sub to top > like ReorderBufferTransferSnapToParent(), > but I do not know its side effect. Moreover, how do we deal with > ReorderBufferChange? > Should we transfer them too? If so, how about the ordering of changes? > Alternative solustion was just remove the assertion, but was it OK? PSA the PoC patch for discussion. In this patch only invalidation messages are transported; changes held by the subtxn are ignored. This passes the reported workload. Best Regards, Hayato Kuroda FUJITSU LIMITED REL14-0001-PoC-Fix-assertion-failure-during-logical-decoding.patch Description: REL14-0001-PoC-Fix-assertion-failure-during-logical-decoding.patch
RE: test_decoding assertion failure for the loss of top-sub transaction relationship
Dear hackers, > I agreed both that DEBUG2 messages are still useful but we should not > change the condition for output. So I prefer the idea suggested by Amit. PSA a newer patch, which contains the fix and a test. > > I have not verified but I think we need to backpatch this till 14 > > because prior to that in DecodeCommit, we use to set the top-level txn > > as having catalog changes based on the if there are invalidation > > messages in the commit record. So, in the current scenario shared by > > Osumi-San, before SnapBuildCommitTxn(), the top-level txn will be > > marked as having catalog changes. > > I and Osumi-san are now investigating that, so please wait further reports and > patches. We investigated the older versions, and in some of them *another stack-trace* has been found. About PG10-13, indeed, the failure did not occur. In these versions transactions are regarded as having catalog changes when the commit record has the XACT_XINFO_HAS_INVALS flag. This flag is set if the transaction has invalidation messages. When a subtransaction changes system catalogs and the user commits, all invalidation messages allocated in the subtransaction are transferred to the top transaction. Therefore both transactions are marked as containing catalog changes. About PG14 and 15, however, another stack-trace has been found. 
While executing the same workload, we got the following at the same SQL statement; ``` (gdb) backtrace #0 0x7fa78c6dc387 in raise () from /lib64/libc.so.6 #1 0x7fa78c6dda78 in abort () from /lib64/libc.so.6 #2 0x00b16680 in ExceptionalCondition (conditionName=0xcd3ab0 "txn->ninvalidations == 0", errorType=0xcd3284 "FailedAssertion", fileName=0xcd32d0 "reorderbuffer.c", lineNumber=2936) at assert.c:69 #3 0x008e9e70 in ReorderBufferForget (rb=0x12b5b10, xid=735, lsn=24125384) at reorderbuffer.c:2936 #4 0x008d9493 in DecodeCommit (ctx=0x12a2d20, buf=0x7ffe08b236b0, parsed=0x7ffe08b23510, xid=734, two_phase=false) at decode.c:733 #5 0x008d8962 in DecodeXactOp (ctx=0x12a2d20, buf=0x7ffe08b236b0) at decode.c:279 #6 0x008d85e2 in LogicalDecodingProcessRecord (ctx=0x12a2d20, record=0x12a30e0) at decode.c:142 #7 0x008dfef2 in pg_logical_slot_get_changes_guts (fcinfo=0x129acb0, confirm=true, binary=false) at logicalfuncs.c:296 #8 0x008e002f in pg_logical_slot_get_changes (fcinfo=0x129acb0) at logicalfuncs.c:365 ... 
(gdb) frame 4 #4 0x008d9493 in DecodeCommit (ctx=0x14cfd20, buf=0x7ffc638b0ca0, parsed=0x7ffc638b0b00, xid=734, two_phase=false) at decode.c:733 733 ReorderBufferForget(ctx->reorder, parsed->subxacts[i], buf->origptr); (gdb) list 728 */ 729 if (DecodeTXNNeedSkip(ctx, buf, parsed->dbId, origin_id)) 730 { 731 for (i = 0; i < parsed->nsubxacts; i++) 732 { 733 ReorderBufferForget(ctx->reorder, parsed->subxacts[i], buf->origptr); 734 } 735 ReorderBufferForget(ctx->reorder, xid, buf->origptr); 736 737 return; (gdb) frame 3 #3 0x008e9e70 in ReorderBufferForget (rb=0x14e2b10, xid=735, lsn=24125152) at reorderbuffer.c:2936 2936 Assert(txn->ninvalidations == 0); (gdb) list 2931 */ 2932 if (txn->base_snapshot != NULL && txn->ninvalidations > 0) 2933 ReorderBufferImmediateInvalidation(rb, txn->ninvalidations, 2934 txn->invalidations); 2935 else 2936 Assert(txn->ninvalidations == 0); 2937 2938 /* remove potential on-disk data, and deallocate */ 2939 ReorderBufferCleanupTXN(rb, txn); 2940 } (gdb) print *txn $1 = {txn_flags = 3, xid = 735, toplevel_xid = 734, gid = 0x0, first_lsn = 24113488, final_lsn = 24125152, end_lsn = 0, toptxn = 0x14ecb98, restart_decoding_lsn = 24113304, origin_id = 0, origin_lsn = 0, commit_time = 0, base_snapshot = 0x0, base_snapshot_lsn = 0, base_snapshot_node = {prev = 0x14ecc00, next = 0x14e2b28}, snapshot_now = 0x0, command_id = 4294967295, nentries = 5, nentries_mem = 5, changes = {head = {prev = 0x14eecf8, next = 0x14eeb18}}, tuplecids = {head = {prev = 0x14ecb10, next = 0x14ecb10}}, ntuplecids = 0, tuplecid_hash = 0x0, toast_hash = 0x0, subtxns = {head = {prev = 0x14ecb38, next = 0x14ecb38}}, nsubtxns = 0, ninvalidations = 3, invalidations = 0x14e2d28, node = {prev = 0x14ecc68, next = 0x14ecc68}, size = 452, total_size = 452, concurrent_abort = false, output_plugin_private = 0x0} ``` In these versions DecodeCommit() said OK. 
However, we hit another failure because the ReorderBufferTXN of the subtransaction had invalidation messages but did not have a base_snapshot. I thought that this failure was occurred the only the base_snapsh
RE: test_decoding assertion failure for the loss of top-sub transaction relationship
> > I'm basically fine, too. But this is a bug that needs back-patching > > back to 10. > > > > I have not verified but I think we need to backpatch this till 14 > because prior to that in DecodeCommit, we use to set the top-level txn > as having catalog changes based on the if there are invalidation > messages in the commit record. So, in the current scenario shared by > Osumi-San, before SnapBuildCommitTxn(), the top-level txn will be > marked as having catalog changes. I and Osumi-san are now investigating that, so please wait for further reports and patches. > > This change changes the condition for the DEBUG2 message. > > So we need to add an awkward if() condition for the DEBUG2 message. > > Looking that the messages have different debug-level, I doubt there > > have been a chance they are useful. If we remove the two DEBUGx > > messages, I'm fine with the change. > > > > I think these DEBUG2 messages could be useful, so instead of removing > these, I suggest we should follow Dilip's proposed fix and maybe add a > new DEBUG2 message on the lines of (("forced transaction %u to do > timetravel due to one of its subtransaction", xid) in the else if > (sub_needs_timetravel) condition if we think that will be useful too > but I am fine leaving the addition of new DEBUG2 message. I agree both that the DEBUG2 messages are still useful and that we should not change the condition for output. So I prefer the idea suggested by Amit. Best Regards, Hayato Kuroda FUJITSU LIMITED
RE: test_decoding assertion failure for the loss of top-sub transaction relationship
Dear Horiguchi-san, Dilip, Thank you for replying! > > It seems that SnapBuildCommitTxn() is already taking care of adding > > the top transaction to the committed transaction if any subtransaction > > has the catalog changes, it has just missed setting the flag so I > > think just setting the flag like this should be sufficient no? > > Oops! That's right. Basically I agree, but I was not sure whether the message "found top level transaction..." should be output or not. It may be useful even if only one of the subtransactions contains the change. How about the following? diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c index bf72ad45ec..a630522907 100644 --- a/src/backend/replication/logical/snapbuild.c +++ b/src/backend/replication/logical/snapbuild.c @@ -1086,8 +1086,17 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid, } } - /* if top-level modified catalog, it'll need a snapshot */ - if (SnapBuildXidHasCatalogChanges(builder, xid, xinfo)) + /* +* if top-level or one of sub modified catalog, it'll need a snapshot. +* +* Normally the second check is not needed because the relation between +* top-sub transactions is tracked on the ReorderBuffer layer, and the top +* transaction is marked as containing catalog changes if its children are. +* But in some cases the relation may be missed, in which case only the sub +* transaction may be marked as containing catalog changes. 
+*/ + if (SnapBuildXidHasCatalogChanges(builder, xid, xinfo) + || sub_needs_timetravel) { elog(DEBUG2, "found top level transaction %u, with catalog changes", xid); @@ -1095,11 +1104,6 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid, needs_timetravel = true; SnapBuildAddCommittedTxn(builder, xid); } - else if (sub_needs_timetravel) - { - /* track toplevel txn as well, subxact alone isn't meaningful */ - SnapBuildAddCommittedTxn(builder, xid); - } else if (needs_timetravel) { elog(DEBUG2, "forced transaction %u to do timetravel", xid); Best Regards, Hayato Kuroda FUJITSU LIMITED
RE: test_decoding assertion failure for the loss of top-sub transaction relationship
Hi Hackers, > Therefore, this leads to the failure for the assert that can check > the consistency that when one sub transaction modifies the catalog, > its top transaction should be marked so as well. > > I feel we need to remember the relationship between top transaction and sub > transaction > in the serialized snapshot even before changing catalog at decoding > RUNNING_XACT, > so that we can keep track of the association after the restart. What do you > think ? PSA a patch that fixes the failure. This adds pairs of sub-top transactions to the SnapBuild, and they will be serialized and restored. The pairs will be checked when we mark the ReorderBufferTXN as RBTXN_HAS_CATALOG_CHANGES. Thanks to an off-list discussion with Osumi-san. Best Regards, Hayato Kuroda FUJITSU LIMITED 0001-mark-RBTXN_HAS_CATALOG_CHANGES-to-the-top-transactio.patch Description: 0001-mark-RBTXN_HAS_CATALOG_CHANGES-to-the-top-transactio.patch
RE: patch: Add missing descriptions for rmgr APIs
> Your observation seems correct to me but you have not updated the > comment for the mask. Is there a reason for the same? Oh, it seems that I attached the wrong one. There is no reason. PSA the newer version. Best Regards, Hayato Kuroda FUJITSU LIMITED v2-0001-add-a-missing-comment.patch Description: v2-0001-add-a-missing-comment.patch
patch: Add missing descriptions for rmgr APIs
Hi hackers, While reading code related to logical decoding, I noticed that the following comment in rmgrlist.h is inconsistent. > /* symbol name, textual name, redo, desc, identify, startup, cleanup */ This comment describes the set of APIs that a resource manager should have, but the functions for {mask, decode} are missing here. Did we have any reason for that? I thought it might not be friendly, so I attached a patch. Best Regards, Hayato Kuroda FUJITSU LIMITED 0001-add-a-missing-comment.patch Description: 0001-add-a-missing-comment.patch
RE: Perform streaming logical transactions by background workers and parallel apply
Dear Wang, The following are my comments about v23-0003. Currently I do not have any comments about 0002 and 0004. 09. general It seems that logicalrep_rel_mark_parallel_apply() is always called when relations are opened on the subscriber-side, but is it really needed? These checks are required only for parallel streaming apply, so they may not be needed in the case of streaming = 'on' or 'off'. 10. commit message 2) There cannot be any non-immutable functions used by the subscriber-side replicated table. Look for functions in the following places: * a. Trigger functions * b. Column default value expressions and domain constraints * c. Constraint expressions * d. Foreign keys "Foreign keys" should not be listed here because they are not related to mutability. I think it should be listed as 3), not d. 11. create_subscription.sgml The constraint about foreign keys should be described here. 11. relation.c 11.a + CacheRegisterSyscacheCallback(PROCOID, + logicalrep_relmap_reset_parallel_cb, + (Datum) 0); Isn't another syscache callback needed for pg_type? Users can add constraints via the ALTER DOMAIN command, but the added constraint may not be checked. I checked AlterDomainAddConstraint(), and it invalidates only the relcache for pg_type. 11.b + /* +* If the column is of a DOMAIN type, determine whether +* that domain has any CHECK expressions that are not +* immutable. +*/ + if (get_typtype(att->atttypid) == TYPTYPE_DOMAIN) + { I think the default value of the *domain* must also be checked here. I tested as follows. === 1. created a domain that has a default value CREATE DOMAIN tmp INT DEFAULT 1 CHECK (VALUE > 0); 2. created a table CREATE TABLE foo (id tmp PRIMARY KEY); 3. 
checked pg_attribute and pg_class select oid, relname, attname, atthasdef from pg_attribute, pg_class where pg_attribute.attrelid = pg_class.oid and pg_class.relname = 'foo' and attname = 'id'; oid | relname | attname | atthasdef ---+-+-+--- 16394 | foo | id | f (1 row) It meant that the functions might not be checked, because the if-statement `if (att->atthasdef)` became false. === 12. 015_stream.pl, 016_stream_subxact.pl, 022_twophase_cascade.pl, 023_twophase_stream.pl - my ($node_publisher, $node_subscriber, $appname, $is_parallel) = @_; + my ($node_publisher, $node_subscriber, $appname) = @_; Why is the parameter removed? I think the test that waits for the output from the apply background worker is meaningful. 13. 032_streaming_apply.pl The filename seems too general because apply background workers are also tested in the above tests. How about "streaming_apply_constraint" or something? Best Regards, Hayato Kuroda FUJITSU LIMITED
RE: Perform streaming logical transactions by background workers and parallel apply
Dear Wang, Thank you for updating the patch! The following are comments about v23-0001 and v23-0005. v23-0001 01. logical-replication.sgml + + When the streaming mode is parallel, the finish LSN of + failed transactions may not be logged. In that case, it may be necessary to + change the streaming mode to on and cause the same + conflicts again so the finish LSN of the failed transaction will be written + to the server log. For the usage of finish LSN, please refer to ALTER SUBSCRIPTION ... + SKIP. + I was not sure about the streaming='off' mode. Is there any reason that only the 'on' mode is mentioned? 02. protocol.sgml + + Int64 (XLogRecPtr) + + + The LSN of the abort. This field is available since protocol version + 4. + + + + + + Int64 (TimestampTz) + + + Abort timestamp of the transaction. The value is in number + of microseconds since PostgreSQL epoch (2000-01-01). This field is + available since protocol version 4. + + + + It seems that these changes are in the variablelist for stream commit. I think they belong to the stream abort message, so they should be moved. 03. decode.c - ReorderBufferForget(ctx->reorder, parsed->subxacts[i], buf->origptr); + ReorderBufferForget(ctx->reorder, parsed->subxacts[i], buf->origptr, + commit_time); } - ReorderBufferForget(ctx->reorder, xid, buf->origptr); + ReorderBufferForget(ctx->reorder, xid, buf->origptr, commit_time); 'commit_time' is passed as the argument 'abort_time', which I think may be confusing. How about adding a comment above, like: "In case of streamed transactions, they are regarded as being aborted at commit_time"? 04. launcher.c 04.a + worker->main_worker_pid = is_subworker ? MyProcPid : 0; You can use InvalidPid instead of 0. (I thought a pid should be represented by the datatype pid_t, but in some code it is defined as int...) 04.b + worker->main_worker_pid = 0; You can use InvalidPid instead of 0, same as above. 05. 
origin.c void -replorigin_session_setup(RepOriginId node) +replorigin_session_setup(RepOriginId node, int acquired_by) IIUC the same slot can be used only when the apply main worker has already acquired the slot and a subworker for the same subscription tries to acquire it, but that cannot be understood from the comments. How about adding comments, or an assertion that acquired_by is the same as session_replication_state->acquired_by? Moreover, acquired_by should be compared with InvalidPid, based on the above comments. 06. proto.c void logicalrep_write_stream_abort(StringInfo out, TransactionId xid, - TransactionId subxid) + ReorderBufferTXN *txn, XLogRecPtr abort_lsn, + bool write_abort_lsn I think write_abort_lsn may not be needed, because abort_lsn can be used to control whether the abort_XXX fields should be filled or not. 07. worker.c +/* + * The number of changes during one streaming block (only for apply background + * workers) + */ +static uint32 nchanges = 0; This variable is used only by the main apply worker, so the comment seems incorrect. How about "...(only for the SUBSTREAM_PARALLEL case)"? v23-0005 08. monitoring.sgml I cannot decide which of the options proposed in [1] is better, but the following descriptions are needed in either case. (In [2] I had intended to propose something like option 2.) 08.a You can add a description that the field 'relid' will be NULL even for an apply background worker. 08.b You can add a description that the fields 'received_lsn', 'last_msg_send_time', 'last_msg_receipt_time', 'latest_end_lsn', 'latest_end_time' will be NULL for an apply background worker. [1]: https://www.postgresql.org/message-id/CAHut%2BPuPwdwZqXBJjtU%2BR9NULbOpxMG%3Di2hmqgg%2B7p0rmK0hrw%40mail.gmail.com [2]: https://www.postgresql.org/message-id/TYAPR01MB58660B4732E7F80B322174A3F5629%40TYAPR01MB5866.jpnprd01.prod.outlook.com Best Regards, Hayato Kuroda FUJITSU LIMITED
RE: Perform streaming logical transactions by background workers and parallel apply
Hi Wang, > 6.a > > It seems that the upper line represents the apply background worker, but I > think > last_msg_send_time and last_msg_receipt_time should be null. > Is it like initialization mistake? I checked the issue again. The attributes worker->last_send_time, worker->last_recv_time, and worker->reply_time are initialized in logicalrep_worker_launch(): ``` ... TIMESTAMP_NOBEGIN(worker->last_send_time); TIMESTAMP_NOBEGIN(worker->last_recv_time); worker->reply_lsn = InvalidXLogRecPtr; TIMESTAMP_NOBEGIN(worker->reply_time); ... ``` And the macros are defined in timestamp.h; the values are initialized to PG_INT64_MIN. ``` #define DT_NOBEGIN PG_INT64_MIN #define DT_NOEND PG_INT64_MAX #define TIMESTAMP_NOBEGIN(j)\ do {(j) = DT_NOBEGIN;} while (0) ``` However, in pg_stat_get_subscription(), these values are regarded as null only if they are zero. ``` if (worker.last_send_time == 0) nulls[4] = true; else values[4] = TimestampTzGetDatum(worker.last_send_time); if (worker.last_recv_time == 0) nulls[5] = true; else values[5] = TimestampTzGetDatum(worker.last_recv_time); ``` I think the above lines are wrong; these values should be compared with PG_INT64_MIN. Best Regards, Hayato Kuroda FUJITSU LIMITED
RE: Perform streaming logical transactions by background workers and parallel apply
Dear Wang, Thanks for updating the patch sets! The following are comments about v20-0001. 1. config.sgml ``` Specifies maximum number of logical replication workers. This includes both apply workers and table synchronization workers. ``` I think you can add a description to the above paragraph, like "This includes apply main workers, apply background workers, and table synchronization workers." 2. logical-replication.sgml 2.a Configuration Settings ``` max_logical_replication_workers must be set to at least the number of subscriptions, again plus some reserve for the table synchronization. ``` I think you can add a description to the above paragraph, like "... the number of subscriptions, plus some reserve for the table synchronization and the streaming transactions." 2.b Monitoring ``` Normally, there is a single apply process running for an enabled subscription. A disabled subscription or a crashed subscription will have zero rows in this view. If the initial data synchronization of any table is in progress, there will be additional workers for the tables being synchronized. ``` I think you can add a sentence to the above paragraph, like "... synchronized. Moreover, if a streaming transaction is applied in parallel, there will be additional workers." 3. launcher.c ``` + /* Sanity check : we don't support table sync in subworker. */ ``` I think "Sanity check :" should be "Sanity check:", per other files. 4. worker.c 4.a handle_streamed_transaction() ``` - /* not in streaming mode */ - if (!in_streamed_transaction) + /* Not in streaming mode */ + if (!(in_streamed_transaction || am_apply_bgworker())) ``` I think the comment should also mention the apply background worker case. 4.b handle_streamed_transaction() ``` - Assert(stream_fd != NULL); ``` I think this assertion seems reasonable in the case of stream='on'. Could you revive it and move it to a later part of the function, like after subxact_info_add(current_xid)? 
4.c apply_handle_prepare_internal()

```
 * BeginTransactionBlock is necessary to balance the EndTransactionBlock
 * called within the PrepareTransactionBlock below.
 */
- BeginTransactionBlock();
+ if (!IsTransactionBlock())
+ BeginTransactionBlock();
+
```

I think the comment should be "We must be in a transaction block to balance...".

4.d apply_handle_stream_prepare()

```
- *
- * Logic is in two parts:
- * 1. Replay all the spooled operations
- * 2. Mark the transaction as prepared
 */
static void
apply_handle_stream_prepare(StringInfo s)
```

I think these comments are still useful when stream='on', so they should be moved to a later part.

5. applybgworker.c

5.a apply_bgworker_setup()

```
+ elog(DEBUG1, "setting up apply worker #%u", list_length(ApplyBgworkersList) + 1);
```

"apply worker" should be "apply background worker".

5.b LogicalApplyBgwLoop()

```
+ elog(DEBUG1, "[Apply BGW #%u] ended processing streaming chunk,"
+"waiting on shm_mq_receive", shared->worker_id);
```

A space is needed after the comma. I checked the server log, and the message was output like:

```
[Apply BGW #1] ended processing streaming chunk,waiting on shm_mq_receive
```

6. When I started up the apply background worker and did `SELECT * FROM pg_stat_subscription`, I got the following lines:

```
postgres=# select * from pg_stat_subscription;
 subid | subname |  pid  | relid | received_lsn |      last_msg_send_time       |     last_msg_receipt_time     | latest_end_lsn |        latest_end_time
-------+---------+-------+-------+--------------+-------------------------------+-------------------------------+----------------+-------------------------------
 16400 | sub     | 22383 |       |              | -infinity                     | -infinity                     |                | -infinity
 16400 | sub     | 22312 |       | 0/6734740    | 2022-08-09 07:40:19.367676+00 | 2022-08-09 07:40:19.375455+00 | 0/6734740      | 2022-08-09 07:40:19.367676+00
(2 rows)
```

6.a It seems that the upper line represents the apply background worker, but I think last_msg_send_time and last_msg_receipt_time should be NULL. Is this an initialization mistake?

```
$ ps aux | grep 22383
...
postgres: logical replication apply background worker for subscription 16400
```

6.b Currently, the documentation doesn't clarify how to determine the type of a logical replication worker. Could you add a description about it? I think adding a "subworker" column is an alternative approach.

Best Regards,
Hayato Kuroda
FUJITSU LIMITED
RE: Perform streaming logical transactions by background workers and parallel apply
Dear Wang,

I found further comments about the test code.

11. src/test/regress/sql/subscription.sql

```
-- fail - streaming must be boolean
CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = foo);
```

The comment is no longer correct: it should be "streaming must be boolean or 'parallel'".

12. src/test/regress/sql/subscription.sql

```
-- now it works
CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = true);
```

I think we should also test the case of streaming = 'parallel'.

13. 015_stream.pl

I could not find a test for TRUNCATE. IIUC the apply bgworker should work well even if it gets a LOGICAL_REP_MSG_TRUNCATE message from the main worker. Can you add that case?

Best Regards,
Hayato Kuroda
FUJITSU LIMITED
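To make comment 12 concrete, the additional regression-test cases might look like the following sketch, written in the style of the surrounding subscription.sql tests (the exact subscription name and options mirror the quoted snippets; this assumes a server with the patch applied, so no expected output is shown):

```sql
-- fail - streaming must be boolean or 'parallel'
CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist'
    PUBLICATION testpub WITH (connect = false, streaming = foo);

-- now it works with the new value
CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist'
    PUBLICATION testpub WITH (connect = false, streaming = parallel);

-- and switching back and forth should also work
ALTER SUBSCRIPTION regress_testsub SET (streaming = on);
ALTER SUBSCRIPTION regress_testsub SET (streaming = parallel);
DROP SUBSCRIPTION regress_testsub;
```

The expected-output file would then record the error message for the first statement and success for the rest.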
RE: Support load balancing in libpq
Dear Jelte,

> With plain Postgres this assumption is probably correct. But the main reason
> I'm interested in this patch was because I would like to be able to load
> balance across the workers in a Citus cluster. These workers are all
> primaries.
> Similar usage would likely be possible with BDR (bidirectional replication).

I agree this feature is useful for BDR-like solutions.

> If the user takes such care when building their host list, they could simply
> not add the load_balance_hosts=true option to the connection string.
> If you know there's only one primary in the list and you're looking for
> the primary, then there's no reason to use load_balance_hosts=true.

You mean that it is the user's responsibility to set this correctly, right? That seems reasonable, because libpq is just a library for connecting to the server and should not try to be smart.

Best Regards,
Hayato Kuroda
FUJITSU LIMITED
RE: Perform streaming logical transactions by background workers and parallel apply
Dear Wang-san,

Hi, I'm also interested in this patch and I have started to review it. Here are comments about 0001.

1. terminology

In your patch a new worker, the "apply background worker", has been introduced, but I thought it might be confusing because PostgreSQL already has the term "background worker". Both the apply worker and the apply bgworker are categorized as bgworkers. Do you have any reason not to use "apply parallel worker" or "apply streaming worker"? (Note that I'm not a native English speaker.)

2. logicalrep_worker_stop()

```
- /* No worker, nothing to do. */
- if (!worker)
- {
- LWLockRelease(LogicalRepWorkerLock);
- return;
- }
+ if (worker)
+ logicalrep_worker_stop_internal(worker);
+
+ LWLockRelease(LogicalRepWorkerLock);
+}
```

I thought you could add a comment about the meaning of the if-statement, like "No main apply worker, nothing to do".

3. logicalrep_workers_find()

I thought you could add a description of the difference between this and logicalrep_worker_find() at the top of the function. IIUC logicalrep_workers_find() counts subworkers, but logicalrep_worker_find() does not consider that type of worker.

4. logicalrep_worker_detach()

```
 static void
 logicalrep_worker_detach(void)
 {
+ /*
+* If we are the main apply worker, stop all the apply background workers
+* we started before.
+*
```

I thought "we are" should be "this is", based on other comments.

5. applybgworker.c

```
+/* Apply background workers hash table (initialized on first use) */
+static HTAB *ApplyWorkersHash = NULL;
+static List *ApplyWorkersFreeList = NIL;
+static List *ApplyWorkersList = NIL;
```

I thought they should be ApplyBgWorkersXXX, because they store information related only to apply bgworkers.

6. ApplyBgworkerShared

```
+ TransactionId stream_xid;
+ uint32 n; /* id of apply background worker */
+} ApplyBgworkerShared;
```

I thought the field "n" is too general; how about "proc_id" or "worker_id"?

7.
apply_bgworker_wait_for()

```
+ /* If any workers (or the postmaster) have died, we have failed. */
+ if (status == APPLY_BGWORKER_EXIT)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+errmsg("background worker %u failed to apply transaction %u",
+ wstate->shared->n, wstate->shared->stream_xid)))
```

7.a I thought we should not mention postmaster death here, because in that case the apply worker will exit at WaitLatch().

7.b The error message should be "apply background worker %u...".

8. apply_bgworker_check_status()

```
+errmsg("background worker %u exited unexpectedly",
+ wstate->shared->n)));
```

The error message should be "apply background worker %u...".

9. apply_bgworker_send_data()

```
+ if (result != SHM_MQ_SUCCESS)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+errmsg("could not send tuples to shared-memory queue")));
```

I thought the error message should be "could not send data to..." because the sent data might not be tuples. For example, in the case of STREAM PREPARE, I think it does not contain tuples.

10. wait_event.h

```
 WAIT_EVENT_HASH_GROW_BUCKETS_REINSERT,
+ WAIT_EVENT_LOGICAL_APPLY_WORKER_STATE_CHANGE,
 WAIT_EVENT_LOGICAL_SYNC_DATA,
```

I thought the event should be WAIT_EVENT_LOGICAL_APPLY_BG_WORKER_STATE_CHANGE, because it is used when the apply worker waits until the status of a bgworker changes.

Best Regards,
Hayato Kuroda
FUJITSU LIMITED
RE: Collect ObjectAddress for ATTACH DETACH PARTITION to use in event trigger
Dear Hou-san,

> Thanks for having a look. It was a bit difficult to add a test for this.
> Because we currently don't have a user function which can return these
> collected ObjectAddresses for ALTER TABLE. And It seems we don't have tests for
> already collected ObjectAddresses as well :(
> The collected ObjectAddresses is in
> "currentEventTriggerState->currentCommand->d.alterTable.subcmds.address" while
> the public function pg_event_trigger_ddl_commands doesn't return these
> information. It can only be used in user defined event trigger function (C code).

Thanks for explaining. I did not know the reason why the test is not in event_trigger.sql.

> If we want to add some tests for both already existed and newly added
> ObjectAddresses, we might need to add some test codes in test_ddl_deparse.c.
> What do you think ?

I agree that tests for ObjectAddresses should be added to test_ddl_deparse.c, but the patch might become large because there are many ATExecXXX() functions. I think they could be added separately in another thread or patch.

Best Regards,
Hayato Kuroda
FUJITSU LIMITED
RE: Support load balancing in libpq
Dear Jelte,

I like your idea. But do we have to sort randomly even if target_session_attr is set to 'primary' or 'read-write'? I think this parameter is used when all listed servers have the same data, and we can assume that one of the members is a primary and the others are secondaries. In this case users may put the primary host at the top of the list, so random sorting may increase the time needed to establish a connection.

Best Regards,
Hayato Kuroda
FUJITSU LIMITED
RE: Collect ObjectAddress for ATTACH DETACH PARTITION to use in event trigger
Hi,

> > I noticed that we didn't collect the ObjectAddress returned by
> > ATExec[Attach|Detach]Partition. I think collecting this information can make it
> > easier for users to get the partition OID of the attached or detached table in
> > the event trigger. So how about collecting it like the attached patch ?
>
> Added to next CF.

Sounds good. I grepped the ATExecXXX() functions called in ATExecCmd(), and I confirmed that all returned values have been collected except these. While checking the test code for EVENT TRIGGER, I found there were no tests related to partitions. How about adding some?

Best Regards,
Hayato Kuroda
FUJITSU LIMITED
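A test along the lines I have in mind might look like the following sketch (the trigger and table names are invented for illustration; the expected output would show the ALTER TABLE command tag firing for both ATTACH and DETACH):

```sql
-- hypothetical event-trigger test covering ATTACH/DETACH PARTITION
CREATE TABLE part_parent (id int) PARTITION BY RANGE (id);
CREATE TABLE part_child (id int);

CREATE FUNCTION report_part_ddl() RETURNS event_trigger
LANGUAGE plpgsql AS $$
DECLARE
    r record;
BEGIN
    -- pg_event_trigger_ddl_commands() exposes the collected commands
    FOR r IN SELECT * FROM pg_event_trigger_ddl_commands()
    LOOP
        RAISE NOTICE 'caught command: %', r.command_tag;
    END LOOP;
END $$;

CREATE EVENT TRIGGER part_ddl_trg ON ddl_command_end
    EXECUTE FUNCTION report_part_ddl();

ALTER TABLE part_parent ATTACH PARTITION part_child
    FOR VALUES FROM (1) TO (10);
ALTER TABLE part_parent DETACH PARTITION part_child;
```

Checking the collected ObjectAddress itself would still need C code in test_ddl_deparse.c, as discussed, since pg_event_trigger_ddl_commands() does not return the per-subcommand addresses.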
RE: Multi-Master Logical Replication
Dear Takahashi-san,

Thanks for the feedback!

> > I don't know if it requires the kind of code you are thinking but I
> > agree that it is worth considering implementing it as an extension.
>
> I think the other advantage to implement as an extension is that users could
> install the extension to older Postgres.
>
> As mentioned in previous email, the one use case of n-way replication is migration
> from older Postgres to newer Postgres.
>
> If we implement as an extension, users could use n-way replication for migration
> from PG10 to PG16.

I think even if LRG is implemented as a contrib module or any extension, it will deeply depend on the subscription option "origin" proposed in [1]. So LRG cannot be used with older versions, only PG16 or later.

[1]: https://www.postgresql.org/message-id/caldanm3pt1cpeb3y9pe7ff91gzvpnxr91y4ztwiw6h+gayg...@mail.gmail.com

Best Regards,
Hayato Kuroda
FUJITSU LIMITED
RE: Multi-Master Logical Replication
Dear hackers,

I found another use case for LRG. It might be helpful for migration.

LRG for migration
--

LRG may be helpful for machine migration, OS upgrades, or upgrades of PostgreSQL itself. Assume that users want to migrate a database to another environment, e.g., from PG16 on RHEL7 to PG18 on RHEL8. Users must copy all data into the new server and catch up with all changes. In this case streaming replication cannot be used because it requires the same OS and the same PostgreSQL major version. Moreover, it is desirable to be able to return to the original environment at any time in case of application or other environmental deficiencies.

Operation steps with LRG
--

LRG is appropriate for this situation. The following is the workflow users must follow:

1. Copy the table definitions to the newer node (PG18), via pg_dump/pg_restore
2. Execute lrg_create() on the older node (PG16)
3. Execute lrg_node_attach() on PG18
=== data will be shared here ===
4. Change the connection of the user application to PG18
5. Check whether any ERROR is raised. If some ERRORs are raised, users can change the connection back to PG16.
6. Remove the created node group if the application works well.

These operations may reduce system downtime due to incompatibilities associated with version upgrades.

Best Regards,
Hayato Kuroda
FUJITSU LIMITED
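The steps above might be sketched as follows. Note that this is only an illustration: the exact argument lists of lrg_create()/lrg_node_attach() are not fixed yet, so the group name, connection strings, and node names below are all invented, and running it requires two live clusters:

```sql
-- step 1: restore the schema on the new PG18 node, e.g.
--   pg_dump --schema-only -p 5432 mydb | psql -p 5433 mydb

-- step 2 (on the old PG16 node): create the node group
SELECT lrg_create('migration_group',
                  'port=5432 dbname=mydb',   -- local connection string
                  'pg16_node');              -- local node name

-- step 3 (on the new PG18 node): attach to the group
SELECT lrg_node_attach('migration_group',
                       'port=5433 dbname=mydb',  -- local connection string
                       'port=5432 dbname=mydb',  -- upstream connection string
                       'pg18_node');

-- steps 4-6: repoint the application to port 5433, watch for ERRORs,
-- and drop the group once the application is confirmed to work.
```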
RE: Multi-Master Logical Replication
Hi hackers,

I created a small PoC. Please see the attached patches.

REQUIREMENT

Before applying them, the patches in [1] must also be applied.

DIFFERENCES FROM PREVIOUS DESCRIPTIONS

* LRG is now implemented as SQL functions, not as a contrib module.
* New tables are added as system catalogs. Therefore, the added tables have an oid column.
* The node_id is the concatenation of the system identifier and the dbid.

HOW TO USE

In the document patch, a subsection 'Example' was added to aid understanding of LRG. In short, we can do:

1. lrg_create on one node
2. lrg_node_attach on another node

Also attached is a test script that constructs a three-node system.

LIMITATIONS

This feature is under development, so there are many limitations:

* The function for detaching a node from a group is not implemented.
* The function for removing a group is not implemented.
* LRG does not lock system catalogs and databases. Concurrent operations may cause an inconsistent state.
* LRG does not wait until the upstream node reaches the latest LSN of the remaining nodes.
* LRG does not support initial data sync. That is, it works well only when no node has initial data.

[1]: https://commitfest.postgresql.org/38/3610/

Best Regards,
Hayato Kuroda
FUJITSU LIMITED

v1-0001-PoC-implement-LRG.patch
Description: v1-0001-PoC-implement-LRG.patch
v1-0002-add-doc.patch
Description: v1-0002-add-doc.patch
test.sh
Description: test.sh
RE: Multi-Master Logical Replication
Dear Laurenz,

Thank you for your interest in our work!

> I am missing a discussion how replication conflicts are handled to
> prevent replication from breaking or the databases from drifting apart.

Actually we have no plans for developing a feature that avoids conflicts. We think that should be done as a core PUB/SUB feature, and this module will just use it.

Best Regards,
Hayato Kuroda
FUJITSU LIMITED
RE: Handle infinite recursion in logical replication setup
Dear Peter,

> FYI, here is a test script that is using the current patch (v6) to
> demonstrate a way to share table data between different numbers of
> nodes (up to 5 of them here).

Thanks for sharing your script! It's very helpful for us. While reading it, however, I had a question. Lines 121-122 define the subscriptions for the 2-node cluster:

psql -p $port_N1 -c "create subscription sub12 connection 'port=$port_N2' publication pub2 with ($copy_force);"
psql -p $port_N2 -c "create subscription sub21 connection 'port=$port_N1' publication pub1 with ($copy_force);"

But I was not sure this works well. N2 already has the shared data from N1 when subscription sub21 is created. Did you assume that the initial copy is not so quick, and that data synchronization will not be done yet when sub21 is created?

Best Regards,
Hayato Kuroda
FUJITSU LIMITED
RE: Logical replication timeout problem
Dear Amit, Wang, > > I think maybe we do not need to deal with this use case. > > The maximum number of table columns allowed by PG is 1600 > > (macro MaxHeapAttributeNumber), and after loop through all columns in the > > function logicalrep_write_tuple, the function OutputPluginWrite will be > > invoked > > immediately to actually send the data to the subscriber. This refreshes the > > last time the subscriber received a message. > > So I think this loop will not cause timeout issues. > > > > Right, I also don't think it can be a source of timeout. OK. I have no comments for this version. Best Regards, Hayato Kuroda FUJITSU LIMITED
RE: Logical replication timeout problem
Dear Wang-san,

Thank you for updating! ...but the patch also cannot be applied to the current HEAD because of commit 923def9a533. Your patch seems to conflict with the addition of an argument to logicalrep_write_insert(). That commit allows specifying the columns to publish by skipping some columns in logicalrep_write_tuple(), which is called from logicalrep_write_insert() and logicalrep_write_update(). Do we have to consider any special case for that? I thought a timeout might occur if users have a huge table and publish only a few columns, but that is a corner case.

Best Regards,
Hayato Kuroda
FUJITSU LIMITED
RE: Logical replication timeout problem
Dear Amit, > It seems by mistake you have removed the changes from pgoutput_message > and pgoutput_truncate functions. I have added those back. > Additionally, I made a few other changes: (a) moved the function > UpdateProgress to pgoutput.c as it is not used outside it, (b) change > the new parameter in plugin API from 'send_keep_alive' to 'last_write' > to make it look similar to WalSndPrepareWrite and WalSndWriteData, (c) > made a number of changes in WalSndUpdateProgress API, it is better to > move keep-alive code after lag track code because we do process > replies at that time and there it will compute the lag; (d) > changed/added comments in the code. LGTM, but the patch cannot be applied to current HEAD. Best Regards, Hayato Kuroda FUJITSU LIMITED
RE: Handle infinite recursion in logical replication setup
Dear Vignesh,

> Thanks for kind explanation.
> I read above and your doc in 0002, and I put some comments.

I forgot a comment about the 0002 doc.

5. create_subscription.sgml - about your example

Three possibilities were listed in the doc, but I was not sure about case b). In that situation Node1 and Node2 have already become multi-master, and data has already been synced at that point. If so, how can the situation "there is data present only in one Node" arise? Cases a) and c) seem reasonable.

Best Regards,
Hayato Kuroda
FUJITSU LIMITED
RE: Handle infinite recursion in logical replication setup
Dear Vignesh,

Thank you for updating your patch!

> Let's consider an existing Multi master logical replication setup
> between Node1 and Node2 that is created using the following steps:
> a) Node1 - Publication publishing employee table - pub1
> b) Node2 - Subscription subscribing from publication pub1 with
> publish_local_only - sub1_pub1_node1
> c) Node2 - Publication publishing employee table - pub2
> d) Node1 - Subscription subscribing from publication pub2 with
> publish_local_only - sub2_pub2_node2
>
> To create a subscription in node3, we will be using the following steps:
> a) Node2 - Publication publishing employee table. - pub3
> b) Node3 - Subscription subscribing from publication in Node2 with
> publish_local_only - sub3_pub3_node2
>
> When we create a subscription in Node3, Node3 will connect to
> Node2(this will not be done in Node3) and check if the employee table
> is present in pg_subscription_rel, in our case Node2 will have
> employee table present in pg_subscription_rel (sub1_pub1_node1
> subscribing to employee table from pub1 in Node1). As employee table
> is being subscribed in node2 from node1, we will throw an error like
> below:
> postgres=# create subscription sub2 CONNECTION 'dbname =postgres port
> = ' publication pub2 with (publish_local_only=on);
> ERROR: CREATE/ALTER SUBSCRIPTION with publish_local_only and
> copy_data as true is not allowed when the publisher might have
> replicated data, table:public.t1 might have replicated data in the
> publisher
> HINT: Use CREATE/ALTER SUBSCRIPTION with copy_data = off or force

Thanks for the kind explanation. I read the above and your doc in 0002, and I have some comments.

1. alter_subscription.sgml

```
-copy_data (boolean)
+copy_data (boolean | force)
```

I thought this should be documented as an enum. For example, the huge_pages GUC parameter can accept {on, off, try}, and it is documented as an enum.

2. create_subscription.sgml

```
-copy_data (boolean)
+copy_data (boolean | force)
```

Same as above.

3.
create_subscription.sgml

```
+ 
+ 
+ If the publication tables were also subscribing data in the publisher
+ from other publishers, it will affect the
+ CREATE SUBSCRIPTION based on the value specified
+ for publish_local_only option. Refer to the
+  for details.
+ 
```

I searched the docs, but the wording "publication tables" does not appear elsewhere. How about "tables in the publication"?

4. create_subscription.sgml - about your example

In the first section, we should describe the 2-node case in more detail, as Amit mentioned in [1]. I thought that Option-3 can be resolved by defining subscriptions on both nodes with publish_local_only = true and copy_data = force.

> I was initially planning to add srreplicateddata field but I have
> changed it slightly to keep the design simple. Now we just check if
> the relation is present in pg_subscription_rel and throw an error if
> copy_data and publish_local_only option is specified. The changes for
> the same are available at [1].
>
> [1] -
> https://www.postgresql.org/message-id/CALDaNm0V%2B%3Db%3DCeZJNAAUO2PmSXH5QzNX3jADXb-0hGO_jVj0vA%40mail.gmail.com
> Thoughts?

Actually I doubted that the column was really needed, so this is good.

Best Regards,
Hayato Kuroda
FUJITSU LIMITED
RE: Handle infinite recursion in logical replication setup
Hi Vignesh,

While thinking about the second problem, I got quite confused. I would be happy if you could answer my question.

> To handle this if user has specified only_local option, we could throw
> a warning or error out while creating subscription in this case, we
> could have a column srreplicateddata in pg_subscription_rel which
> could indicate if the table has any replicated data or not:
> postgres=# select * from pg_subscription_rel;
> srsubid | srrelid | srsubstate | srsublsn | srreplicateddata
> -+-++---+--
>16389 | 16384 | r | 0/14A4640 |t
>16389 | 16385 | r | 0/14A4690 |f
> (1 row)
> In the above example, srreplicateddata with true indicates, tabel t1
> whose relid is 16384 has replicated data and the other row having
> srreplicateddata as false indicates table t2 whose relid is 16385
> does not have replicated data.
> When creating a new subscription, the subscriber will connect to the
> publisher and check if the relation has replicated data by checking
> srreplicateddata in pg_subscription_rel table.
> If the table has any replicated data, log a warning or error for this.

IIUC srreplicateddata represents whether the subscribed data was generated not on the publisher but on another node. My first impression was that the name 'srreplicateddata' is not friendly, because all subscribed data is replicated from the publisher. Also I was not sure how the value of the column is set. IIUC the filtering by replication origins is done on the publisher node, and the subscriber node cannot know whether some data was actually filtered or not. If we distinguish by the subscriber option publish_local_only, your example cannot be reproduced, because all tables of the same subscription would then have the same 'srreplicateddata' value.

Best Regards,
Hayato Kuroda
FUJITSU LIMITED
RE: Logical replication timeout problem
Dear Wang,

Thank you for updating!

> > Do we need adding a test for them? I think it can be added to 100_bugs.pl.
> > Actually I tried to send PoC, but it does not finish to implement that.
> > I'll send if it is done.
> I'm not sure if it is worth it.
> Because the reproduced test of this bug might take some time and might risk
> making the build farm slow, so I am not sure if others would like the
> reproduced test of this bug.

I take your point that it may be difficult to stabilize and minimize the test. I withdraw the above.

I have some comments for v2, mainly cosmetic ones.

1. pgoutput_change

```
+ bool is_send = true;
```

My first impression is that is_send should be initialized to false, and changed to true when OutputPluginWrite() is called.

2. pgoutput_change

```
+ {
+ is_send = false;
+ break;
+ }
```

There are too many indents here, but I think they could be removed. See the above comment.

3. WalSndUpdateProgress

```
+ /*
+* If half of wal_sender_timeout has lapsed without send message standby,
+* send a keep-alive message to the standby.
+*/
```

The comment seems inconsistent with others. Here it is "keep-alive", but other parts use "keepalive".

4. ReorderBufferProcessTXN

```
+ change->data.inval.ninvalidations,
+ change->data.inval.invalidations);
```

Maybe these lines break the 80-column rule.

Best Regards,
Hayato Kuroda
FUJITSU LIMITED
RE: Logical replication timeout problem
Dear Wang,

Thank you for updating the patch! Good self-review.

> And I looked into the function WalSndUpdateProgress. I found function
> WalSndUpdateProgress try to record the time of some message(by function
> LagTrackerWrite) sent to subscriber, such as in function pgoutput_commit_txn.

Yeah, I think you are right.

> Then, when publisher receives the reply message from the subscriber(function
> ProcessStandbyReplyMessage), publisher invokes LagTrackerRead to calculate the
> delay time(refer to view pg_stat_replication).
> Referring to the purpose of LagTrackerWrite, I think it is no need to log time
> when sending keepalive messages here.
> So when the parameter send_keep_alive of function WalSndUpdateProgress is true,
> skip the recording time.

I also read that code. The LagTracker records the elapsed time between sending a commit from the publisher and receiving the reply from the subscriber, right? It seems good.

Do we need to add a test for this? I think it could be added to 100_bugs.pl. Actually I tried to write a PoC, but it is not finished yet. I'll send it when it is done.

Best Regards,
Hayato Kuroda
FUJITSU LIMITED
RE: Handle infinite recursion in logical replication setup
Dear Vignesh, > I felt changing only_local option might be useful for the user while > modifying the subscription like setting it with a different set of > publications. Changes for this are included in the v2 patch attached > at [1]. +1, thanks. I'll post if I notice something to say. > Shall we get a few opinions on this and take it in that direction? I prefer subscriber-option, but I also think both are reasonable. +1 about asking other reviewers. Best Regards, Hayato Kuroda FUJITSU LIMITED
RE: Handle infinite recursion in logical replication setup
Dear Peter,

> > So, why does the patch use syntax option 1?

IMU it might be useful in the following case. Assume a multi-master configuration with node1 and node2. Node1 has a publication pub1 and a subscription sub2; node2 has pub2 and sub1. From that situation, consider that a new node node3 is added that subscribes to some changes from node2. If the feature is introduced as option 1, a new publication must be defined on node2. If it is introduced as option 2, however, pub2 can perhaps be reused, i.e., multiple declarations of publications can be avoided.

Best Regards,
Hayato Kuroda
FUJITSU LIMITED
RE: Logical replication timeout problem
Dear Wang,

> Some codes were added in ReorderBufferProcessTXN() for treating DDL,

My mailer went wrong, so I'll put my comments again. Sorry.

Some code was added in ReorderBufferProcessTXN() for treating DDL, but I doubt that updating accept_writes is needed. IMU, the parameter is read by OutputPluginPrepareWrite() in order to align messages: they should have a header - like 'w' - before their body. But here only a keepalive message is sent, with no meaningful changes, so I think it might not be needed. I commented out the line and tested like you did [1], and no timeouts or errors were found. Do you have any reason for that change?

[1]: https://www.postgresql.org/message-id/OS3PR01MB6275A95FD44DC6C46058EA389E3B9%40OS3PR01MB6275.jpnprd01.prod.outlook.com

Best Regards,
Hayato Kuroda
FUJITSU LIMITED
RE: [Proposal] Add foreign-server health checks infrastructure
Hi Hackers,

> It's not happy, but I'm not sure about a good solution. I made a timer reschedule
> if connection lost had detected. But if queries in the transaction are quite short,
> catching SIGINT may be fail.

The attached uses another approach: it sets the pending flags again if DoingCommandRead is true. If a remote server goes down while the session is idle in transaction, the next query will fail because of the ereport(ERROR). What do you think?

Best Regards,
Hayato Kuroda
FUJITSU LIMITED

v14_0001_expose_cancel_message.patch
Description: v14_0001_expose_cancel_message.patch
v14_0002_add_health_check.patch
Description: v14_0002_add_health_check.patch
v14_0003_add_doc.patch
Description: v14_0003_add_doc.patch
RE: Logical replication timeout problem
Dear Wang, > Attach the new patch. [suggestion by Kuroda-San] > 1. Fix the typo. > 2. Improve comment style. > 3. Fix missing consideration. > 4. Add comments to clarifies above functions and function calls. Thank you for updating the patch! I confirmed they were fixed. ``` case REORDER_BUFFER_CHANGE_INVALIDATION: - /* Execute the invalidation messages locally */ - ReorderBufferExecuteInvalidations( - change->data.inval.ninvalidations, - change->data.inval.invalidations); - break; + { + LogicalDecodingContext *ctx = rb->private_data; + + Assert(!ctx->fast_forward); + + /* Set output state. */ + ctx->accept_writes = true; + ctx->write_xid = txn->xid; + ctx->write_location = change->lsn; ``` Some codes were added in ReorderBufferProcessTXN() for treating DDL, I'm also happy if you give the version number :-). Best Regards, Hayato Kuroda FUJITSU LIMITED > -Original Message- > From: Wang, Wei/王 威 > Sent: Wednesday, March 2, 2022 11:06 AM > To: Kuroda, Hayato/黒田 隼人 > Cc: Fabrice Chapuis ; Simon Riggs > ; Petr Jelinek > ; Tang, Haiying/唐 海英 > ; Amit Kapila ; > PostgreSQL Hackers ; Ajin Cherian > > Subject: RE: Logical replication timeout problem > > On Mon, Feb 28, 2022 at 6:58 PM Kuroda, Hayato/黒田 隼人 > wrote: > > Dear Wang, > > > > > Attached a new patch that addresses following improvements I have got > > > so far as > > > comments: > > > 1. Consider other changes that need to be skipped(truncate, DDL and > > > function calls pg_logical_emit_message). [suggestion by Ajin, Amit] > > > (Add a new function SendKeepaliveIfNecessary for trying to send > > > keepalive > > > message.) > > > 2. Set the threshold conservatively to a static value of > > > 1.[suggestion by Amit, Kuroda-San] 3. Reset sendTime in function > > > WalSndUpdateProgress when send_keep_alive is false. [suggestion by > > > Amit] > > > > Thank you for giving a good patch! I'll check more detail later, but it can > > be > > applied my codes and passed check world. 
> > I put some minor comments: > Thanks for your comments. > > > ``` > > + * Try to send keepalive message > > ``` > > > > Maybe missing "a"? > Fixed. Add missing "a". > > > ``` > > + /* > > + * After continuously skipping SKIPPED_CHANGES_THRESHOLD > changes, try > > to send a > > + * keepalive message. > > + */ > > ``` > > > > This comments does not follow preferred style: > > https://www.postgresql.org/docs/devel/source-format.html > Fixed. Correct wrong comment style. > > > ``` > > @@ -683,12 +683,12 @@ OutputPluginWrite(struct LogicalDecodingContext > *ctx, > > bool last_write) > > * Update progress tracking (if supported). > > */ > > void > > -OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx) > > +OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx, bool > > +send_keep_alive) > > ``` > > > > This function is no longer doing just tracking. > > Could you update the code comment above? > Fixed. Update the comment above function OutputPluginUpdateProgress. > > > ``` > > if (!is_publishable_relation(relation)) > > return; > > ``` > > > > I'm not sure but it seems that the function exits immediately if relation > > is a > > sequence, view, temporary table and so on. Is it OK? Does it never happen? > I did some checks to confirm this. After my confirmation, there are several > situations that can cause a timeout. For example, if I insert many date into > table sql_features in a long transaction, subscriber-side will time out. > Although I think users should not modify these tables arbitrarily, it could > happen. To be conservative, I think this use case should be addressed as well. > Fixed. Invoke function SendKeepaliveIfNecessary before return. > > > ``` > > + SendKeepaliveIfNecessary(ctx, false); > > ``` > > > > I think a comment is needed above which clarifies sending a keepalive > message. > Fixed. Before invoking function SendKeepaliveIfNecessary, add the > corresponding > comment. > > Attach the new patch. [suggestion by Kuroda-San] > 1. 
Fix the typo. > 2. Improve comment style. > 3. Fix missing consideration. > 4. Add comments to clarify the above functions and function calls. > > Regards, > Wang wei
RE: Handle infinite recursion in logical replication setup
Hi Vignesh, > In logical replication, currently Walsender sends the data that is > generated locally and the data that are replicated from other > instances. This results in infinite recursion in circular logical > replication setup. Thank you for the good explanation. I understand that this fix can be used for bidirectional replication. > Here there are two problems for the user: a) incremental > synchronization of table sending both local data and replicated data > by walsender b) Table synchronization of table using copy command > sending both local data and replicated data So you wanted to solve these two problems and are currently focused on the first one, right? We can check them one by one. > For the first problem "Incremental synchronization of table by > Walsender" can be solved by: > Currently the locally generated data does not have replication origin > associated and the data that has originated from another instance will > have a replication origin associated. We could use this information to > differentiate locally generated data and replicated data and send only > the locally generated data. This "only_local" could be provided as an > option while subscription is created: > ex: CREATE SUBSCRIPTION sub1 CONNECTION 'dbname =postgres port=5433' > PUBLICATION pub1 with (only_local = on); Sounds good, but I cannot judge whether the assumption will continue to hold. I played with your patch, but it could not be applied to the current master. I tested from bd74c40 and confirmed the infinite loop did not appear. local_only could not be set from the ALTER SUBSCRIPTION command. Is that expected? Best Regards, Hayato Kuroda FUJITSU LIMITED
RE: Logical replication timeout problem
Dear Wang, > Attached a new patch that addresses following improvements I have got so far > as > comments: > 1. Consider other changes that need to be skipped(truncate, DDL and function > calls pg_logical_emit_message). [suggestion by Ajin, Amit] > (Add a new function SendKeepaliveIfNecessary for trying to send keepalive > message.) > 2. Set the threshold conservatively to a static value of 1.[suggestion by > Amit, > Kuroda-San] > 3. Reset sendTime in function WalSndUpdateProgress when send_keep_alive is > false. [suggestion by Amit] Thank you for the good patch! I'll check it in more detail later, but it applies to my code and passes check-world. I put some minor comments: ``` + * Try to send keepalive message ``` Maybe missing "a"? ``` + /* + * After continuously skipping SKIPPED_CHANGES_THRESHOLD changes, try to send a + * keepalive message. + */ ``` This comment does not follow the preferred style: https://www.postgresql.org/docs/devel/source-format.html ``` @@ -683,12 +683,12 @@ OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write) * Update progress tracking (if supported). */ void -OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx) +OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx, bool send_keep_alive) ``` This function is no longer doing just tracking. Could you update the code comment above? ``` if (!is_publishable_relation(relation)) return; ``` I'm not sure, but it seems that the function exits immediately if the relation is a sequence, view, temporary table, and so on. Is that OK? Can it never happen? ``` + SendKeepaliveIfNecessary(ctx, false); ``` I think a comment is needed above it to clarify that a keepalive message is sent. Best Regards, Hayato Kuroda FUJITSU LIMITED
RE: Logical replication timeout problem
Dear Wang, Thank you for explaining the background of the patch. > According to our discussion, we need to send keepalive messages to subscriber > when skipping changes. > One approach is that **for each skipped change**, we try to send keepalive > message by calculating whether a timeout will occur based on the current time > and the last time the keepalive was sent. But this will brings slight > overhead. > So I want to try another approach: after **constantly skipping some changes**, > we try to send keepalive message by calculating whether a timeout will occur > based on the current time and the last time the keepalive was sent. You meant that calls to system functions like GetCurrentTimestamp() should be reduced, right? I'm not sure how much it affects performance, but it seems reasonable. > IMO, we should send keepalive message after skipping a certain number of > changes constantly. > And I want to calculate the threshold dynamically by using a fixed value to > avoid adding too much code. > In addition, different users have different machine performance, and users can > modify wal_sender_timeout, so the threshold should be dynamically calculated > according to wal_sender_timeout. Your experiment seems reasonable, but the background cannot be understood from the code comments. I prefer a static threshold because it's simpler, as Amit also said here: https://www.postgresql.org/message-id/CAA4eK1%2B-p_K_j%3DNiGGD6tCYXiJH0ypT4REX5PBKJ4AcUoF3gZQ%40mail.gmail.com > In the existing code, similar operations on wal_sender_timeout use the style > of > (wal_sender_timeout / 2), e.g. function WalSndKeepaliveIfNecessary. So I think > it should be consistent in this patch. > But I think it is better to use magic number too. Maybe we could improve it in > a new thread. I confirmed the code, and +1 to yours. We should treat it in another thread if needed. BTW, this patch cannot be applied to the current master. Best Regards, Hayato Kuroda FUJITSU LIMITED
RE: [Proposal] Add foreign-server health checks infrastructure
Dear Fujii-san, Thank you for your quick review! I attached a new version. I found the previous patches had wrong names. Sorry. > The connection check timer is re-scheduled repeatedly even while the backend > is > in idle state or is running a local transaction that doesn't access to any > foreign > servers. I'm not sure if it's really worth checking the connections even in > those > states. Even without the periodic connection checks, if the connections are > closed > in those states, subsequent GetConnection() will detect that closed connection > and re-establish the connection when starting remote transaction. Thought? Indeed. We can now control the timer in the FDW layer, so disable_timeout() was added at the bottom of pgfdw_xact_callback(). > When a closed connection is detected in idle-in-transaction state and SIGINT > is > raised, nothing happens because there is no query running to be canceled by > SIGINT. Also in this case the connection check timer gets disabled. So we can > still > execute queries that don't access to foreign servers, in the same > transaction, and > then the transaction commit fails. Is this expected behavior? It's not ideal, but I'm not sure of a good solution. I made the timer reschedule itself if a lost connection has been detected. But if the queries in the transaction are quite short, catching SIGINT may fail. > When I shutdowned the foreign server while the local backend is in > idle-in-transaction state, the connection check timer was triggered and > detected > the closed connection. Then when I executed COMMIT command, I got the > following WARNING message. Is this a bug? > > WARNING: leaked hash_seq_search scan for hash table 0x7fd2ca878f20 Fixed. It was caused because hash_seq_term() was not called when the checker detects a lost connection. 
Best Regards, Hayato Kuroda FUJITSU LIMITED v13_0001_expose_cancel_message.patch v13_0002_add_health_check.patch v13_0003_add_doc.patch
RE: [Proposal] Add foreign-server health checks infrastructure
Cfbot is still angry because of missing PGDLLIMPORT, so attached. Best Regards, Hayato Kuroda FUJITSU LIMITED v12_0001_add_checking_infrastracture.patch v12_0002_add_health_check.patch v12_0003_add_doc.patch
RE: [Proposal] Add foreign-server health checks infrastructure
Dear Fujii-san, > cfbot is reporting that the 0002 patch fails to be applied cleanly. Could you > update > the patch? > http://cfbot.cputube.org/patch_37_3388.log Thanks for reporting and sorry for the inconvenience. My repo was not the latest version. The attached patches can be applied on top of 52e4f0c. Best Regards, Hayato Kuroda FUJITSU LIMITED v11_0001_add_checking_infrastracture.patch v11_0002_add_health_check.patch v11_0003_add_doc.patch
RE: [Proposal] Add foreign-server health checks infrastructure
Dear Horiguchi-san, Fujii-san, > > I understood here as removing following mechanism from core: > > > > * disable timeout at end of tx. > > * skip if held off or read commands > > I think we're on the same page. Anyway query cancel interrupt is > ignored while rading input. > > > > - If an existing connection is found to be dead, just try canceling > > > the query (or sending query cancel). > > > One issue with it is how to show the decent message for the query > > > cancel, but maybe we can have a global variable that suggests the > > > reason for the cancel. > > > > Currently I have no good idea for that but I'll try. > > However, I would like to hear others' opnions about the direction, of > course. Based on the idea, I re-implemented the feature. Almost all of the feature is moved to postgres_fdw. The abstract of my patch is as follows: # core * Exposes QueryCancelMessage for error reporting * Uses the above if it is not NULL # postgres_fdw * Defines new GUC postgres_fdw.health_check_interval. It is renamed to a simpler name. * Registers a timeout when initializing the connection hash. * Raises SIGINT and sets QueryCancelMessage if it detects a lost connection. I also attached a test as a zipped file to keep cfbot quiet. When a lost connection is detected, the following message is shown: ``` ERROR: Foreign Server (servername) might be down. ``` What do you think? Best Regards, Hayato Kuroda FUJITSU LIMITED v10_0001_expose_cancel_message.patch v10_0002_add_health_check.patch v10_0003_add_doc.patch
RE: [Proposal] Add foreign-server health checks infrastructure
Dear Fujii-san, > Isn't it a very special case where many FDWs use their own user timeouts? > Could > you tell me the assumption that you're thinking, especially how many FDWs are > working? I came up with a case like a star schema, where a postgres database connects to several data stores. If each DBMS is different and each FDW has its own timeout, many timeouts will be registered. But it may be a corner case and should not be confused with the OLTP case. So I'll post a new patch based on his post. Best Regards, Hayato Kuroda FUJITSU LIMITED
RE: [Proposal] Add foreign-server health checks infrastructure
Dear Horiguchi-san, > I think we just don't need to add the special timeout kind to the > core. postgres_fdw can use USER_TIMEOUT and it would be suffiction to > keep running health checking regardless of transaction state then fire > query cancel if disconnection happens. As I said in the previous main, > possible extra query cancel woud be safe. I finally figured out that you were referring to the user-defined timeout system. Initially - before posting to -hackers - I designed it like that, but I was afraid of the overhead when many FDWs register timeouts and setitimer() is called many times. Is that too overcautious? Best Regards, Hayato Kuroda FUJITSU LIMITED
RE: [Proposal] Add foreign-server health checks infrastructure
> I understood here as removing following mechanism from core: > > * disable timeout at end of tx. While reading it again, I noticed this part might be wrong. Sorry for the inconvenience. But anyway, some code should be (re)moved from the core, right? Best Regards, Hayato Kuroda FUJITSU LIMITED
RE: [Proposal] Add foreign-server health checks infrastructure
Dear Horiguchi-san, Thank you for your suggestions. I want to confirm what you are saying. > FWIW, I'm not sure this feature necessarily requires core support > dedicated to FDWs. The core have USER_TIMEOUT feature already and > FDWs are not necessarily connection based. It seems better if FDWs > can implement health check feature without core support and it seems > possible. Or at least the core feature should be more generic and > simpler. Why don't we just expose InTransactionHealthCheckCallbacks or > something and operating functions on it? I understood it as: the core part is too complicated and the FDW side too simple, right? > Mmm. AFAICS the running command will stop with "canceling statement > due to user request", which is a hoax. We need a more decent message > there. +1 about better messages. > I understand that the motive of this patch is "to avoid wasted long > local work when fdw-connection dies". Yeah, your understanding is right. > In regard to the workload in > your first mail, it is easily avoided by ending the transaction as soon > as remote access ends. This feature doesn't work for the case "begin; > ; ". But the same measure also works in > that case. So the only case where this feature is useful is "begin; > ; ; ; end;". But in the first > place how frequently do you expecting remote-connection close happens? > If that happens so frequently, you might need to recheck the system > health before implementing this feature. Since it is correctly > detected when something really went wrong, I feel that it is a bit too > complex for the usefulness especially for the core part. Thanks for analyzing the motivation. Indeed, some cases may be resolved by separating transactions, and this event rarely happens. > In conclusion, as my humble opinion I would like to propose to reduce > this feature to: > > - Just periodically check health (in any aspect) of all live > connections regardless of the session state. 
I understood this as removing the following mechanisms from the core: * disable timeout at end of tx. * skip if held off or read commands > - If an existing connection is found to be dead, just try canceling > the query (or sending query cancel). > One issue with it is how to show the decent message for the query > cancel, but maybe we can have a global variable that suggests the > reason for the cancel. Currently I have no good idea for that, but I'll try. Best Regards, Hayato Kuroda FUJITSU LIMITED
RE: [Proposal] Add foreign-server health checks infrastructure
Dear Hackers, I found the patches we depend on have been committed, so I rebased. https://git.postgresql.org/gitweb/?p=postgresql.git;a=commit;h=50e570a59e7f86bb41f029a66b781fc79b8d50f1 In this version there is a small change in the postgres_fdw part. A check using WaitEventSetCanReportClosed() is added because some OSes cannot wait for the WL_SOCKET_CLOSED event. Note that the test cannot be added to the regression tests because cfbot may not be happy. In my environment the test contained in previous patches works well. 0001 and 0002 are not changed from the previous version. Best Regards, Hayato Kuroda FUJITSU LIMITED v09_0003_helth_check_for_postgres_fdw.patch v09_0002_add_doc_about_infrastructure.patch v09_0001_add_checking_infrastracture.patch
RE: Logical replication timeout problem
Dear Wang, Thank you for making the patch. I applied your patch and confirmed that the code passes the regression tests. Here is a short review: ``` + static int skipped_changes_count = 0; + /* + * Conservatively, at least 150,000 changes can be skipped in 1s. + * + * Because we use half of wal_sender_timeout as the threshold, and the unit + * of wal_sender_timeout in process is ms, the final threshold is + * wal_sender_timeout * 75. + */ + int skipped_changes_threshold = wal_sender_timeout * 75; ``` I'm not sure, but could you tell me the background of this calculation? Is this assumption reasonable? ``` @@ -654,20 +663,62 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, { case REORDER_BUFFER_CHANGE_INSERT: if (!relentry->pubactions.pubinsert) + { + if (++skipped_changes_count >= skipped_changes_threshold) + { + OutputPluginUpdateProgress(ctx, true); + + /* + * After sending keepalive message, reset + * skipped_changes_count. + */ + skipped_changes_count = 0; + } return; + } break; ``` Is the if-statement needed? In the walsender case OutputPluginUpdateProgress() leads to WalSndUpdateProgress(), and that function also has a threshold for pinging. ``` static void -WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid) +WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool send_keep_alive) { - static TimestampTz sendTime = 0; + static TimestampTz trackTime = 0; TimestampTz now = GetCurrentTimestamp(); + if (send_keep_alive) + { + /* + * If half of wal_sender_timeout has lapsed without sending a message to + * the standby, send a keep-alive message to the standby. + */ + static TimestampTz sendTime = 0; + TimestampTz ping_time = TimestampTzPlusMilliseconds(sendTime, + wal_sender_timeout / 2); + if (now >= ping_time) + { + WalSndKeepalive(false); + + /* Try to flush pending output to the client */ + if (pq_flush_if_writable() != 0) + WalSndShutdown(); + sendTime = now; + } + } + ``` * +1 about renaming to trackTime. 
* `/2` might be a magic number. How about the following? Renaming is very welcome: ``` +#define WALSND_LOGICAL_PING_FACTOR 0.5 + static TimestampTz sendTime = 0; + TimestampTz ping_time = TimestampTzPlusMilliseconds(sendTime, + wal_sender_timeout * WALSND_LOGICAL_PING_FACTOR) ``` Could you add a commitfest entry for cfbot? Best Regards, Hayato Kuroda FUJITSU LIMITED
RE: [Proposal] Add foreign-server health checks infrastructure
Dear Fujii-san, Thank you for the good suggestions. > This logic sounds complicated to me. I'm afraid that FDW developers may a bit > easily misunderstand the logic and make the bug in their FDW. > Isn't it simpler to just disable the timeout in core whenever the transaction > ends > whether committed or aborted, like statement_timeout is disabled after each > command? For example, call something like DisableForeignCheckTimeout() in > CommitTransaction() etc. Your idea is to stop the timer at the end of each transaction, right? I had not adopted that because I thought some developers might not want to stop the timer even when a transaction ends. That caused a complex situation without a good use case, however, so I implemented your logic. > > You are right. I think this suggests that error-reporting should be done in > > the > core layer. > > For meaningful error reporting, I changed a callback specification > > that it should return ForeignServer* which points to downed remote server. > > This approach seems to assume that FDW must manage all the ForeignServer > information so that the callback can return it. Is this assumption valid for > all the > FDW? I'm not sure; the assumption might be too optimistic. mysql_fdw can easily return ForeignServer* because it caches serverid, but dblink and maybe oracle_fdw cannot. > How about making FDW trigger a query cancel interrupt by signaling SIGINT to > the backend, instead? I understood that the error should be caught by QueryCancelPending. Could you check 0003? Does it follow that? Best Regards, Hayato Kuroda FUJITSU LIMITED v08_0001_add_checking_infrastracture.patch v08_0002_add_doc.patch
RE: [Proposal] Add foreign-server health checks infrastructure
Dear Fujii-san, Thank you for reviewing! I attached the latest version. > When more than one FDWs are used, even if one FDW calls this function to > disable the timeout, its remote-server-check-callback can still be called. Is > this > OK? > Please imagine the case where two FDWs are used and they registered their > callbacks. Even when one FDW calls TryDisableRemoteServerCheckingTimeout(), > if another FDW has not called that yet, the timeout is still being enabled. > If the > timeout is triggered during that period, even the callback registered by the > FDW > that has already called TryDisableRemoteServerCheckingTimeout() would be > called. Indeed, and it should be avoided. I added a counter to CheckingRemoteServersCallbackItem. The register function returns the registered item, and it must be set as the argument for the enable and disable functions. Callback functions will be called when item->counter is larger than zero. > + if (remote_servers_connection_check_interval > 0) > + { > + CallCheckingRemoteServersCallbacks(); > + > enable_timeout_after(CHECKING_REMOTE_SERVERS_TIMEOUT, > + > remote_servers_connection_check_interval); > > LockErrorCleanup() needs to be called before reporting the error, doesn't it? You are right. I think this suggests that error-reporting should be done in the core layer. For meaningful error reporting, I changed the callback specification so that it should return the ForeignServer* pointing to the downed remote server. > This can cause an error even while DoingCommandRead == true. Is that safe? I read the code again and I think it is not safe. It is OK when whereToSendOutput is DestRemote, but not good in InteractiveBackend(). I changed it so that the if-statement for CheckingRemoteServersTimeoutPending is moved just after the one for ClientConnectionLost, because that may throw a FATAL error and disconnect from the client. 
Best Regards, Hayato Kuroda FUJITSU LIMITED v07_0001_add_checking_infrastracture.patch v07_0002_add_doc.patch
RE: [Proposal] Add foreign-server health checks infrastructure
Dear Fujii-san, Zhihong, I attached the latest version. The biggest change is that callbacks are no longer unregistered at the end of transactions. FDW developers must enable or disable the timeout instead, via new APIs. The timer will be turned on when: * the new GUC is >= 0, and * anyone calls TryEnableRemoteServerCheckingTimeout(). I think this version has reduced overhead, but it might not be developer-friendly... Best Regards, Hayato Kuroda FUJITSU LIMITED v06_0002_add_doc.patch v06_0001_add_checking_infrastracture.patch
RE: [Proposal] Add foreign-server health checks infrastructure
Dear Fujii-san, Thank you for your interest! I'll post a new version within several days. > > Yeah, remote-checking timeout will be enable even ifa local transaction is > opened. > > In my understanding, postgres cannot distinguish whether opening > > transactions > > are using only local object or not. > > My first idea was that turning on the timeout when GetFdwRoutineXXX > functions > > were called, > > What about starting the timeout in GetConnection(), instead? Did you mean the function in postgres_fdw/connection.c? In my understanding that means the timeout should be enabled or disabled by each FDW extension. I did not find bad cases with that, so I'll change it like that and add new APIs. > v05_0004_add_tests.patch failed to be applied to the master. Could you rebase > it? It's caused because a testcase was added to postgres_fdw. Will rebase. > The above change is included in both > v5-0003-Use-WL_SOCKET_CLOSED-for-client_connection_check_.patch and > v05_0002_add_doc.patch. If it should be in the former patch, it should be > removed > from your patch v05_0002_add_doc.patch. I was confused about the doc patch. Sorry for the inconvenience. > There seems no user of UnregisterCheckingRemoteServersCallback(). So how > about removing it? Previously I kept the API for other extensions, but I cannot find any use cases. Will remove. Best Regards, Hayato Kuroda FUJITSU LIMITED
RE: [Proposal] Add foreign-server health checks infrastructure
Dear Zhihong, Thank you for reviewing! And I have to apologize for the delay. I'll post a new patchset later. > + UnregisterAllCheckingRemoteServersCallback(); > > UnregisterAllCheckingRemoteServersCallback > -> UnregisterAllCheckingRemoteServersCallbacks Will fix. > + CheckingRemoteServersCallbackItem *item; > + item = fdw_callbacks; > > The above two lines can be combined. Will fix. > +UnregisterCheckingRemoteServersCallback(CheckingRemoteServersCallback > callback, > + void *arg) > > Is the above func called anywhere ? Currently not; I just kept the API for other FDW extensions. But I cannot find any use cases, so I will remove it. > + if (item->callback == callback && item->arg == arg) > > Is comparing argument pointers stable ? What if the same argument is cloned > ? This part is no longer needed. Will remove. > + CallCheckingRemoteServersCallbacks(); > + > + if (remote_servers_connection_check_interval > 0) > + enable_timeout_after(CHECKING_REMOTE_SERVERS_TIMEOUT, > + > remote_servers_connection_check_interval); > > Should the call to CallCheckingRemoteServersCallbacks() be placed under the > if block checking remote_servers_connection_check_interval ? > If remote_servers_connection_check_interval is 0, it seems the callbacks > shouldn't be called. Agreed. We force the timeout to stop when the GUC is set to 0 in the assign_hook, so your point is consistent. Will move. Best Regards, Hayato Kuroda FUJITSU LIMITED
RE: Allow escape in application_name
Dear Sawada-san, > Good idea. But the application_name is actually truncated to 63 > characters (NAMEDATALEN - 1)? If so, we need to do substring(... for > 63) instead. Yeah, the parameter will be truncated to one less than NAMEDATALEN: ``` max_identifier_length (integer) Reports the maximum identifier length. It is determined as one less than the value of NAMEDATALEN when building the server. The default value of NAMEDATALEN is 64; therefore the default max_identifier_length is 63 bytes, which can be less than 63 characters when using multibyte encodings. ``` But in Fujii-san's patch the length is picked up by the following SQL, so I think it works well. ``` SELECT max_identifier_length FROM pg_control_init() ``` Best Regards, Hayato Kuroda FUJITSU LIMITED
RE: Allow escape in application_name
Dear Fujii-san, > Attached is the patch that adds the regression test for > postgres_fdw.application_name. Could you review this? > > As Horiguchi-san suggested at [1], I split the test into two, and then > tweaked them > as follows. > > 1. Set application_name option of a server option to 'fdw_%d%p' > 2. Set postgres_fdw.application_name to 'fdw_%a%u%%' > > 'fdw_%d%p' and 'fdw_%a%u%%' still may be larger than NAMEDATALEN > depending on the regression test environment. To make the test stable even in > that case, the patch uses substring() is truncate application_name string in > the > test query's condition to less than NAMEDATALEN. I think it's good because it takes max_identifier_length into account, and both setting methods are exercised. Also, using pg_terminate_backend() is smart. (Actually I'm writing a test that separates the test cases into five parts, but your patch is better.) I should have noticed the test failure myself, but I missed it, sorry. Do you know how to apply my patch and test it on buildfarm animals? Best Regards, Hayato Kuroda FUJITSU LIMITED
RE: Allow escape in application_name
Dear Fujii-san, Horiguchi-san, I confirmed that the feature was committed but the test was reverted. Now I'm checking the buildfarm. But anyway, I want to say thank you for your contribution! Best Regards, Hayato Kuroda FUJITSU LIMITED
RE: [PATCH] pg_stat_toast v0.3
Dear Gunnar, > The attached v0.3 includes > * columns "storagemethod" and "compressmethod" added as per Hayato > Kuroda's suggestion I prefer your implementation that refers to another system view. > * gathering timing information Here is a minor comment: I'm not sure when we should start to measure time. If you want to record the time spent compressing, toast_compress_datum() should be sandwiched by INSTR_TIME_SET_CURRENT() calls to calculate the duration. Currently time_spent is calculated in pgstat_report_toast_activity(), but this has a systematic error. If you want to record the time spent inserting/updating, however, I think we should start measuring in the upper layer, maybe heap_toast_insert_or_update(). > Any hints on how to write meaningful tests would be much appreciated! > I.e., will a test I searched for hints in the PG sources, and I think the modules/ subdirectory can be used. The following is an example: https://github.com/postgres/postgres/tree/master/src/test/modules/commit_ts I attached a patch to create a new test. Could you rewrite the SQL and the output file? Best Regards, Hayato Kuroda FUJITSU LIMITED adding_new_test.patch
RE: [PATCH] pg_stat_toast
Dear Gunnar, > postgres=# CREATE TABLE test (i int, lz4 text COMPRESSION lz4, std text); > postgres=# INSERT INTO test SELECT > i,repeat(md5(i::text),100),repeat(md5(i::text),100) FROM > generate_series(0,10) x(i); > postgres=# SELECT * FROM pg_stat_toast WHERE schemaname = 'public'; > -[ RECORD 1 ]+-- > schemaname | public > reloid | 16829 > attnum | 2 > relname | test > attname | lz4 > externalizations | 0 > compressions | 11 > compressionsuccesses | 11 > compressionsizesum | 6299710 > originalsizesum | 320403204 > -[ RECORD 2 ]+-- > schemaname | public > reloid | 16829 > attnum | 3 > relname | test > attname | std > externalizations | 0 > compressions | 11 > compressionsuccesses | 11 > compressionsizesum | 8198819 > originalsizesum | 320403204 I'm not sure about TOAST, but currently compression methods are configurable: https://git.postgresql.org/gitweb/?p=postgresql.git;a=commit;h=bbe0a81db69bd10bd166907c3701492a29aca294 How about adding a new attribute "method" to pg_stat_toast? ToastAttrInfo *attr->tai_compression represents how the data is compressed, so I think it's easy to add. Or is it not needed because pg_attribute has the information? Best Regards, Hayato Kuroda FUJITSU LIMITED
RE: Allow escape in application_name
Dear Fujii-san, Sorry, I forgot to reply to your messages. > >>if (strcmp(keywords[i], "application_name") == 0 && > >>values[i] != NULL && *(values[i]) != '\0') > > > > I'm not sure but do we have a case that values[i] becomes NULL > > even if keywords[i] is "application_name"? > > No for now, I guess. But isn't it safer to check that, too? I also could not > find strong > reason why that check should be dropped. But you'd like to drop that? No, I agree with the new check. I was just worried I had missed something in my code. Best Regards, Hayato Kuroda FUJITSU LIMITED
RE: Allow escape in application_name
Dear Fujii-san, > Thanks for the review! At first I pushed 0001 patch. I found your commit. Thanks! > BTW, 0002 patch adds the regression test that checks > pg_stat_activity.application_name. But three months before, we added the > similar > test when introducing postgres_fdw.application_name GUC and > reverted/removed it because it's not stable [1]. So we should review carefully > whether the test 0002 patch adds may have the same issue or not. As far as I > read > the patch, ISTM that the patch has no same issue. But could you double check > that? I agree we will not face the problem. When postgres_fdw_disconnect_all() is performed, we just send a character 'X' to the remote backends (in sendTerminateConn() and lower-level functions) and return without blocking. After the remote backends receive the 'X' message, proc_exit() is called and the processes die. The test failure was caused because the SELECT statement was executed before the dying backends had fully exited. Currently we search pg_stat_activity, and this is not affected by residual rows because the condition is strict enough to exclude other rows. Hence I think this test is stable. What do you think? Best Regards, Hayato Kuroda FUJITSU LIMITED
RE: Allow escape in application_name
Dear Fujii-san, Thank you for updating! I read your patches and I have only one comment. > if (strcmp(keywords[i], "application_name") == 0 && > values[i] != NULL && *(values[i]) != '\0') I'm not sure, but do we have a case where values[i] becomes NULL even when keywords[i] is "application_name"? I think the other parts are perfect. Best Regards, Hayato Kuroda FUJITSU LIMITED