Dear Wang,

Followings are comments for your patchset.

====
0001


01. launcher.c - logicalrep_worker_stop_internal()

```
+
+       Assert(LWLockHeldByMe(LogicalRepWorkerLock));
+
```

I think it should be Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, 
LW_SHARED))
because the lock is released one and acquired again as LW_SHARED.
If newer function has been acquired lock as LW_EXCLUSIVE and call 
logicalrep_worker_stop_internal(),
its lock may become weaker after calling it.

02. launcher.c - apply_handle_stream_start()

```
+                       /*
+                        * Notify handle methods we're processing a remote 
in-progress
+                        * transaction.
+                        */
+                       in_streamed_transaction = true;
 
-               MyLogicalRepWorker->stream_fileset = palloc(sizeof(FileSet));
-               FileSetInit(MyLogicalRepWorker->stream_fileset);
+                       /*
+                        * Start a transaction on stream start, this 
transaction will be
+                        * committed on the stream stop unless it is a 
tablesync worker in
+                        * which case it will be committed after processing all 
the
+                        * messages. We need the transaction for handling the 
buffile,
+                        * used for serializing the streaming data and subxact 
info.
+                        */
+                       begin_replication_step();
```

Previously in_streamed_transaction was set after the begin_replication_step(),
but the ordering is modified. Maybe we don't have to modify it if there is no 
particular reason.

03. launcher.c - apply_handle_stream_stop()

```
+                       /* Commit the per-stream transaction */
+                       CommitTransactionCommand();
+
+                       /* Reset per-stream context */
+                       MemoryContextReset(LogicalStreamingContext);
+
+                       pgstat_report_activity(STATE_IDLE, NULL);
+
+                       in_streamed_transaction = false;
```

Previously in_streamed_transaction was set after the MemoryContextReset(), but 
the ordering is modified.
Maybe we don't have to modify it if there is no particular reason.

04. applyparallelworker.c - LogicalParallelApplyLoop()

```
+               shmq_res = shm_mq_receive(mqh, &len, &data, false);
...
+               if (ConfigReloadPending)
+               {
+                       ConfigReloadPending = false;
+                       ProcessConfigFile(PGC_SIGHUP);
+               }
```


Here the parallel apply worker waits to receive messages and after dispatching 
it ProcessConfigFile() is called.
It means that .conf will be not read until the parallel apply worker receives 
new messages and apply them.

It may be problematic when users change log_min_message to debugXXX for 
debugging but the streamed transaction rarely come.
They expected that detailed description appears on the log from next streaming 
chunk, but it does not.

This does not occur in leader worker when it waits messages from publisher, 
because it uses libpqrcv_receive(), which works asynchronously.

I 'm not sure whether it should be documented that the evaluation of GUCs may 
be delayed, how do you think?

===
0004

05. logical-replication.sgml

```
...
In that case, it may be necessary to change the streaming mode to on or off and 
cause
the same conflicts again so the finish LSN of the failed transaction will be 
written to the server log.
 ...
```

Above sentence is added by 0001, but it is not modified by 0004.
Such transactions will be retried as streaming=on mode, so some descriptions 
related with it should be added.


Best Regards,
Hayato Kuroda
FUJITSU LIMITED

Reply via email to