Hi,

One of the issues users/customers fairly regularly remind me of is that inserting into tables sharded using FDWs is rather slow. We even get it reported on pgsql-bugs from time to time [1].
Some of the slowness/overhead is expected, due to the latency between machines in the sharded setup. Even just 1ms of latency makes it way more expensive than a single instance.

But let's do a simple experiment, comparing a hash-partitioned table with regular partitions to one with FDW partitions in the same instance. Scripts to run this are attached. The duration of inserting 1M rows into this table (average of 10 runs on my laptop) looks like this:

  regular: 2872 ms
  FDW:     64454 ms

Yep, it's ~20x slower, on a setup with ping latency well below 0.05ms. Imagine how it would look on sharded setups with 0.1ms or 1ms latency, which is probably where most single-DC clusters are :-(

Now, the primary reason the performance degrades like this is that while FDW has batching for SELECT queries (i.e. we read larger chunks of data from the cursors), we don't have that for INSERTs (or other DML). Every time you insert a row, it has to go all the way down into the partition synchronously. For some use cases this may be mitigated by having many independent connections from different users, so the per-user latency is higher but acceptable. But if you need to import larger amounts of data (say, a CSV file for analytics, ...) this may not work.

Some time ago I wrote an ugly PoC adding batching, just to see how far it would get us, and it seems quite promising - the result for the same INSERT benchmark looks like this:

  FDW batching: 4584 ms

So, rather a nice improvement, I'd say ...

Before I spend more time hacking on this, I have a couple of open questions about the design, restrictions, etc.

1) Extend the FDW API?

In the patch, the batching is simply "injected" into the existing insert API method, i.e. ExecForeignInsert et al. I wonder if it'd be better to extend the API with a "batched" version of the method, so that we can easily determine whether the FDW supports batching or not - it would require changes in the callers, though.
OTOH it might be useful for COPY, where we could do something similar to multi_insert (COPY already benefits from this patch, but it does not use the batching built into COPY).

2) What about the insert results?

I'm not sure what to do about the "result" status for the inserted rows. We only really "stash" the rows into a buffer, so we don't know whether the insert will succeed or not. The patch simply assumes it will succeed, but that's clearly wrong, and it may result in reporting a wrong number of rows.

The patch also disables the batching when the insert has a RETURNING clause, because there's just a single slot (for the currently inserted row). I suppose a "batching" method would take an array of slots.

3) What about the other DML operations (DELETE/UPDATE)?

The other DML operations could probably benefit from the batching too. INSERT was good enough for a PoC, but having batching only for INSERT seems somewhat asymmetric. DELETE/UPDATE seem more complicated because of quals, but are likely doable.

4) Should we do batching for COPY instead?

While looking at multi_insert, I've realized it's mostly exactly what the new "batch insert" API function would need to be. But it's only really used in COPY, so I wonder if we should just abandon the idea of batching INSERTs and do a batching COPY for FDW tables instead. For cases that can replace INSERT with COPY this would be enough, but unfortunately it does nothing for DELETE/UPDATE, so I'm hesitant to do this :-(

5) Expected consistency?

I'm not entirely sure what the consistency expectations for FDWs are. Currently the FDW nodes pointing to the same server share a connection, so the inserted rows might be visible to other nodes. But if we only stash the rows in a local buffer for a while, that's no longer true. So maybe this breaks the consistency expectations? But maybe that's OK - I'm not sure how the prepared statements/cursors affect this.
I can imagine restricting the batching to only those plans where this is not an issue (a single FDW node or something), but that seems rather fragile and undesirable.

