Dear Hou,
Thanks for updating the patch! The following are my comments.
===
01. applyparallelworker.c - SIZE_STATS_MESSAGE
```
/*
 * There are three fields in each message received by the parallel apply
 * worker: start_lsn, end_lsn and send_time. Because we have updated these
 * statistics in the leader apply worker, we can ignore these fields in the
 * parallel apply worker (see function LogicalRepApplyLoop).
 */
#define SIZE_STATS_MESSAGE (2 * sizeof(XLogRecPtr) + sizeof(TimestampTz))
```
Judging by the style of the other comments, it seems that the first sentence of a comment should
describe the datatype and its usage, not the detailed reason.
For example, for ParallelApplyWorkersList you wrote "A list ...". How about
adding something like the following:
The size of the message fields that can be skipped by the parallel apply worker.
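A minimal sketch of the suggested comment placed on the macro. It assumes the forwarded message keeps the XLogData ('w') layout of the streaming replication protocol (one type byte, then start_lsn, end_lsn and send_time). The typedefs are stand-ins for the PostgreSQL ones, and skip_stats_fields() is a hypothetical helper, not code from the patch.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Stand-ins for the PostgreSQL typedefs (assumption for this sketch). */
typedef uint64_t XLogRecPtr;
typedef int64_t TimestampTz;

/*
 * The size of the message fields (start_lsn, end_lsn and send_time) that
 * can be skipped by the parallel apply worker, because the leader apply
 * worker has already used them to update the statistics.
 */
#define SIZE_STATS_MESSAGE (2 * sizeof(XLogRecPtr) + sizeof(TimestampTz))

/*
 * Hypothetical helper: given a message that starts with a one-byte type
 * followed by the statistics fields, return a pointer to the payload, or
 * NULL if the message is too short to contain them.
 */
static const char *
skip_stats_fields(const char *msg, size_t len)
{
    assert(msg != NULL);

    if (len < 1 + SIZE_STATS_MESSAGE)
        return NULL;
    return msg + 1 + SIZE_STATS_MESSAGE;
}
```

With 8-byte LSNs and timestamps the macro evaluates to 24 bytes, so the payload starts 25 bytes into the message.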
~~~
02. applyparallelworker.c - parallel_apply_start_subtrans
```
if (current_xid != top_xid &&
    !list_member_xid(subxactlist, current_xid))
```
The macro TransactionIdEquals() is defined in access/transam.h. Should we use it here,
or is that too trivial?
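For reference, a standalone sketch of the condition rewritten with TransactionIdEquals(). The typedef and macro mirror access/transam.h; xid_array_member() is a hypothetical array-based stand-in for list_member_xid(), and needs_new_subtrans() is an illustrative name only.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Stand-ins mirroring access/transam.h (assumption for this sketch). */
typedef uint32_t TransactionId;
#define TransactionIdEquals(id1, id2) ((id1) == (id2))

/* Hypothetical stand-in for list_member_xid(): linear search of an array. */
static bool
xid_array_member(const TransactionId *xids, size_t n, TransactionId xid)
{
    for (size_t i = 0; i < n; i++)
    {
        if (TransactionIdEquals(xids[i], xid))
            return true;
    }
    return false;
}

/* True if current_xid is neither the top-level xid nor a known subxact. */
static bool
needs_new_subtrans(TransactionId current_xid, TransactionId top_xid,
                   const TransactionId *subxacts, size_t nsubxacts)
{
    assert(subxacts != NULL || nsubxacts == 0);

    return !TransactionIdEquals(current_xid, top_xid) &&
        !xid_array_member(subxacts, nsubxacts, current_xid);
}
```

Since the macro expands to a plain `==`, the change is purely stylistic; the benefit is consistency with other xid comparisons in the tree.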
~~~
03. applyparallelworker.c - LogicalParallelApplyLoop
```
case SHM_MQ_WOULD_BLOCK:
    {
        int rc;
        if (!in_streamed_transaction)
        {
            /*
             * If we didn't get any transactions for a while there might be
             * unconsumed invalidation messages in the queue, consume them
             * now.
             */
            AcceptInvalidationMessages();
            maybe_reread_subscription();
        }
        MemoryContextReset(ApplyMessageContext);
```
Is MemoryContextReset() needed here? IIUC, no one uses ApplyMessageContext when we
reach this point.
~~~
04. applyparallelworker.c - HandleParallelApplyMessages
```
else if (res == SHM_MQ_SUCCESS)
{
    StringInfoData msg;
    initStringInfo(&msg);
    appendBinaryStringInfo(&msg, data, nbytes);
    HandleParallelApplyMessage(winfo, &msg);
    pfree(msg.data);
}
```
In LogicalParallelApplyLoop(), appendBinaryStringInfo() is not used; instead the
StringInfoData is initialized directly to point at the received data. Why is there a difference?
appendBinaryStringInfo() may call repalloc() and then does memcpy(), so it may be inefficient.
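A minimal sketch contrasting the two approaches, using a simplified stand-in for StringInfoData (the real one in lib/stringinfo.h uses palloc rather than malloc). wrap_message() follows the read-only convention of setting maxlen = -1 that LogicalRepApplyLoop() uses; copy_message() approximates the initStringInfo() + appendBinaryStringInfo() path. Both function names are hypothetical.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Simplified stand-in for PostgreSQL's StringInfoData (assumption). */
typedef struct StringInfoData
{
    char *data;
    int   len;
    int   maxlen;
    int   cursor;
} StringInfoData;

/* Zero-copy: point the StringInfo at the received bytes (read-only). */
static void
wrap_message(StringInfoData *msg, char *data, int nbytes)
{
    msg->data = data;
    msg->len = nbytes;
    msg->maxlen = -1;           /* buffer not allocated here; never enlarge or free */
    msg->cursor = 0;
}

/* Copying: allocate a private buffer and memcpy() the bytes into it. */
static void
copy_message(StringInfoData *msg, const char *data, int nbytes)
{
    assert(nbytes >= 0);
    msg->data = malloc((size_t) nbytes + 1);
    assert(msg->data != NULL);  /* sketch only: no real error handling */
    memcpy(msg->data, data, (size_t) nbytes);
    msg->data[nbytes] = '\0';   /* StringInfo keeps a trailing NUL */
    msg->len = nbytes;
    msg->maxlen = nbytes + 1;
    msg->cursor = 0;
}
```

If HandleParallelApplyMessages() switched to the wrapping approach, the pfree(msg.data) would also have to go, because the buffer returned by shm_mq_receive() belongs to the queue handle.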
~~~
05. applyparallelworker.c - parallel_apply_send_data
```
if (result != SHM_MQ_SUCCESS)
    ereport(ERROR,
            (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
             errmsg("could not send data to shared-memory queue")));
```
I checked the shm_mq_result enumeration, and it seems that shm_mq_send(nowait = false)
fails only when the other process has exited (SHM_MQ_DETACHED).
How about adding a hint or detail message such as "lost connection to parallel
apply worker"?
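A standalone sketch of picking the detail text from the result code. The enum mirrors shm_mq_result from storage/shm_mq.h; send_failure_detail() is a hypothetical helper whose string would feed errdetail() in the existing ereport(ERROR).

```c
#include <assert.h>
#include <stddef.h>

/* Mirrors shm_mq_result in storage/shm_mq.h (copied for this sketch). */
typedef enum
{
    SHM_MQ_SUCCESS,             /* Sent or received a message. */
    SHM_MQ_WOULD_BLOCK,         /* Not completed; retry later. */
    SHM_MQ_DETACHED             /* Other process has detached queue. */
} shm_mq_result;

/*
 * Hypothetical helper: detail text for a failed shm_mq_send(). With
 * nowait = false, SHM_MQ_WOULD_BLOCK is not expected, so SHM_MQ_DETACHED
 * is the only failure that gets a detail message.
 */
static const char *
send_failure_detail(shm_mq_result result)
{
    assert(result != SHM_MQ_SUCCESS);   /* only called after a failure */

    switch (result)
    {
        case SHM_MQ_DETACHED:
            return "lost connection to parallel apply worker";
        case SHM_MQ_SUCCESS:
        case SHM_MQ_WOULD_BLOCK:
            break;
    }
    return NULL;                /* no extra detail */
}
```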
===
06. worker.c - nchanges
```
/*
 * The number of changes sent to parallel apply workers during one streaming
 * block.
 */
static uint32 nchanges = 0;
```
I found that the name "nchanges" is already used in
apply_spooled_messages().
It works because the local variable always takes precedence
when a local and a global variable have the same name, but I think
it may be confusing.
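A small standalone illustration of the shadowing, with hypothetical function names. Building with GCC's -Wshadow reports the inner declaration; renaming the global (parallel_stream_nchanges, say, a name chosen only for this example) would remove the ambiguity.

```c
#include <assert.h>
#include <stdint.h>

/* Global counter, as in worker.c (name kept to show the collision). */
static uint32_t nchanges = 0;

/* Counts into a local that shadows the global nchanges. */
static uint32_t
apply_spooled_messages_sketch(uint32_t nmessages)
{
    uint32_t nchanges = 0;      /* shadows the global */

    for (uint32_t i = 0; i < nmessages; i++)
        nchanges++;             /* updates the local, not the global */

    return nchanges;
}

/* Updates the global counter. */
static void
send_change_sketch(void)
{
    assert(nchanges < UINT32_MAX);
    nchanges++;
}
```

Calling apply_spooled_messages_sketch() leaves the global untouched, which is easy to miss when reading the function body.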
~~~
07. worker.c - apply_handle_commit_internal
I think we can add an assertion such as Assert(replorigin_session_origin_lsn !=
InvalidXLogRecPtr && replorigin_session_origin != InvalidRepOriginId)
to catch a missing replorigin_session_setup(). Previously it was set at the entry
point and never reset.
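A minimal sketch of the suggested check. The typedefs, constants and globals are stand-ins mirroring replication/origin.h and access/xlogdefs.h (InvalidRepOriginId and InvalidXLogRecPtr are both 0); assert() stands in for Assert(), and check_replorigin_session() is a hypothetical name.

```c
#include <assert.h>
#include <stdint.h>

/* Stand-ins for the PostgreSQL types and constants (assumption). */
typedef uint64_t XLogRecPtr;
typedef uint16_t RepOriginId;
#define InvalidXLogRecPtr  ((XLogRecPtr) 0)
#define InvalidRepOriginId 0

/* Stand-ins for the session globals declared in replication/origin.h. */
static RepOriginId replorigin_session_origin = InvalidRepOriginId;
static XLogRecPtr replorigin_session_origin_lsn = InvalidXLogRecPtr;

/*
 * Hypothetical check to run before the commit (or prepare) is performed:
 * fails if replorigin_session_setup() was never called.
 */
static void
check_replorigin_session(void)
{
    assert(replorigin_session_origin_lsn != InvalidXLogRecPtr &&
           replorigin_session_origin != InvalidRepOriginId);
}
```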
~~~
08. worker.c - apply_handle_prepare_internal
Same as above.
~~~
09. worker.c - maybe_reread_subscription
```
/*
 * Exit if any parameter that affects the remote connection was changed.
 * The launcher will start a new worker.
 */
if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0 ||
    strcmp(newsub->name, MySubscription->name) != 0 ||
    strcmp(newsub->slotname, MySubscription->slotname) != 0 ||
    newsub->binary != MySubscription->binary ||
    newsub->stream != MySubscription->stream ||
    strcmp(newsub->origin, MySubscription->origin) != 0 ||
    newsub->owner != MySubscription->owner ||
    !equal(newsub->publications, MySubscription->publications))
{
    ereport(LOG,
            (errmsg("logical replication apply worker for subscription \"%s\" will restart because of a parameter change",
                    MySubscription->name)));
    proc_exit(0);
}
```
When a parallel apply worker has been launched and the subscription
option is then modified,
the same message appears twice.
But if the "streaming" option is changed from "parallel" to "on", one of them
(the parallel apply worker) will not be restarted.
Should we modify the message?
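One possible way to distinguish the two messages, as a standalone sketch. The am_parallel_apply_worker flag and format_param_change_msg() are hypothetical names, and the "will stop" wording is only a suggestion.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h>

/*
 * Hypothetical helper: build the LOG message for a parameter change,
 * saying "stop" for a parallel apply worker (it is not restarted) and
 * "restart" for the leader apply worker.
 */
static int
format_param_change_msg(char *buf, size_t size, const char *subname,
                        bool am_parallel_apply_worker)
{
    assert(buf != NULL && subname != NULL && strlen(subname) > 0);

    if (am_parallel_apply_worker)
        return snprintf(buf, size,
                        "logical replication parallel apply worker for subscription \"%s\" will stop because of a parameter change",
                        subname);

    return snprintf(buf, size,
                    "logical replication apply worker for subscription \"%s\" will restart because of a parameter change",
                    subname);
}
```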
===
10. general
IIUC, parallel apply workers cannot detect deadlocks automatically, right?
I thought we might be able to use a heartbeat protocol between the leader
worker and the parallel workers.
You have already implemented a mechanism to send and receive messages between
workers.
My idea is that each parallel apply worker records the time at which it last
received a message from the leader,
and if a certain time (30s?) has passed, it sends a heartbeat message such as 'H'.
The leader consumes 'H' and sends a reply such as LOGICAL_REP_MSG_KEEPALIVE in
HandleParallelApplyMessage().
If the parallel apply worker does not receive any message for more than one
minute,
it assumes that a deadlock has occurred, sets the retry flag to on,
and exits.
The above assumes that the leader cannot reply to the message while it is waiting for
the lock.
Moreover, it may have notable overhead, and we would need a new logical
replication message type.
What do you think? Have you already considered this?
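A minimal sketch of the timeout logic on the parallel apply worker side, assuming millisecond timestamps and the intervals proposed above (30s before sending a heartbeat, more than 60s of silence before assuming a deadlock). The 'H' message, the enum and heartbeat_action() are hypothetical; nothing here exists in the patch.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Proposed intervals, in milliseconds (assumption for this sketch). */
#define HEARTBEAT_INTERVAL_MS (30 * 1000)
#define DEADLOCK_TIMEOUT_MS   (60 * 1000)

/* Hypothetical actions a parallel apply worker could take while waiting. */
typedef enum
{
    PA_WAIT,                    /* keep waiting for the leader */
    PA_SEND_HEARTBEAT,          /* send 'H' to the leader */
    PA_ASSUME_DEADLOCK          /* set the retry flag and exit */
} PaHeartbeatAction;

/*
 * Decide what to do, given when the last message from the leader arrived
 * and whether a heartbeat has already been sent since then.
 */
static PaHeartbeatAction
heartbeat_action(int64_t now_ms, int64_t last_recv_ms, bool heartbeat_sent)
{
    int64_t elapsed = now_ms - last_recv_ms;

    assert(elapsed >= 0);

    if (elapsed > DEADLOCK_TIMEOUT_MS)
        return PA_ASSUME_DEADLOCK;
    if (elapsed >= HEARTBEAT_INTERVAL_MS && !heartbeat_sent)
        return PA_SEND_HEARTBEAT;
    return PA_WAIT;
}
```

Any message from the leader, including its keepalive reply to 'H', would reset last_recv_ms and clear heartbeat_sent, so only a leader that is stuck (for example, waiting on a lock) lets the timer reach the deadlock threshold.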
Best Regards,
Hayato Kuroda
FUJITSU LIMITED