Dear Hou-san,

Thank you for updating the patch! Here are my comments on v28-0001.
I will dig into the patch further, but I am sending these partial comments now
to keep the thread active.

===
For applyparallelworker.c

01. filename
The word order of the filename does not match the name of the new worker,
which you call a "parallel apply worker".

02. global variable

```
+/* Parallel apply workers hash table (initialized on first use). */
+static HTAB *ParallelApplyWorkersHash = NULL;
+
+/*
+ * List that stores the information of parallel apply workers that were
+ * started. Newly added worker information will be removed from the list at the
+ * end of the transaction when there are enough workers in the pool. Besides,
+ * exited workers will be removed from the list after being detected.
+ */
+static List *ParallelApplyWorkersList = NIL;
```

Could you add comments describing the difference between the list and the hash
table? IIUC, the hash table stores the parallel apply workers that are assigned
to transactions, while the list stores all workers that are alive.


03. parallel_apply_find_worker

```
+       /* Return the cached parallel apply worker if valid. */
+       if (stream_apply_worker != NULL)
+               return stream_apply_worker;
```

This is just a question:
why is the given xid not compared with the xid assigned to the worker here?
Is there a chance that the wrong worker is returned?
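For illustration, here is a minimal standalone sketch of a lookup that returns the
cached worker only when its xid matches the requested one. The struct
ParallelApplyWorkerInfoSketch, its xid field, and lookup_by_xid() are hypothetical
stand-ins for the patch's real types and its ParallelApplyWorkersHash lookup.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

typedef uint32_t TransactionId;

#define InvalidTransactionId ((TransactionId) 0)

/* Hypothetical stand-in for the patch's per-worker bookkeeping struct. */
typedef struct ParallelApplyWorkerInfoSketch
{
	TransactionId xid;			/* transaction currently assigned to the worker */
} ParallelApplyWorkerInfoSketch;

/* Hypothetical stand-in for ParallelApplyWorkersHash: a small fixed array. */
static ParallelApplyWorkerInfoSketch workers[4];
static int	nworkers = 0;

/* Worker cached for the streaming chunk currently being processed. */
static ParallelApplyWorkerInfoSketch *stream_apply_worker = NULL;

static ParallelApplyWorkerInfoSketch *
lookup_by_xid(TransactionId xid)
{
	for (int i = 0; i < nworkers; i++)
	{
		if (workers[i].xid == xid)
			return &workers[i];
	}
	return NULL;
}

ParallelApplyWorkerInfoSketch *
parallel_apply_find_worker_sketch(TransactionId xid)
{
	assert(xid != InvalidTransactionId);

	/* Use the cached worker only if it is assigned to the requested xid. */
	if (stream_apply_worker != NULL && stream_apply_worker->xid == xid)
		return stream_apply_worker;

	/* Otherwise fall back to the lookup keyed by xid. */
	return lookup_by_xid(xid);
}
```

If the cached worker is guaranteed to belong to the transaction being streamed,
the comparison is cheap and could instead become an Assert.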


04. parallel_apply_start_worker

```
+/*
+ * Start a parallel apply worker that will be used for the specified xid.
+ *
+ * If a parallel apply worker is not in use then re-use it, otherwise start a
+ * fresh one. Cache the worker information in ParallelApplyWorkersHash keyed by
+ * the specified xid.
+ */
+void
+parallel_apply_start_worker(TransactionId xid)
```

I think "parallel_apply_start_worker" should be renamed to "start_parallel_apply_worker".


05. parallel_apply_stream_abort

```
                for (i = list_length(subxactlist) - 1; i >= 0; i--)
                {
                        xid = list_nth_xid(subxactlist, i);
                        if (xid == subxid)
                        {
                                found = true;
                                break;
                        }
                }
```

Please do not reuse xid here; declare and use a separate variable in the else
block or similar.
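A minimal standalone sketch of the search loop with a dedicated, block-local
variable. A plain array stands in for subxactlist and list_nth_xid(), and
find_subxact_sketch() is a hypothetical helper, not part of the patch.

```c
#include <assert.h>
#include <stdint.h>

typedef uint32_t TransactionId;

/*
 * Search subxacts[0..nsubxacts-1] from the end, as the patch does, and return
 * the index of subxid, or -1 if it is not found.  The loop uses its own
 * variable (xid_in_list) instead of overwriting the caller's top-level xid.
 */
int
find_subxact_sketch(const TransactionId *subxacts, int nsubxacts,
					TransactionId subxid)
{
	assert(nsubxacts >= 0);

	for (int i = nsubxacts - 1; i >= 0; i--)
	{
		TransactionId xid_in_list = subxacts[i];

		if (xid_in_list == subxid)
			return i;
	}

	return -1;
}
```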

06. parallel_apply_free_worker

```
+       if (napplyworkers > (max_parallel_apply_workers_per_subscription / 2))
+       {
```

Please add a comment such as "Do we have enough workers in the pool?" or
similar.
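For example, the condition could carry the suggested comment as below. This is a
standalone sketch; pool_has_enough_workers() is a hypothetical helper that only
wraps the condition quoted above.

```c
#include <assert.h>
#include <stdbool.h>

/*
 * Hypothetical helper wrapping the condition from
 * parallel_apply_free_worker(), annotated with the suggested comment.
 */
static bool
pool_has_enough_workers(int napplyworkers,
						int max_parallel_apply_workers_per_subscription)
{
	assert(napplyworkers >= 0);

	/* Do we have enough workers in the pool? */
	return napplyworkers > (max_parallel_apply_workers_per_subscription / 2);
}
```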

===
For worker.c

07. general

apply_action is checked with if-else statements in many places, but I think
they should be rewritten as switch statements.
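A minimal standalone sketch of the switch form. TA_APPLY_IN_PARALLEL_WORKER and
TA_SEND_TO_PARALLEL_WORKER appear in the patch; the enum name and the two
*_SKETCH enumerators are hypothetical placeholders for the remaining cases, and
the returned labels only mark which branch ran.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical enum; only the two TA_* names below come from the patch. */
typedef enum TransApplyActionSketch
{
	TA_APPLY_IN_LEADER_SKETCH,	/* hypothetical */
	TA_SERIALIZE_TO_FILE_SKETCH,	/* hypothetical */
	TA_APPLY_IN_PARALLEL_WORKER,
	TA_SEND_TO_PARALLEL_WORKER
} TransApplyActionSketch;

/* Returns a label naming the branch taken, so each case is easy to check. */
const char *
dispatch_apply_action_sketch(TransApplyActionSketch apply_action)
{
	switch (apply_action)
	{
		case TA_APPLY_IN_PARALLEL_WORKER:
			return "apply in parallel worker";
		case TA_SEND_TO_PARALLEL_WORKER:
			return "send to parallel worker";
		case TA_SERIALIZE_TO_FILE_SKETCH:
			return "serialize to file";
		case TA_APPLY_IN_LEADER_SKETCH:
			return "apply in leader";
	}

	/* Only reached for a value outside the enum. */
	assert(0);
	return NULL;
}
```

Leaving out a default label means that GCC and Clang (-Wswitch, enabled by -Wall)
warn if a new enumerator is added but not handled.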

08. global variable

```
-static bool in_streamed_transaction = false;
+bool in_streamed_transaction = false;
```

a.

It seems that in_streamed_transaction is used only in worker.c, so it can
remain a static variable.

b.

This flag is set only when the apply worker spills the transaction to disk.
How about renaming "in_streamed_transaction" to "in_spilled_transaction"?

09.  apply_handle_stream_prepare

```
-       elog(DEBUG1, "received prepare for streamed transaction %u", prepare_data.xid);
```

I think this debug message is still useful.

10. apply_handle_stream_stop

```
+       if (apply_action == TA_APPLY_IN_PARALLEL_WORKER)
+       {
+               pgstat_report_activity(STATE_IDLEINTRANSACTION, NULL);
+       }
+       else if (apply_action == TA_SEND_TO_PARALLEL_WORKER)
+       {
```

The ordering of STREAM STOP and STREAM START is checked only when the apply
worker spills the transaction to disk (this is done via in_streamed_transaction).
I think a check should be added here as well, such as if (!stream_apply_worker)
or similar.
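A minimal standalone model of the ordering check covering both paths. It assumes
that STREAM START sets either in_streamed_transaction (spill path) or
stream_apply_worker (parallel path) and that STREAM STOP clears it; the
*_sketch functions and PATH_* values are hypothetical, and returning false stands
in for ereport(ERROR) with ERRCODE_PROTOCOL_VIOLATION.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef struct ParallelApplyWorkerInfoSketch
{
	int			dummy;
} ParallelApplyWorkerInfoSketch;

typedef enum
{
	PATH_SPILL,					/* leader spills the chunk to disk */
	PATH_PARALLEL				/* leader sends the chunk to a parallel worker */
} StreamPathSketch;

static bool in_streamed_transaction = false;
static ParallelApplyWorkerInfoSketch *stream_apply_worker = NULL;
static ParallelApplyWorkerInfoSketch the_worker;

/* Returns false on STREAM START while a chunk is already open. */
bool
stream_start_sketch(StreamPathSketch path)
{
	if (in_streamed_transaction || stream_apply_worker != NULL)
		return false;

	if (path == PATH_SPILL)
		in_streamed_transaction = true;
	else
		stream_apply_worker = &the_worker;
	return true;
}

/* Returns false on STREAM STOP without a preceding STREAM START. */
bool
stream_stop_sketch(void)
{
	/* At most one of the two paths can be active at a time. */
	assert(!(in_streamed_transaction && stream_apply_worker != NULL));

	if (!in_streamed_transaction && stream_apply_worker == NULL)
		return false;

	in_streamed_transaction = false;
	stream_apply_worker = NULL;
	return true;
}
```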

11. apply_handle_stream_abort

```
+       if (in_streamed_transaction)
+               ereport(ERROR,
+                               (errcode(ERRCODE_PROTOCOL_VIOLATION),
+                                errmsg_internal("STREAM ABORT message without STREAM STOP")));
```

I think a check on stream_apply_worker should be added here as well.

12. apply_handle_stream_commit

a.

```
        if (in_streamed_transaction)
                ereport(ERROR,
                                (errcode(ERRCODE_PROTOCOL_VIOLATION),
                                 errmsg_internal("STREAM COMMIT message without STREAM STOP")));
```

I think a check on stream_apply_worker should be added here as well.
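A minimal standalone sketch of the check suggested in 11 and 12a. It assumes that
stream_apply_worker is reset at STREAM STOP, so a non-NULL value means a chunk is
still open; stream_end_message_is_valid_sketch() is hypothetical, and false stands
in for the ERRCODE_PROTOCOL_VIOLATION error.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef struct ParallelApplyWorkerInfoSketch
{
	int			dummy;
} ParallelApplyWorkerInfoSketch;

static bool in_streamed_transaction = false;	/* spill path */
static ParallelApplyWorkerInfoSketch *stream_apply_worker = NULL;	/* parallel path */

/*
 * STREAM ABORT and STREAM COMMIT must arrive after STREAM STOP.  Checking only
 * in_streamed_transaction misses chunks sent to a parallel apply worker, so
 * stream_apply_worker is checked as well.
 */
bool
stream_end_message_is_valid_sketch(void)
{
	assert(!(in_streamed_transaction && stream_apply_worker != NULL));

	return !in_streamed_transaction && stream_apply_worker == NULL;
}
```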

b. 

```
-       elog(DEBUG1, "received commit for streamed transaction %u", xid);
```

I think this debug message is still useful.

===
For launcher.c

13. logicalrep_worker_stop_by_slot

```
+       LogicalRepWorker *worker = &LogicalRepCtx->workers[slot_no];
+
+       LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+
+       /* Return if the generation doesn't match or the worker is not alive. */
+       if (worker->generation != generation ||
+               worker->proc == NULL)
+               return;
+
```

a.

LWLockAcquire(LogicalRepWorkerLock) is needed before reading the slot.

b. 

LWLockRelease(LogicalRepWorkerLock) is also needed on the early-return path,
i.e. when the worker is not found.
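A minimal standalone sketch of the locking pattern for both points. MockLWLock and
its acquire/release helpers are a hypothetical mock that only tracks whether the
lock is held (not shared vs. exclusive mode), LogicalRepWorkerSketch mirrors only
the fields quoted above, and stop_requested stands in for the real stop logic.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical mock lock: records only whether it is currently held. */
typedef struct
{
	bool		held;
} MockLWLock;

static void
mock_lwlock_acquire(MockLWLock *lock)
{
	assert(!lock->held);
	lock->held = true;
}

static void
mock_lwlock_release(MockLWLock *lock)
{
	assert(lock->held);
	lock->held = false;
}

typedef struct
{
	uint16_t	generation;
	void	   *proc;			/* NULL when the worker is not alive */
	bool		stop_requested; /* hypothetical stand-in for stopping */
} LogicalRepWorkerSketch;

static MockLWLock LogicalRepWorkerLockSketch;
static LogicalRepWorkerSketch workers[4];

void
logicalrep_worker_stop_by_slot_sketch(int slot_no, uint16_t generation)
{
	LogicalRepWorkerSketch *worker;

	/* (a) Take the lock before looking at the slot. */
	mock_lwlock_acquire(&LogicalRepWorkerLockSketch);

	worker = &workers[slot_no];

	/* Return if the generation doesn't match or the worker is not alive. */
	if (worker->generation != generation || worker->proc == NULL)
	{
		/* (b) Release the lock on the early-return path too. */
		mock_lwlock_release(&LogicalRepWorkerLockSketch);
		return;
	}

	worker->stop_requested = true;

	mock_lwlock_release(&LogicalRepWorkerLockSketch);
}
```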



Best Regards,
Hayato Kuroda
FUJITSU LIMITED