I was thinking about adding a GUC to enable/disable the batching at some level (global, server, table, ...), but it seems like a bad match because the consistency expectations likely depend on the query. There should be a GUC to set the batch size, though (it's hardcoded to 100 for now).

regards

[1] https://www.postgresql.org/message-id/CACnz%2BQ1q0%2B2KoJam9LyNMk8JmdC6qYHXWB895Wu2xcpoip18xQ%40mail.gmail.com

-- 
Tomas Vondra                  http://www.2ndQuadrant.com
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
fdw.sql
Description: application/sql
local.sql
Description: application/sql
>From df2cf502909886fbfc86f93f36b2daba03f785e4 Mon Sep 17 00:00:00 2001 From: Tomas Vondra <t...@fuzzy.cz> Date: Sun, 28 Jun 2020 14:31:18 +0200 Subject: [PATCH] patch --- contrib/postgres_fdw/deparse.c | 73 ++++++++ contrib/postgres_fdw/postgres_fdw.c | 261 ++++++++++++++++++++++++---- contrib/postgres_fdw/postgres_fdw.h | 5 + 3 files changed, 305 insertions(+), 34 deletions(-) diff --git a/contrib/postgres_fdw/deparse.c b/contrib/postgres_fdw/deparse.c index ad37a74221..374d2f5dbb 100644 --- a/contrib/postgres_fdw/deparse.c +++ b/contrib/postgres_fdw/deparse.c @@ -1758,6 +1758,79 @@ deparseInsertSql(StringInfo buf, RangeTblEntry *rte, withCheckOptionList, returningList, retrieved_attrs); } +/* + * deparse remote batch INSERT statement + * + * The statement text is appended to buf, and we also create an integer List + * of the columns being retrieved by WITH CHECK OPTION or RETURNING (if any), + * which is returned to *retrieved_attrs. + */ +void +deparseBatchInsertSql(StringInfo buf, RangeTblEntry *rte, + Index rtindex, Relation rel, + List *targetAttrs, bool doNothing, + List *withCheckOptionList, List *returningList, + List **retrieved_attrs, int batchSize) +{ + AttrNumber pindex; + bool first; + ListCell *lc; + int i; + + appendStringInfoString(buf, "INSERT INTO "); + deparseRelation(buf, rel); + + if (targetAttrs) + { + appendStringInfoChar(buf, '('); + + first = true; + foreach(lc, targetAttrs) + { + int attnum = lfirst_int(lc); + + if (!first) + appendStringInfoString(buf, ", "); + first = false; + + deparseColumnRef(buf, rtindex, attnum, rte, false); + } + + appendStringInfoString(buf, ") VALUES "); + + pindex = 1; + for (i = 0; i < batchSize; i++) + { + if (i > 0) + appendStringInfoString(buf, ", "); + + appendStringInfoString(buf, "("); + + first = true; + foreach(lc, targetAttrs) + { + if (!first) + appendStringInfoString(buf, ", "); + first = false; + + appendStringInfo(buf, "$%d", pindex); + pindex++; + } + + appendStringInfoChar(buf, ')'); + } + } + 
else + appendStringInfoString(buf, " DEFAULT VALUES"); + + if (doNothing) + appendStringInfoString(buf, " ON CONFLICT DO NOTHING"); + + deparseReturningList(buf, rte, rtindex, rel, + rel->trigdesc && rel->trigdesc->trig_insert_after_row, + withCheckOptionList, returningList, retrieved_attrs); +} + /* * deparse remote UPDATE statement * diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c index 9fc53cad68..17421f6b65 100644 --- a/contrib/postgres_fdw/postgres_fdw.c +++ b/contrib/postgres_fdw/postgres_fdw.c @@ -56,6 +56,8 @@ PG_MODULE_MAGIC; /* If no remote estimates, assume a sort costs 20% extra */ #define DEFAULT_FDW_SORT_MULTIPLIER 1.2 +#define BATCH_SIZE 100 + /* * Indexes of FDW-private information stored in fdw_private lists. * @@ -93,6 +95,8 @@ enum FdwModifyPrivateIndex { /* SQL statement to execute remotely (as a String node) */ FdwModifyPrivateUpdateSql, + /* SQL statement to execute remotely (as a String node) */ + FdwModifyPrivateUpdateBatchSql, /* Integer list of target attribute numbers for INSERT/UPDATE */ FdwModifyPrivateTargetAttnums, /* has-returning flag (as an integer Value node) */ @@ -172,9 +176,11 @@ typedef struct PgFdwModifyState /* for remote query execution */ PGconn *conn; /* connection for the scan */ char *p_name; /* name of prepared statement, if created */ + char *p_name_batch; /* name of prepared batch statement, if created */ /* extracted fdw_private data */ char *query; /* text of INSERT/UPDATE/DELETE command */ + char *batch_query; /* text of INSERT/UPDATE/DELETE command */ List *target_attrs; /* list of target attribute numbers */ bool has_returning; /* is there a RETURNING clause? 
*/ List *retrieved_attrs; /* attr numbers retrieved by RETURNING */ @@ -187,6 +193,12 @@ typedef struct PgFdwModifyState /* working memory context */ MemoryContext temp_cxt; /* context for per-tuple temporary data */ + /* batching of values */ + MemoryContext batch_cxt; + int maxbatched; + int nbatched; + const char **values; + /* for update row movement if subplan result rel */ struct PgFdwModifyState *aux_fmstate; /* foreign-insert state, if * created */ @@ -427,6 +439,7 @@ static PgFdwModifyState *create_foreign_modify(EState *estate, CmdType operation, Plan *subplan, char *query, + char *batch_query, List *target_attrs, bool has_returning, List *retrieved_attrs); @@ -435,6 +448,11 @@ static TupleTableSlot *execute_foreign_modify(EState *estate, CmdType operation, TupleTableSlot *slot, TupleTableSlot *planSlot); +static TupleTableSlot *flush_foreign_modify(EState *estate, + ResultRelInfo *resultRelInfo, + CmdType operation, + TupleTableSlot *slot, + TupleTableSlot *planSlot); static void prepare_foreign_modify(PgFdwModifyState *fmstate); static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate, ItemPointer tupleid, @@ -1659,13 +1677,16 @@ postgresPlanForeignModify(PlannerInfo *root, RangeTblEntry *rte = planner_rt_fetch(resultRelation, root); Relation rel; StringInfoData sql; + StringInfoData batch_sql; List *targetAttrs = NIL; List *withCheckOptionList = NIL; List *returningList = NIL; List *retrieved_attrs = NIL; bool doNothing = false; + List *priv = NIL; initStringInfo(&sql); + initStringInfo(&batch_sql); /* * Core code already has some lock on each rel being planned, so we can @@ -1752,6 +1773,11 @@ postgresPlanForeignModify(PlannerInfo *root, targetAttrs, doNothing, withCheckOptionList, returningList, &retrieved_attrs); + + deparseBatchInsertSql(&batch_sql, rte, resultRelation, rel, + targetAttrs, doNothing, + withCheckOptionList, returningList, + &retrieved_attrs, BATCH_SIZE); break; case CMD_UPDATE: deparseUpdateSql(&sql, rte, 
resultRelation, rel, @@ -1775,10 +1801,13 @@ postgresPlanForeignModify(PlannerInfo *root, * Build the fdw_private list that will be available to the executor. * Items in the list must match enum FdwModifyPrivateIndex, above. */ - return list_make4(makeString(sql.data), - targetAttrs, - makeInteger((retrieved_attrs != NIL)), - retrieved_attrs); + priv = lappend(priv, makeString(sql.data)); + priv = lappend(priv, makeString(batch_sql.data)); + priv = lappend(priv, targetAttrs); + priv = lappend(priv, makeInteger((retrieved_attrs != NIL))); + priv = lappend(priv, retrieved_attrs); + + return priv; } /* @@ -1794,6 +1823,7 @@ postgresBeginForeignModify(ModifyTableState *mtstate, { PgFdwModifyState *fmstate; char *query; + char *batch_query; List *target_attrs; bool has_returning; List *retrieved_attrs; @@ -1809,6 +1839,8 @@ postgresBeginForeignModify(ModifyTableState *mtstate, /* Deconstruct fdw_private data. */ query = strVal(list_nth(fdw_private, FdwModifyPrivateUpdateSql)); + batch_query = strVal(list_nth(fdw_private, + FdwModifyPrivateUpdateBatchSql)); target_attrs = (List *) list_nth(fdw_private, FdwModifyPrivateTargetAttnums); has_returning = intVal(list_nth(fdw_private, @@ -1827,6 +1859,7 @@ postgresBeginForeignModify(ModifyTableState *mtstate, mtstate->operation, mtstate->mt_plans[subplan_index]->plan, query, + batch_query, target_attrs, has_returning, retrieved_attrs); @@ -1925,6 +1958,7 @@ postgresBeginForeignInsert(ModifyTableState *mtstate, TupleDesc tupdesc = RelationGetDescr(rel); int attnum; StringInfoData sql; + StringInfoData batch_sql; List *targetAttrs = NIL; List *retrieved_attrs = NIL; bool doNothing = false; @@ -1946,6 +1980,7 @@ postgresBeginForeignInsert(ModifyTableState *mtstate, RelationGetRelationName(rel)))); initStringInfo(&sql); + initStringInfo(&batch_sql); /* We transmit all columns that are defined in the foreign table. 
*/ for (attnum = 1; attnum <= tupdesc->natts; attnum++) @@ -2002,6 +2037,12 @@ postgresBeginForeignInsert(ModifyTableState *mtstate, resultRelInfo->ri_returningList, &retrieved_attrs); + /* Construct the SQL command string. */ + deparseBatchInsertSql(&batch_sql, rte, resultRelation, rel, targetAttrs, doNothing, + resultRelInfo->ri_WithCheckOptions, + resultRelInfo->ri_returningList, + &retrieved_attrs, BATCH_SIZE); + /* Construct an execution state. */ fmstate = create_foreign_modify(mtstate->ps.state, rte, @@ -2009,6 +2050,7 @@ postgresBeginForeignInsert(ModifyTableState *mtstate, CMD_INSERT, NULL, sql.data, + batch_sql.data, targetAttrs, retrieved_attrs != NIL, retrieved_attrs); @@ -2040,6 +2082,9 @@ postgresEndForeignInsert(EState *estate, Assert(fmstate != NULL); + /* Send over remaining data to insert. */ + flush_foreign_modify(estate, resultRelInfo, CMD_INSERT, NULL, NULL); + /* * If the fmstate has aux_fmstate set, get the aux_fmstate (see * postgresBeginForeignInsert()) @@ -3536,6 +3581,7 @@ create_foreign_modify(EState *estate, CmdType operation, Plan *subplan, char *query, + char *batch_query, List *target_attrs, bool has_returning, List *retrieved_attrs) @@ -3571,15 +3617,24 @@ create_foreign_modify(EState *estate, /* Set up remote query information. */ fmstate->query = query; + fmstate->batch_query = batch_query; fmstate->target_attrs = target_attrs; fmstate->has_returning = has_returning; fmstate->retrieved_attrs = retrieved_attrs; + fmstate->nbatched = 0; + fmstate->maxbatched = BATCH_SIZE * list_length(target_attrs); + fmstate->values = palloc(fmstate->maxbatched * sizeof(char *)); + /* Create context for per-tuple temp workspace. */ fmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt, "postgres_fdw temporary data", ALLOCSET_SMALL_SIZES); + fmstate->batch_cxt = AllocSetContextCreate(estate->es_query_cxt, + "postgres_fdw batch data", + ALLOCSET_DEFAULT_SIZES); + /* Prepare for input conversion of RETURNING results. 
*/ if (fmstate->has_returning) fmstate->attinmeta = TupleDescGetAttInMetadata(tupdesc); @@ -3646,7 +3701,9 @@ execute_foreign_modify(EState *estate, ItemPointer ctid = NULL; const char **p_values; PGresult *res; - int n_rows; + int n_rows = 0; + MemoryContext oldctx; + int i; /* The operation should be INSERT, UPDATE, or DELETE */ Assert(operation == CMD_INSERT || @@ -3677,48 +3734,163 @@ execute_foreign_modify(EState *estate, /* Convert parameters needed by prepared statement to text form */ p_values = convert_prep_stmt_params(fmstate, ctid, slot); - /* - * Execute the prepared statement. - */ - if (!PQsendQueryPrepared(fmstate->conn, - fmstate->p_name, - fmstate->p_nums, - p_values, - NULL, - NULL, - 0)) - pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query); + /* copy the parameters to the batch */ + oldctx = MemoryContextSwitchTo(fmstate->batch_cxt); + + for (i = 0; i < fmstate->p_nums; i++) + if (p_values[i] == NULL) + fmstate->values[fmstate->nbatched++] = NULL; + else + fmstate->values[fmstate->nbatched++] = pstrdup(p_values[i]); + + MemoryContextSwitchTo(oldctx); + + Assert(fmstate->nbatched <= fmstate->maxbatched); + + /* if the batch is "full" we need to flush it */ + if (fmstate->nbatched == fmstate->maxbatched || fmstate->has_returning) + { + /* + * Execute the prepared statement. + */ + if (fmstate->has_returning) + { + if (!PQsendQueryPrepared(fmstate->conn, + fmstate->p_name, + fmstate->nbatched, + fmstate->values, + NULL, + NULL, + 0)) + pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query); + } + else if (!PQsendQueryPrepared(fmstate->conn, + fmstate->p_name_batch, + fmstate->nbatched, + fmstate->values, + NULL, + NULL, + 0)) + pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->batch_query); + + /* + * Get the result, and check for success. + * + * We don't use a PG_TRY block here, so be careful not to throw error + * without releasing the PGresult. 
+ */ + res = pgfdw_get_result(fmstate->conn, fmstate->query); + if (PQresultStatus(res) != + (fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK)) + pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query); + + /* Check number of rows affected, and fetch RETURNING tuple if any */ + if (fmstate->has_returning) + { + n_rows = PQntuples(res); + if (n_rows > 0) + store_returning_result(fmstate, slot, res); + } + else + n_rows = atoi(PQcmdTuples(res)); + + /* And clean up */ + PQclear(res); + + MemoryContextReset(fmstate->batch_cxt); + + fmstate->nbatched = 0; + } + else + /* XXX we don't know if the future insert succeeds */ + n_rows = 1; + + MemoryContextReset(fmstate->temp_cxt); /* - * Get the result, and check for success. - * - * We don't use a PG_TRY block here, so be careful not to throw error - * without releasing the PGresult. + * Return NULL if nothing was inserted/updated/deleted on the remote end */ - res = pgfdw_get_result(fmstate->conn, fmstate->query); - if (PQresultStatus(res) != - (fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK)) - pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query); + return (n_rows > 0) ? slot : NULL; +} - /* Check number of rows affected, and fetch RETURNING tuple if any */ - if (fmstate->has_returning) +/* + * flush_foreign_modify + * Perform foreign-table modification as required, and fetch RETURNING + * result if any. (This is the shared guts of postgresExecForeignInsert, + * postgresExecForeignUpdate, and postgresExecForeignDelete.) 
+ */ +static TupleTableSlot * +flush_foreign_modify(EState *estate, + ResultRelInfo *resultRelInfo, + CmdType operation, + TupleTableSlot *slot, + TupleTableSlot *planSlot) +{ + PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState; + PGresult *res; + int n_rows = 0; + int i; + + /* The operation should be INSERT, UPDATE, or DELETE */ + Assert(operation == CMD_INSERT || + operation == CMD_UPDATE || + operation == CMD_DELETE); + + /* Set up the prepared statement on the remote server, if we didn't yet */ + if (!fmstate->p_name) + prepare_foreign_modify(fmstate); + + /* if the batch is "full" we need to flush it */ + i = 0; + while (i < fmstate->nbatched) { - n_rows = PQntuples(res); - if (n_rows > 0) - store_returning_result(fmstate, slot, res); + /* + * Execute the prepared statement. + */ + if (!PQsendQueryPrepared(fmstate->conn, + fmstate->p_name, + fmstate->p_nums, + &fmstate->values[i], + NULL, + NULL, + 0)) + pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query); + + /* + * Get the result, and check for success. + * + * We don't use a PG_TRY block here, so be careful not to throw error + * without releasing the PGresult. + */ + res = pgfdw_get_result(fmstate->conn, fmstate->query); + if (PQresultStatus(res) != + (fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK)) + pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query); + + /* Check number of rows affected, and fetch RETURNING tuple if any */ + if (fmstate->has_returning) + { + n_rows = PQntuples(res); + if (n_rows > 0) + store_returning_result(fmstate, slot, res); + } + else + n_rows = atoi(PQcmdTuples(res)); + + /* And clean up */ + PQclear(res); + + i += fmstate->p_nums; } - else - n_rows = atoi(PQcmdTuples(res)); - /* And clean up */ - PQclear(res); + Assert(i == fmstate->nbatched); MemoryContextReset(fmstate->temp_cxt); /* * Return NULL if nothing was inserted/updated/deleted on the remote end */ - return (n_rows > 0) ? 
slot : NULL; + return NULL; } /* @@ -3729,7 +3901,9 @@ static void prepare_foreign_modify(PgFdwModifyState *fmstate) { char prep_name[NAMEDATALEN]; + char prep_name_batch[NAMEDATALEN]; char *p_name; + char *p_name_batch; PGresult *res; /* Construct name we'll use for the prepared statement. */ @@ -3737,6 +3911,11 @@ prepare_foreign_modify(PgFdwModifyState *fmstate) GetPrepStmtNumber(fmstate->conn)); p_name = pstrdup(prep_name); + /* Construct name we'll use for the batch prepared statement. */ + snprintf(prep_name_batch, sizeof(prep_name_batch), "pgsql_fdw_prep_%u", + GetPrepStmtNumber(fmstate->conn)); + p_name_batch = pstrdup(prep_name_batch); + /* * We intentionally do not specify parameter types here, but leave the * remote server to derive them by default. This avoids possible problems @@ -3762,8 +3941,22 @@ prepare_foreign_modify(PgFdwModifyState *fmstate) pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query); PQclear(res); + if (fmstate->batch_query && + !PQsendPrepare(fmstate->conn, + p_name_batch, + fmstate->batch_query, + 0, + NULL)) + pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->batch_query); + + res = pgfdw_get_result(fmstate->conn, fmstate->batch_query); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->batch_query); + PQclear(res); + /* This action shows that the prepare has been done. 
*/ fmstate->p_name = p_name; + fmstate->p_name_batch = p_name_batch; } /* diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h index eef410db39..7e4342cab6 100644 --- a/contrib/postgres_fdw/postgres_fdw.h +++ b/contrib/postgres_fdw/postgres_fdw.h @@ -162,6 +162,11 @@ extern void deparseInsertSql(StringInfo buf, RangeTblEntry *rte, List *targetAttrs, bool doNothing, List *withCheckOptionList, List *returningList, List **retrieved_attrs); +extern void deparseBatchInsertSql(StringInfo buf, RangeTblEntry *rte, + Index rtindex, Relation rel, + List *targetAttrs, bool doNothing, + List *withCheckOptionList, List *returningList, + List **retrieved_attrs, int batchSize); extern void deparseUpdateSql(StringInfo buf, RangeTblEntry *rte, Index rtindex, Relation rel, List *targetAttrs, -- 2.25.4