Hi Hou-San.
Some review comments for v19-0001 and v19-0002
//////////
v19-0001
======
.../replication/logical/applyparallelworker.c
1.
+/* An entry in the parallelized_txns shared hash table */
+typedef struct ParallelizedTxnEntry
+{
+ TransactionId xid; /* Hash key, remote transaction ID */
+} ParallelizedTxnEntry;
+
/*
* A hash table used to cache the state of streaming transactions being applied
- * by the parallel apply workers.
+ * by the parallel apply workers. Entries are of type ParallelApplyWorkerEntry.
*/
static HTAB *ParallelApplyTxnHash = NULL;
+/*
+ * A hash table used to track the parallelized remote transactions
that could be
+ * depended on by other transactions. Entries are of type ParallelizedTxnEntry.
+ *
+ * dshash is used to enable dynamic shared memory allocation based on
the number
+ * of transactions being applied in parallel.
+ */
+static dsa_area *parallel_apply_dsa_area = NULL;
+static dshash_table *parallelized_txns = NULL;
+
+/* parameters for the parallelized_txns shared hash table */
+static const dshash_parameters dsh_params = {
+ sizeof(TransactionId),
+ sizeof(ParallelizedTxnEntry),
+ dshash_memcmp,
+ dshash_memhash,
+ dshash_memcpy,
+ LWTRANCHE_PARALLEL_APPLY_DSA
+};
+
1a.
Maybe that ParallelizedTxnEntry should be moved to just immediately
above 'dshash_parameters' because it seems to belong with that, and
currently it is splitting the ParallelApplyWorkerEntry from the
ParallelApplyTxnHash.
~
1b.
/parameters for/Parameters for/
~~~
pa_attach_parallelized_txn_hash:
2.
+ MemoryContext oldctx;
+
+ if (parallelized_txns)
+ {
+ Assert(parallel_apply_dsa_area);
+ *pa_dsa_handle = dsa_get_handle(parallel_apply_dsa_area);
+ *pa_dshash_handle = dshash_get_hash_table_handle(parallelized_txns);
+ return;
+ }
+
+ /* Be sure any local memory allocated by DSA routines is persistent. */
+ oldctx = MemoryContextSwitchTo(ApplyContext);
+
+ if (am_leader_apply_worker())
+ {
+ /* Initialize dynamic shared hash table for parallelized transactions */
+ parallel_apply_dsa_area = dsa_create(LWTRANCHE_PARALLEL_APPLY_DSA);
+ dsa_pin(parallel_apply_dsa_area);
+ dsa_pin_mapping(parallel_apply_dsa_area);
+ parallelized_txns = dshash_create(parallel_apply_dsa_area, &dsh_params, NULL);
+
+ /* Store handles in shared memory for other backends to use. */
+ *pa_dsa_handle = dsa_get_handle(parallel_apply_dsa_area);
+ *pa_dshash_handle = dshash_get_hash_table_handle(parallelized_txns);
+ }
+ else if (am_parallel_apply_worker())
+ {
+ /* Attach to existing dynamic shared hash table. */
+ parallel_apply_dsa_area =
dsa_attach(MyParallelShared->parallel_apply_dsa_handle);
+ dsa_pin_mapping(parallel_apply_dsa_area);
+ parallelized_txns = dshash_attach(parallel_apply_dsa_area, &dsh_params,
+ MyParallelShared->parallelized_txns_handle,
+ NULL);
+ }
+
+ MemoryContextSwitchTo(oldctx);
2a.
This might be easier to read if rearranged to use if/else instead of
having the early return.
SUGGESTION
if (parallelized_txns)
{
/* Hashtable is already available */
...
}
else
{
/* Create or attach... */
MemoryContext oldctx = ...
if (am_leader_apply_worker())
{
/* Create... */
...
}
else (am_parallel_apply_worker())
{
/* Attach... */
...
}
MemoryContextSwitchTo(oldctx);
}
~~~
2b.
Can it be anything other than
am_leader_apply_worker/am_parallel_apply_worker here? Should there be
an Assert?
~~~
2c.
Since the `dsh_params` are already set up, shouldn't this code be using them?
BEFORE
dsa_create(LWTRANCHE_PARALLEL_APPLY_DSA);
SUGGESTION
dsa_create(dsh_params.tranche_id);
//////////
v19-0002
======
.../replication/logical/applyparallelworker.
1.
+void
+pa_wait_for_depended_transaction(TransactionId xid)
+{
+ ParallelizedTxnEntry *txn_entry;
The declaration of `txn_entry` can be done later within the loop where
it is used.
======
Kind Regards,
Peter Smith.
Fujitsu Australia