On Wed, 18 Sep 2019 at 12:24, Amit Khandekar <[email protected]> wrote:
> Probably, for now at least, what everyone seems to agree is to take my
> earlier attached patch forward.
>
> I am going to see if I can add a TAP test for the patch, and will add
> the patch into the commitfest soon.
Attached is an updated patch, v2. It adds a new test scenario to the
test_decoding spill test (contrib/test_decoding/sql/spill.sql) and includes
some minor code cleanup. I am going to add this to the November commitfest.
--
Thanks,
-Amit Khandekar
EnterpriseDB Corporation
The Postgres Database Company
diff --git a/contrib/test_decoding/expected/spill.out b/contrib/test_decoding/expected/spill.out
index 10734bd..e1d150b6 100644
--- a/contrib/test_decoding/expected/spill.out
+++ b/contrib/test_decoding/expected/spill.out
@@ -247,6 +247,25 @@ GROUP BY 1 ORDER BY 1;
'serialize-nested-subbig-subbigabort-subbig-3 | 5000 | table public.spill_test: INSERT: data[text]:'serialize-nested-subbig-subbigabort-subbig-3:5001' | table public.spill_test: INSERT: data[text]:'serialize-nested-subbig-subbigabort-subbig-3:10000'
(2 rows)
+-- large number of spilling subxacts. Should not exceed maxAllocatedDescs
+BEGIN;
+DO $$
+BEGIN
+ FOR i IN 1..600 LOOP
+ BEGIN
+ INSERT INTO spill_test select generate_series(1, 5000) ;
+ EXCEPTION
+ when division_by_zero then perform 'dummy';
+ END;
+ END LOOP;
+END $$;
+COMMIT;
+SELECT 1 from pg_logical_slot_get_changes('regression_slot', NULL,NULL) LIMIT 1;
+ ?column?
+----------
+ 1
+(1 row)
+
DROP TABLE spill_test;
SELECT pg_drop_replication_slot('regression_slot');
pg_drop_replication_slot
diff --git a/contrib/test_decoding/sql/spill.sql b/contrib/test_decoding/sql/spill.sql
index e638cac..715d1f9 100644
--- a/contrib/test_decoding/sql/spill.sql
+++ b/contrib/test_decoding/sql/spill.sql
@@ -174,6 +174,21 @@ SELECT (regexp_split_to_array(data, ':'))[4] COLLATE "C", COUNT(*), (array_agg(d
FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL) WHERE data ~ 'INSERT'
GROUP BY 1 ORDER BY 1;
+-- large number of spilling subxacts. Should not exceed maxAllocatedDescs
+BEGIN;
+DO $$
+BEGIN
+ FOR i IN 1..600 LOOP
+ BEGIN
+ INSERT INTO spill_test select generate_series(1, 5000) ;
+ EXCEPTION
+ when division_by_zero then perform 'dummy';
+ END;
+ END LOOP;
+END $$;
+COMMIT;
+SELECT 1 from pg_logical_slot_get_changes('regression_slot', NULL,NULL) LIMIT 1;
+
DROP TABLE spill_test;
SELECT pg_drop_replication_slot('regression_slot');
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 8ce28ad..1e5a725 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -103,13 +103,21 @@ typedef struct ReorderBufferTupleCidEnt
CommandId combocid; /* just for debugging */
} ReorderBufferTupleCidEnt;
+/* Virtual file descriptor with file offset tracking */
+typedef struct TXNEntryFile
+{
+ File vfd; /* -1 when the file is closed */
+ off_t curOffset; /* offset for next write or read. Reset to 0
+ * when vfd is opened. */
+} TXNEntryFile;
+
/* k-way in-order change iteration support structures */
typedef struct ReorderBufferIterTXNEntry
{
XLogRecPtr lsn;
ReorderBufferChange *change;
ReorderBufferTXN *txn;
- int fd;
+ TXNEntryFile file;
XLogSegNo segno;
} ReorderBufferIterTXNEntry;
@@ -194,7 +202,7 @@ static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
static void ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
int fd, ReorderBufferChange *change);
static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
- int *fd, XLogSegNo *segno);
+ TXNEntryFile *file, XLogSegNo *segno);
static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
char *change);
static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn);
@@ -988,7 +996,7 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
for (off = 0; off < state->nr_txns; off++)
{
- state->entries[off].fd = -1;
+ state->entries[off].file.vfd = -1;
state->entries[off].segno = 0;
}
@@ -1013,7 +1021,7 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
{
/* serialize remaining changes */
ReorderBufferSerializeTXN(rb, txn);
- ReorderBufferRestoreChanges(rb, txn, &state->entries[off].fd,
+ ReorderBufferRestoreChanges(rb, txn, &state->entries[off].file,
&state->entries[off].segno);
}
@@ -1043,7 +1051,7 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
/* serialize remaining changes */
ReorderBufferSerializeTXN(rb, cur_txn);
ReorderBufferRestoreChanges(rb, cur_txn,
- &state->entries[off].fd,
+ &state->entries[off].file,
&state->entries[off].segno);
}
cur_change = dlist_head_element(ReorderBufferChange, node,
@@ -1124,7 +1132,7 @@ ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
dlist_delete(&change->node);
dlist_push_tail(&state->old_change, &change->node);
- if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->fd,
+ if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->file,
&state->entries[off].segno))
{
/* successfully restored changes from disk */
@@ -1163,8 +1171,8 @@ ReorderBufferIterTXNFinish(ReorderBuffer *rb,
for (off = 0; off < state->nr_txns; off++)
{
- if (state->entries[off].fd != -1)
- CloseTransientFile(state->entries[off].fd);
+ if (state->entries[off].file.vfd != -1)
+ FileClose(state->entries[off].file.vfd);
}
/* free memory we might have "leaked" in the last *Next call */
@@ -2517,11 +2525,12 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
*/
static Size
ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
- int *fd, XLogSegNo *segno)
+ TXNEntryFile *file, XLogSegNo *segno)
{
Size restored = 0;
XLogSegNo last_segno;
dlist_mutable_iter cleanup_iter;
+ File *fd = &file->vfd;
Assert(txn->first_lsn != InvalidXLogRecPtr);
Assert(txn->final_lsn != InvalidXLogRecPtr);
@@ -2562,7 +2571,11 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid,
*segno);
- *fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
+ *fd = PathNameOpenFile(path, O_RDONLY | PG_BINARY);
+
+ /* No harm in resetting the offset even in case of failure */
+ file->curOffset = 0;
+
if (*fd < 0 && errno == ENOENT)
{
*fd = -1;
@@ -2582,23 +2595,26 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
* end of this file.
*/
ReorderBufferSerializeReserve(rb, sizeof(ReorderBufferDiskChange));
- pgstat_report_wait_start(WAIT_EVENT_REORDER_BUFFER_READ);
- readBytes = read(*fd, rb->outbuf, sizeof(ReorderBufferDiskChange));
- pgstat_report_wait_end();
+ readBytes = FileRead(file->vfd, rb->outbuf,
+ sizeof(ReorderBufferDiskChange),
+ file->curOffset, WAIT_EVENT_REORDER_BUFFER_READ);
/* eof */
if (readBytes == 0)
{
- CloseTransientFile(*fd);
+ FileClose(*fd);
*fd = -1;
(*segno)++;
continue;
}
- else if (readBytes < 0)
+ if (readBytes < 0)
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not read from reorderbuffer spill file: %m")));
- else if (readBytes != sizeof(ReorderBufferDiskChange))
+
+ file->curOffset += readBytes;
+
+ if (readBytes != sizeof(ReorderBufferDiskChange))
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
@@ -2611,10 +2627,11 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
sizeof(ReorderBufferDiskChange) + ondisk->size);
ondisk = (ReorderBufferDiskChange *) rb->outbuf;
- pgstat_report_wait_start(WAIT_EVENT_REORDER_BUFFER_READ);
- readBytes = read(*fd, rb->outbuf + sizeof(ReorderBufferDiskChange),
- ondisk->size - sizeof(ReorderBufferDiskChange));
- pgstat_report_wait_end();
+ readBytes = FileRead(file->vfd,
+ rb->outbuf + sizeof(ReorderBufferDiskChange),
+ ondisk->size - sizeof(ReorderBufferDiskChange),
+ file->curOffset,
+ WAIT_EVENT_REORDER_BUFFER_READ);
if (readBytes < 0)
ereport(ERROR,
@@ -2627,6 +2644,8 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
readBytes,
(uint32) (ondisk->size - sizeof(ReorderBufferDiskChange)))));
+ file->curOffset += readBytes;
+
/*
* ok, read a full change from disk, now restore it into proper
* in-memory format