On Thu, Nov 3, 2022 at 6:36 PM [email protected]
<[email protected]> wrote:
>
> Thanks for the analysis and summary !
>
> I tried to implement the above idea and here is the patch set.
>
A few comments on v42-0001
===========================
1.
+ /*
+ * Set the xact_state flag in the leader instead of the
+ * parallel apply worker to avoid the race condition where the leader has
+ * already started waiting for the parallel apply worker to finish
+ * processing the transaction while the child process has not yet
+ * processed the first STREAM_START and has not set the
+ * xact_state to true.
+ */
+ SpinLockAcquire(&winfo->shared->mutex);
+ winfo->shared->xact_state = PARALLEL_TRANS_UNKNOWN;
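The comment and the code for xact_state don't seem to match: the
comment talks about setting xact_state to true, whereas the code
assigns PARALLEL_TRANS_UNKNOWN.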
2.
+ * progress. This could happend as we don't wait for transaction rollback
+ * to finish.
+ */
Typo. /happend/happen
3.
+/* Helper function to release a lock with lockid */
+void
+parallel_apply_lock(uint16 lockid)
...
...
+/* Helper function to take a lock with lockid */
+void
+parallel_apply_unlock(uint16 lockid)
Here, the comments seem to be reversed.
4.
+parallel_apply_lock(uint16 lockid)
+{
+ MemoryContext oldcontext;
+
+ if (list_member_int(ParallelApplyLockids, lockid))
+ return;
+
+ LockSharedObjectForSession(SubscriptionRelationId, MySubscription->oid,
+ lockid, am_leader_apply_worker() ?
+ AccessExclusiveLock:
+ AccessShareLock);
This appears odd to me because it forecloses the option for the
parallel apply worker to ever acquire this lock in exclusive mode. I
think it would be better to have lock_mode as one of the parameters of
this API.
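To illustrate the suggestion, here is a minimal standalone sketch. The
typedef, the two lock-mode constants, and stub_lock_shared_object() are
stand-ins I made up so that it compiles outside the tree; the real
function would keep calling LockSharedObjectForSession() and keep the
ParallelApplyLockids check, which is omitted here.

```c
#include <assert.h>
#include <stdint.h>

/* Stand-ins for PostgreSQL definitions so the sketch compiles on its own. */
typedef int LOCKMODE;
#define AccessShareLock      1
#define AccessExclusiveLock  8

/* Hypothetical stub: records the request instead of taking a real lock. */
static uint16_t last_lockid;
static LOCKMODE last_lockmode;

static void
stub_lock_shared_object(uint16_t lockid, LOCKMODE lockmode)
{
    last_lockid = lockid;
    last_lockmode = lockmode;
}

/*
 * Helper function to take a lock with lockid.
 *
 * The caller chooses the lock mode, so neither the leader nor the
 * parallel apply worker is tied to a fixed mode.
 */
void
parallel_apply_lock(uint16_t lockid, LOCKMODE lockmode)
{
    assert(lockmode >= AccessShareLock && lockmode <= AccessExclusiveLock);
    stub_lock_shared_object(lockid, lockmode);
}
```

With something like this, the leader would pass AccessExclusiveLock and
the parallel apply worker AccessShareLock at the existing call sites,
and nothing prevents the worker from using a stronger mode later.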
5.
+ * Inintialize fileset if not yet and open the file.
+ */
+void
+serialize_stream_start(TransactionId xid, bool first_segment)
Typo. /Inintialize/Initialize
6.
parallel_apply_setup_dsm()
{
...
+ shared->xact_state = false;
xact_state should be set to one of the values of ParallelTransState.
7.
/*
+ * Don't use SharedFileSet here because the fileset is shared by the leader
+ * worker and the fileset in leader need to survive after releasing the
+ * shared memory
This comment seems a bit unclear to me. Should there be an 'and'
between 'leader' and 'worker'? If so, then the following 'and' won't
make sense.
8.
+apply_handle_stream_stop(StringInfo s)
{
...
+ case TRANS_PARALLEL_APPLY:
+
+ /*
+ * If there is no message left, wait for the leader to release the
+ * lock and send more messages.
+ */
+ if (pg_atomic_sub_fetch_u32(&(MyParallelShared->left_message), 1) == 0)
+ parallel_apply_lock(MyParallelShared->stream_lock_id);
As per Sawada-San's email [1], this lock should be released
immediately after we acquire it. If we do so, then we don't need to
unlock separately in apply_handle_stream_start() in the below code and
at similar places in stream_prepare, stream_commit, and stream_abort.
Is there a reason for doing it differently?
apply_handle_stream_start(StringInfo s)
{
...
+ case TRANS_PARALLEL_APPLY:
...
+ /*
+ * Unlock the shared object lock so that the leader apply worker
+ * can continue to send changes.
+ */
+ parallel_apply_unlock(MyParallelShared->stream_lock_id);
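A rough sketch of the acquire-and-release-immediately pattern I have in
mind, assuming the lock is only used as a wait point. The stub_*
functions and the counters are placeholders for parallel_apply_lock()
and parallel_apply_unlock(), and wait_for_leader_stream_lock() is a
made-up name, not something in the patch.

```c
#include <assert.h>
#include <stdint.h>

/*
 * Hypothetical stubs that only count calls; the real functions take and
 * release a session-level shared object lock.
 */
static int stub_lock_calls;
static int stub_locks_held;

static void
stub_parallel_apply_lock(uint16_t lockid)
{
    (void) lockid;
    stub_lock_calls++;
    stub_locks_held++;
}

static void
stub_parallel_apply_unlock(uint16_t lockid)
{
    (void) lockid;
    stub_locks_held--;
}

/*
 * Wait for the leader to release the stream lock, then drop it right
 * away so that no later message handler has to unlock it.
 */
void
wait_for_leader_stream_lock(uint16_t stream_lock_id)
{
    /* In the real code this call blocks while the leader holds the lock. */
    stub_parallel_apply_lock(stream_lock_id);
    stub_parallel_apply_unlock(stream_lock_id);

    /* The worker must not carry the lock out of this function. */
    assert(stub_locks_held == 0);
}
```

If apply_handle_stream_stop() did the equivalent of this, the separate
unlock calls in the stream_start, stream_prepare, stream_commit, and
stream_abort paths would become unnecessary.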
9.
+parallel_apply_spooled_messages(void)
{
...
+ if (fileset_valid)
+ {
+ in_streamed_transaction = false;
+
+ parallel_apply_lock(MyParallelShared->transaction_lock_id);
Is there a reason to acquire this lock here if the parallel apply
worker will acquire it at stream_start?
10.
+ winfo->shared->stream_lock_id = parallel_apply_get_unique_id();
+ winfo->shared->transaction_lock_id = parallel_apply_get_unique_id();
Why can't we use xid (remote_xid) for one of these and local_xid (the
one generated by the parallel apply worker) for the other? I was a bit
worried about the local_xid because it will be generated only after
applying the first message, but the patch already seems to be waiting
for it in parallel_apply_wait_for_xact_finish, as seen in the below code.
+void
+parallel_apply_wait_for_xact_finish(ParallelApplyWorkerShared *wshared)
+{
+ /*
+ * Wait until the parallel apply worker handles the first message and
+ * set the flag to true.
+ */
+ parallel_apply_wait_for_in_xact(wshared, PARALLEL_TRANS_STARTED);
+
+ /* Wait for the transaction lock to be released. */
+ parallel_apply_lock(wshared->transaction_lock_id);
[1] -
https://www.postgresql.org/message-id/CAD21AoCWovvhGBD2uKcQqbk6px6apswuBrs6dR9%2BWhP1j2LdsQ%40mail.gmail.com
--
With Regards,
Amit Kapila.