On Thu, Sep 15, 2022 at 10:45 AM wangw.f...@fujitsu.com <wangw.f...@fujitsu.com> wrote:
>
> Attach the new patch set.
>

Review of v29-0001*
==================
1.
+parallel_apply_find_worker(TransactionId xid)
{
...
+    entry = hash_search(ParallelApplyWorkersHash, &xid, HASH_FIND, &found);
+    if (found)
+    {
+        /* If any workers (or the postmaster) have died, we have failed. */
+        if (entry->winfo->error_mq_handle == NULL)
+            ereport(ERROR,
+                    (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                     errmsg("lost connection to parallel apply worker")));
...
}

I think the above comment is incorrect, because if the postmaster had
died, you wouldn't have found the entry in the hash table. How about
something like: "We can't proceed if the parallel streaming worker has
already exited."

2.
+/*
+ * Find the previously assigned worker for the given transaction, if any.
+ */
+ParallelApplyWorkerInfo *
+parallel_apply_find_worker(TransactionId xid)

No need to use the word 'previously' in the above sentence.

3.
+ * We need one key to register the location of the header, and we need
+ * another key to track the location of the message queue.
+ */
+    shm_toc_initialize_estimator(&e);
+    shm_toc_estimate_chunk(&e, sizeof(ParallelApplyWorkerShared));
+    shm_toc_estimate_chunk(&e, queue_size);
+    shm_toc_estimate_chunk(&e, error_queue_size);
+
+    shm_toc_estimate_keys(&e, 3);

Overall, three keys are used, but the comment mentions only two. You
forgot to mention error_queue.

4.
+    if (launched)
+        ParallelApplyWorkersList = lappend(ParallelApplyWorkersList, winfo);
+    else
+    {
+        shm_mq_detach(winfo->mq_handle);
+        shm_mq_detach(winfo->error_mq_handle);
+        dsm_detach(winfo->dsm_seg);
+        pfree(winfo);
+
+        winfo = NULL;
+    }

A. The code in the else branch that frees the worker info is the same as
the code used in parallel_apply_free_worker. Can we move it into a
separate function, say parallel_apply_free_worker_info()?
B. I think it would be better to use {} for the if branch as well, to
make it consistent with the else branch.

5.
+ * case define a named savepoint, so that we are able to commit/rollback it
+ * separately later.
+ */
+void
+parallel_apply_subxact_info_add(TransactionId current_xid)

I don't see the need to mention commit in the above comment. So, we can
slightly modify it to: "... so that we are able to rollback to it
separately later."

6.
+    for (i = list_length(subxactlist) - 1; i >= 0; i--)
+    {
+        xid = list_nth_xid(subxactlist, i);
...
...
+/*
+ * Return the TransactionId value contained in the n'th element of the
+ * specified list.
+ */
+static inline TransactionId
+list_nth_xid(const List *list, int n)
+{
+    Assert(IsA(list, XidList));
+    return lfirst_xid(list_nth_cell(list, n));
+}

I am not sure we need a new list function just for this one place.
Can't we directly use lfirst_xid(list_nth_cell) instead?

7.
+void
+parallel_apply_replorigin_setup(void)
+{
+    RepOriginId originid;
+    char        originname[NAMEDATALEN];
+    bool        started_tx = false;
+
+    /* This function might be called inside or outside of transaction. */
+    if (!IsTransactionState())
+    {
+        StartTransactionCommand();
+        started_tx = true;
+    }

Is there a place in the patch where this function is called without an
active transaction state? If so, this coding is fine; if not, I suggest
keeping an assert on the transaction state here instead. The same applies
to parallel_apply_replorigin_reset() as well.

8.
+ *
+ * If write_abort_lsn is true, send the abort_lsn and abort_time fields,
+ * otherwise don't.
  */
 void
 logicalrep_write_stream_abort(StringInfo out, TransactionId xid,
-                              TransactionId subxid)
+                              TransactionId subxid, XLogRecPtr abort_lsn,
+                              TimestampTz abort_time, bool abort_info)

The variable name in the comment needs to be updated: the comment refers
to write_abort_lsn, but the parameter is named abort_info.

9.
+TransactionId stream_xid = InvalidTransactionId;
-static TransactionId stream_xid = InvalidTransactionId;
...
...
+void
+parallel_apply_subxact_info_add(TransactionId current_xid)
+{
+    if (current_xid != stream_xid &&
+        !list_member_xid(subxactlist, current_xid))

It seems you have changed the scope of stream_xid so that it can be used
in parallel_apply_subxact_info_add(). Wouldn't it be better to pass it as
a parameter (say top_xid)?

10.
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -20,6 +20,7 @@
 #include <sys/time.h>
 #include "access/xlog.h"
+#include "catalog/pg_subscription.h"
 #include "catalog/pg_type.h"
 #include "common/connect.h"
 #include "funcapi.h"
@@ -443,9 +444,14 @@ libpqrcv_startstreaming(WalReceiverConn *conn,
     appendStringInfo(&cmd, "proto_version '%u'",
                      options->proto.logical.proto_version);
-    if (options->proto.logical.streaming &&
-        PQserverVersion(conn->streamConn) >= 140000)
-        appendStringInfoString(&cmd, ", streaming 'on'");
+    if (options->proto.logical.streaming != SUBSTREAM_OFF)
+    {
+        if (PQserverVersion(conn->streamConn) >= 160000 &&
+            options->proto.logical.streaming == SUBSTREAM_PARALLEL)
+            appendStringInfoString(&cmd, ", streaming 'parallel'");
+        else if (PQserverVersion(conn->streamConn) >= 140000)
+            appendStringInfoString(&cmd, ", streaming 'on'");
+    }

It doesn't seem like a good idea to expose subscription options here.
Can we instead replace the current streaming parameter with a
char *streaming_option that is filled in by the caller and used here
directly?

11.
The error message used in pgoutput_startup() seems better than the
existing messages in that function, but it is better to be consistent
with the other messages. There is a discussion in the email thread [1]
on improving those messages, so kindly make your suggestions there.

12.
In addition to the above, I have changed/added a few comments in the
attached patch.

[1] - https://www.postgresql.org/message-id/20220914.111507.13049297635620898.horikyota.ntt%40gmail.com

-- 
With Regards,
Amit Kapila.
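
To make the suggestion in point 10 a bit more concrete, below is a minimal standalone sketch (not code from the patch) of moving the "which streaming mode can we ask for" decision to the caller, so that the walreceiver side only appends a string. StreamMode, choose_streaming_option() and build_options() are hypothetical stand-ins for the SUBSTREAM_* values, the caller's logic and the StringInfo-based command building; the sketch also assumes the caller has the publisher's server version available.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical stand-in for the SUBSTREAM_* subscription values. */
typedef enum { STREAM_OFF, STREAM_ON, STREAM_PARALLEL } StreamMode;

/*
 * Caller side: map the subscription option and the publisher's server
 * version to the protocol string, or NULL when streaming must not be
 * requested. Mirrors the version checks in the quoted hunk.
 */
static const char *
choose_streaming_option(StreamMode mode, int server_version)
{
    if (mode == STREAM_OFF)
        return NULL;
    if (mode == STREAM_PARALLEL && server_version >= 160000)
        return "parallel";
    if (server_version >= 140000)
        return "on";            /* parallel falls back to 'on' on older servers */
    return NULL;
}

/*
 * Walreceiver side: append whatever the caller chose, without knowing
 * anything about subscription options.
 */
static void
build_options(char *buf, size_t len, unsigned proto_version,
              const char *streaming_option)
{
    int         n;

    assert(buf != NULL && len > 0);
    n = snprintf(buf, len, "proto_version '%u'", proto_version);
    if (streaming_option != NULL && n >= 0 && (size_t) n < len)
        snprintf(buf + n, len - n, ", streaming '%s'", streaming_option);
}
```

With this split, libpqwalreceiver.c would presumably no longer need to include catalog/pg_subscription.h.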
changed_comments_amit_v29.patch