Hi all, I'm trying to execute a PROCEDURE (with COMMIT inside) called from a background worker using SPI but I'm always getting the error below:
2021-09-13 09:36:43.568 -03 [23845] LOG: worker_spi worker 2 initialized with schema2.counted 2021-09-13 09:36:43.568 -03 [23846] LOG: worker_spi worker 1 initialized with schema1.counted 2021-09-13 09:36:43.571 -03 [23846] ERROR: invalid transaction termination 2021-09-13 09:36:43.571 -03 [23846] CONTEXT: PL/pgSQL function schema1.counted_proc() line 1 at COMMIT SQL statement "CALL "schema1"."counted_proc"()" 2021-09-13 09:36:43.571 -03 [23846] STATEMENT: CALL "schema1"."counted_proc"() 2021-09-13 09:36:43.571 -03 [23845] ERROR: invalid transaction termination 2021-09-13 09:36:43.571 -03 [23845] CONTEXT: PL/pgSQL function schema2.counted_proc() line 1 at COMMIT SQL statement "CALL "schema2"."counted_proc"()" 2021-09-13 09:36:43.571 -03 [23845] STATEMENT: CALL "schema2"."counted_proc"() 2021-09-13 09:36:43.571 -03 [23838] LOG: background worker "worker_spi" (PID 23845) exited with exit code 1 2021-09-13 09:36:43.571 -03 [23838] LOG: background worker "worker_spi" (PID 23846) exited with exit code 1 I changed the worker_spi example (attached) a bit to execute a simple procedure. Even using SPI_connect_ext(SPI_OPT_NONATOMIC) I'm getting the error "invalid transaction termination". There are something wrong with the attached example or am I missing something? Regards, -- Fabrízio de Royes Mello
diff --git a/src/test/modules/worker_spi/worker_spi.c b/src/test/modules/worker_spi/worker_spi.c index d0acef2652..0b9e4e9335 100644 --- a/src/test/modules/worker_spi/worker_spi.c +++ b/src/test/modules/worker_spi/worker_spi.c @@ -108,8 +108,20 @@ initialize_worker_spi(worktable *table) " type text CHECK (type IN ('total', 'delta')), " " value integer)" "CREATE UNIQUE INDEX \"%s_unique_total\" ON \"%s\" (type) " - "WHERE type = 'total'", - table->schema, table->name, table->name, table->name); + "WHERE type = 'total'; " + "CREATE PROCEDURE \"%s\".\"%s_proc\"() AS $$ " + "DECLARE " + " i INTEGER; " + "BEGIN " + " FOR i IN 1..10 " + " LOOP " + " INSERT INTO \"%s\".\"%s\" VALUES ('delta', i); " + " COMMIT; " + " END LOOP; " + "END; " + "$$ LANGUAGE plpgsql; ", + table->schema, table->name, table->name, table->name, + table->schema, table->name, table->schema, table->name); /* set statement start time */ SetCurrentStatementStartTimestamp(); @@ -168,20 +180,8 @@ worker_spi_main(Datum main_arg) table->name = quote_identifier(table->name); initStringInfo(&buf); - appendStringInfo(&buf, - "WITH deleted AS (DELETE " - "FROM %s.%s " - "WHERE type = 'delta' RETURNING value), " - "total AS (SELECT coalesce(sum(value), 0) as sum " - "FROM deleted) " - "UPDATE %s.%s " - "SET value = %s.value + total.sum " - "FROM total WHERE type = 'total' " - "RETURNING %s.value", - table->schema, table->name, - table->schema, table->name, - table->name, - table->name); + + appendStringInfo(&buf, "CALL \"%s\".\"%s_proc\"()", table->schema, table->name); /* * Main loop: do this until SIGTERM is received and processed by @@ -232,7 +232,7 @@ worker_spi_main(Datum main_arg) */ SetCurrentStatementStartTimestamp(); StartTransactionCommand(); - SPI_connect(); + SPI_connect_ext(SPI_OPT_NONATOMIC); PushActiveSnapshot(GetTransactionSnapshot()); debug_query_string = buf.data; pgstat_report_activity(STATE_RUNNING, buf.data); @@ -240,30 +240,15 @@ worker_spi_main(Datum main_arg) /* We can now execute queries via SPI */ ret = SPI_execute(buf.data, false, 0); - if (ret != SPI_OK_UPDATE_RETURNING) - elog(FATAL, "cannot select from table %s.%s: error code %d", - table->schema, table->name, ret); - - if (SPI_processed > 0) - { - bool isnull; - int32 val; - - val = DatumGetInt32(SPI_getbinval(SPI_tuptable->vals[0], - SPI_tuptable->tupdesc, - 1, &isnull)); - if (!isnull) - elog(LOG, "%s: count in %s.%s is now %d", - MyBgworkerEntry->bgw_name, - table->schema, table->name, val); - } + if (ret != SPI_OK_UTILITY) + elog(FATAL, "failed to call procedure"); /* * And finish our transaction. */ - SPI_finish(); PopActiveSnapshot(); CommitTransactionCommand(); + SPI_finish(); debug_query_string = NULL; pgstat_report_stat(false); pgstat_report_activity(STATE_IDLE, NULL);