On Mon, Nov 2, 2020 at 4:11 PM Amit Kapila <amit.kapil...@gmail.com> wrote: >
Few Comments on v15-0003-Support-2PC-txn-pgoutput =============================================== 1. This patch needs to be rebased after commit 644f0d7cc9 and requires some adjustments accordingly. 2. if (flags != 0) elog(ERROR, "unrecognized flags %u in commit message", flags); + /* read fields */ commit_data->commit_lsn = pq_getmsgint64(in); Spurious line. 3. @@ -720,6 +722,7 @@ apply_handle_commit(StringInfo s) replorigin_session_origin_timestamp = commit_data.committime; CommitTransactionCommand(); + pgstat_report_stat(false); Spurious line 4. +static void +apply_handle_prepare_txn(LogicalRepPrepareData * prepare_data) +{ + Assert(prepare_data->prepare_lsn == remote_final_lsn); + + /* The synchronization worker runs in single transaction. */ + if (IsTransactionState() && !am_tablesync_worker()) + { + /* End the earlier transaction and start a new one */ + BeginTransactionBlock(); + CommitTransactionCommand(); + StartTransactionCommand(); There is no explanation as to why you want to end the previous transaction and start a new one. Even if we have to do so, we first need to call BeginTransactionBlock before CommitTransactionCommand. 5. - * Handle STREAM COMMIT message. + * Common spoolfile processing. + * Returns how many changes were applied. */ -static void -apply_handle_stream_commit(StringInfo s) +static int +apply_spooled_messages(TransactionId xid, XLogRecPtr lsn) { - TransactionId xid; Can we have a separate patch for this as this can be committed before main patch. This is a refactoring required for the main patch. 6. @@ -57,7 +63,8 @@ static void pgoutput_stream_abort(struct LogicalDecodingContext *ctx, static void pgoutput_stream_commit(struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn); - +static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, XLogRecPtr prepare_lsn); Spurious line removal. -- With Regards, Amit Kapila.