diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out
index 0f5271d476e..002c0a4ede5 100644
--- a/contrib/postgres_fdw/expected/postgres_fdw.out
+++ b/contrib/postgres_fdw/expected/postgres_fdw.out
@@ -252,6 +252,7 @@ SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1;  -- should work again
 -- Now we should be able to run ANALYZE.
 -- To exercise multiple code paths, we use local stats on ft1
 -- and remote-estimate mode on ft2.
+ALTER SERVER loopback OPTIONS (ADD fetch_stats 'false');
 ANALYZE ft1;
 ALTER FOREIGN TABLE ft2 OPTIONS (use_remote_estimate 'true');
 -- ===================================================================
@@ -4559,7 +4560,8 @@ REINDEX TABLE reind_fdw_parent; -- ok
 REINDEX TABLE CONCURRENTLY reind_fdw_parent; -- ok
 DROP TABLE reind_fdw_parent;
 -- ===================================================================
--- conversion error
+-- conversion error, will generate a WARNING for imported stats and an
+-- error on locally computed stats.
 -- ===================================================================
 ALTER FOREIGN TABLE ft1 ALTER COLUMN c8 TYPE int;
 SELECT * FROM ft1 ftx(x1,x2,x3,x4,x5,x6,x7,x8) WHERE x1 = 1;  -- ERROR
@@ -11481,6 +11483,12 @@ CREATE FOREIGN TABLE async_p2 PARTITION OF async_pt FOR VALUES FROM (2000) TO (3
   SERVER loopback2 OPTIONS (table_name 'base_tbl2');
 INSERT INTO async_p1 SELECT 1000 + i, i, to_char(i, 'FM0000') FROM generate_series(0, 999, 5) i;
 INSERT INTO async_p2 SELECT 2000 + i, i, to_char(i, 'FM0000') FROM generate_series(0, 999, 5) i;
+-- Will failover to sampling on async_p2 because fetch_stats = true (the default) on
+-- loopback2, and is set to false on loopback
+ANALYZE async_pt;
+WARNING:  could not import statistics for foreign table "public.async_p2" --- remote table "public.base_tbl2" has no relation statistics to import
+-- Turning off fetch_stats at the table level for async_p2 removes the warning.
+ALTER FOREIGN TABLE async_p2 OPTIONS (ADD fetch_stats 'false');
 ANALYZE async_pt;
 -- simple queries
 CREATE TABLE result_tbl (a int, b int, c text);
@@ -11587,6 +11595,11 @@ CREATE TABLE base_tbl3 (a int, b int, c text);
 CREATE FOREIGN TABLE async_p3 PARTITION OF async_pt FOR VALUES FROM (3000) TO (4000)
   SERVER loopback2 OPTIONS (table_name 'base_tbl3');
 INSERT INTO async_p3 SELECT 3000 + i, i, to_char(i, 'FM0000') FROM generate_series(0, 999, 5) i;
+-- Will fail because fetch_stats = true (the default) on async_p3/loopback2
+ANALYZE async_pt;
+WARNING:  could not import statistics for foreign table "public.async_p3" --- remote table "public.base_tbl3" has no relation statistics to import
+-- Turn off fetch_stats at the server level.
+ALTER SERVER loopback2 OPTIONS (ADD fetch_stats 'false');
 ANALYZE async_pt;
 EXPLAIN (VERBOSE, COSTS OFF)
 INSERT INTO result_tbl SELECT * FROM async_pt WHERE b === 505;
diff --git a/contrib/postgres_fdw/option.c b/contrib/postgres_fdw/option.c
index b0bd72d1e58..5b7726800d0 100644
--- a/contrib/postgres_fdw/option.c
+++ b/contrib/postgres_fdw/option.c
@@ -120,6 +120,7 @@ postgres_fdw_validator(PG_FUNCTION_ARGS)
 			strcmp(def->defname, "async_capable") == 0 ||
 			strcmp(def->defname, "parallel_commit") == 0 ||
 			strcmp(def->defname, "parallel_abort") == 0 ||
+			strcmp(def->defname, "fetch_stats") == 0 ||
 			strcmp(def->defname, "keep_connections") == 0)
 		{
 			/* these accept only boolean values */
@@ -278,6 +279,10 @@ InitPgFdwOptions(void)
 		{"use_scram_passthrough", ForeignServerRelationId, false},
 		{"use_scram_passthrough", UserMappingRelationId, false},
 
+		/* fetch_stats is available on both server and table */
+		{"fetch_stats", ForeignServerRelationId, false},
+		{"fetch_stats", ForeignTableRelationId, false},
+
 		/*
 		 * sslcert and sslkey are in fact libpq options, but we repeat them
 		 * here to allow them to appear in both foreign server context (when
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index 41e47cc795b..f4c70ef8dd3 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -21,8 +21,11 @@
 #include "commands/defrem.h"
 #include "commands/explain_format.h"
 #include "commands/explain_state.h"
+#include "commands/vacuum.h"
 #include "executor/execAsync.h"
 #include "executor/instrument.h"
+#include "executor/spi.h"
+#include "fmgr.h"
 #include "foreign/fdwapi.h"
 #include "funcapi.h"
 #include "miscadmin.h"
@@ -40,9 +43,11 @@
 #include "optimizer/tlist.h"
 #include "parser/parsetree.h"
 #include "postgres_fdw.h"
+#include "statistics/statistics.h"
 #include "storage/latch.h"
 #include "utils/builtins.h"
 #include "utils/float.h"
+#include "utils/fmgroids.h"
 #include "utils/guc.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
@@ -318,6 +323,22 @@ typedef struct
 	List	   *already_used;	/* expressions already dealt with */
 } ec_member_foreign_arg;
 
+/* Result sets that are returned from a foreign statistics scan */
+typedef struct
+{
+	PGresult   *rel;
+	PGresult   *att;
+} RemoteStatsResults;
+
+/* Pairs of remote columns with local attnums */
+typedef struct
+{
+	AttrNumber	local_attnum;
+	char		local_attname[NAMEDATALEN];
+	char		remote_attname[NAMEDATALEN];
+	int			res_index;
+} RemoteAttributeMapping;
+
 /*
  * SQL functions
  */
@@ -403,6 +424,9 @@ static void postgresExecForeignTruncate(List *rels,
 static bool postgresAnalyzeForeignTable(Relation relation,
 										AcquireSampleRowsFunc *func,
 										BlockNumber *totalpages);
+static bool postgresImportStatistics(Relation relation,
+									 List *va_cols,
+									 int elevel);
 static List *postgresImportForeignSchema(ImportForeignSchemaStmt *stmt,
 										 Oid serverOid);
 static void postgresGetForeignJoinPaths(PlannerInfo *root,
@@ -547,6 +571,210 @@ static void merge_fdw_options(PgFdwRelationInfo *fpinfo,
 							  const PgFdwRelationInfo *fpinfo_i);
 static int	get_batch_size_option(Relation rel);
 
+/*
+ * Static queries for querying remote statistics.
+ */
+
+/* All static relstats queries have the same column order */
+enum RelStatsColumns
+{
+	RELSTATS_RELKIND = 0,
+	RELSTATS_RELPAGES,
+	RELSTATS_RELTUPLES,
+	RELSTATS_NUM_FIELDS,
+};
+
+/* range stats introduced in v17 */
+static const char *attstats_query_17 =
+"SELECT DISTINCT ON (s.attname COLLATE \"C\") attname, s.null_frac, s.avg_width, "
+"s.n_distinct, s.most_common_vals, s.most_common_freqs, "
+"s.histogram_bounds, s.correlation, s.most_common_elems, "
+"s.most_common_elem_freqs, s.elem_count_histogram, "
+"s.range_length_histogram, s.range_empty_frac, s.range_bounds_histogram "
+"FROM pg_catalog.pg_stats AS s "
+"WHERE s.schemaname = $1 AND s.tablename = $2 "
+"AND s.attname = ANY($3::text[]) "
+"ORDER BY s.attname COLLATE \"C\", s.inherited DESC";
+
+/* elements stats introduced in 9.2 */
+static const char *attstats_query_9_2 =
+"SELECT DISTINCT ON (s.attname COLLATE \"C\") attname, s.null_frac, s.avg_width, "
+"s.n_distinct, s.most_common_vals, s.most_common_freqs, "
+"s.histogram_bounds, s.correlation, s.most_common_elems, "
+"s.most_common_elem_freqs, s.elem_count_histogram, "
+"NULL AS range_length_histogram, NULL AS range_empty_frac, "
+"NULL AS range_bounds_histogram "
+"FROM pg_catalog.pg_stats AS s "
+"WHERE s.schemaname = $1 AND s.tablename = $2 "
+"AND s.attname = ANY($3::text[]) "
+"ORDER BY s.attname COLLATE \"C\", s.inherited DESC";
+
+/* inherited introduced in 9.0 */
+static const char *attstats_query_9_0 =
+"SELECT DISTINCT ON (s.attname COLLATE \"C\") attname, s.null_frac, s.avg_width, "
+"s.n_distinct, s.most_common_vals, s.most_common_freqs, "
+"s.histogram_bounds, s.correlation, NULL AS most_common_elems, "
+"NULL AS most_common_elem_freqs, NULL AS elem_count_histogram, "
+"NULL AS range_length_histogram, NULL AS range_empty_frac, "
+"NULL AS range_bounds_histogram "
+"FROM pg_catalog.pg_stats AS s "
+"WHERE s.schemaname = $1 AND s.tablename = $2 "
+"AND s.attname = ANY($3::text[]) "
+"ORDER BY s.attname COLLATE \"C\", s.inherited DESC";
+
+static const char *attstats_query_default =
+"SELECT s.attname, s.null_frac, s.avg_width, "
+"s.n_distinct, s.most_common_vals, s.most_common_freqs, "
+"s.histogram_bounds, s.correlation, NULL AS most_common_elems, "
+"NULL AS most_common_elem_freqs, NULL AS elem_count_histogram, "
+"NULL AS range_length_histogram, NULL AS range_empty_frac, "
+"NULL AS range_bounds_histogram "
+"FROM pg_catalog.pg_stats AS s "
+"WHERE s.schemaname = $1 AND s.tablename = $2 "
+"AND s.attname = ANY($3::text[]) "
+"ORDER BY s.attname COLLATE \"C\"";
+
+/* All static attstats queries have the same column order */
+enum AttStatsColumns
+{
+	ATTSTATS_ATTNAME = 0,
+	ATTSTATS_NULL_FRAC,
+	ATTSTATS_AVG_WIDTH,
+	ATTSTATS_N_DISTINCT,
+	ATTSTATS_MOST_COMMON_VALS,
+	ATTSTATS_MOST_COMMON_FREQS,
+	ATTSTATS_HISTOGRAM_BOUNDS,
+	ATTSTATS_CORRELATION,
+	ATTSTATS_MOST_COMMON_ELEMS,
+	ATTSTATS_MOST_COMMON_ELEM_FREQS,
+	ATTSTATS_ELEM_COUNT_HISTOGRAM,
+	ATTSTATS_RANGE_LENGTH_HISTOGRAM,
+	ATTSTATS_RANGE_EMPTY_FRAC,
+	ATTSTATS_RANGE_BOUNDS_HISTOGRAM,
+	ATTSTATS_NUM_FIELDS,
+};
+
+static const char *relimport_sql =
+	"SELECT pg_catalog.pg_restore_relation_stats(\n"
+	"\t'version', $1,\n"
+	"\t'schemaname', $2,\n"
+	"\t'relname', $3,\n"
+	"\t'relpages', $4::integer,\n"
+	"\t'reltuples', $5::real)";
+
+enum RelImportSqlArgs
+{
+	RELIMPORT_SQL_VERSION = 0,
+	RELIMPORT_SQL_SCHEMANAME,
+	RELIMPORT_SQL_RELNAME,
+	RELIMPORT_SQL_RELPAGES,
+	RELIMPORT_SQL_RELTUPLES,
+	RELIMPORT_SQL_NUM_FIELDS
+};
+
+static const Oid relimport_argtypes[RELIMPORT_SQL_NUM_FIELDS] =
+{
+	INT4OID, TEXTOID, TEXTOID, TEXTOID,
+	TEXTOID,
+};
+
+static const char *attimport_sql =
+	"SELECT pg_catalog.pg_restore_attribute_stats(\n"
+	"\t'version', $1,\n"
+	"\t'schemaname', $2,\n"
+	"\t'relname', $3,\n"
+	"\t'attnum', $4,\n"
+	"\t'inherited', false::boolean,\n"
+	"\t'null_frac', $5::real,\n"
+	"\t'avg_width', $6::integer,\n"
+	"\t'n_distinct', $7::real,\n"
+	"\t'most_common_vals', $8,\n"
+	"\t'most_common_freqs', $9::real[],\n"
+	"\t'histogram_bounds', $10,\n"
+	"\t'correlation', $11::real,\n"
+	"\t'most_common_elems', $12,\n"
+	"\t'most_common_elem_freqs', $13::real[],\n"
+	"\t'elem_count_histogram', $14::real[],\n"
+	"\t'range_length_histogram', $15,\n"
+	"\t'range_empty_frac', $16::real,\n"
+	"\t'range_bounds_histogram', $17)";
+
+enum AttImportSqlArgs
+{
+	ATTIMPORT_SQL_VERSION = 0,
+	ATTIMPORT_SQL_SCHEMANAME,
+	ATTIMPORT_SQL_RELNAME,
+	ATTIMPORT_SQL_ATTNUM,
+	ATTIMPORT_SQL_NULL_FRAC,
+	ATTIMPORT_SQL_AVG_WIDTH,
+	ATTIMPORT_SQL_N_DISTINCT,
+	ATTIMPORT_SQL_MOST_COMMON_VALS,
+	ATTIMPORT_SQL_MOST_COMMON_FREQS,
+	ATTIMPORT_SQL_HISTOGRAM_BOUNDS,
+	ATTIMPORT_SQL_CORRELATION,
+	ATTIMPORT_SQL_MOST_COMMON_ELEMS,
+	ATTIMPORT_SQL_MOST_COMMON_ELEM_FREQS,
+	ATTIMPORT_SQL_ELEM_COUNT_HISTOGRAM,
+	ATTIMPORT_SQL_RANGE_LENGTH_HISTOGRAM,
+	ATTIMPORT_SQL_RANGE_EMPTY_FRAC,
+	ATTIMPORT_SQL_RANGE_BOUNDS_HISTOGRAM,
+	ATTIMPORT_SQL_NUM_FIELDS
+};
+
+static const Oid attimport_argtypes[ATTIMPORT_SQL_NUM_FIELDS] =
+{
+	INT4OID, TEXTOID, TEXTOID, INT2OID,
+	TEXTOID, TEXTOID, TEXTOID, TEXTOID,
+	TEXTOID, TEXTOID, TEXTOID, TEXTOID,
+	TEXTOID, TEXTOID, TEXTOID, TEXTOID,
+	TEXTOID,
+};
+
+/* Pairs AttStatsColumns:AttImportSqlArgs Map */
+typedef struct
+{
+	enum AttStatsColumns res_field;
+	enum AttImportSqlArgs arg_num;
+} AttrResultArgMap;
+
+/*
+ * The mapping of attribute stats query columns to the positional arguments in
+ * the prepared pg_restore_attribute_stats() statement.
+ */
+#define NUM_MAPPED_ATTIMPORT_ARGS 13
+static const AttrResultArgMap attr_result_arg_map[NUM_MAPPED_ATTIMPORT_ARGS] =
+{
+	{ATTSTATS_NULL_FRAC, ATTIMPORT_SQL_NULL_FRAC},
+	{ATTSTATS_AVG_WIDTH, ATTIMPORT_SQL_AVG_WIDTH},
+	{ATTSTATS_N_DISTINCT, ATTIMPORT_SQL_N_DISTINCT},
+	{ATTSTATS_MOST_COMMON_VALS, ATTIMPORT_SQL_MOST_COMMON_VALS},
+	{ATTSTATS_MOST_COMMON_FREQS, ATTIMPORT_SQL_MOST_COMMON_FREQS},
+	{ATTSTATS_HISTOGRAM_BOUNDS, ATTIMPORT_SQL_HISTOGRAM_BOUNDS},
+	{ATTSTATS_CORRELATION, ATTIMPORT_SQL_CORRELATION},
+	{ATTSTATS_MOST_COMMON_ELEMS, ATTIMPORT_SQL_MOST_COMMON_ELEMS},
+	{ATTSTATS_MOST_COMMON_ELEM_FREQS, ATTIMPORT_SQL_MOST_COMMON_ELEM_FREQS},
+	{ATTSTATS_ELEM_COUNT_HISTOGRAM, ATTIMPORT_SQL_ELEM_COUNT_HISTOGRAM},
+	{ATTSTATS_RANGE_LENGTH_HISTOGRAM, ATTIMPORT_SQL_RANGE_LENGTH_HISTOGRAM},
+	{ATTSTATS_RANGE_EMPTY_FRAC, ATTIMPORT_SQL_RANGE_EMPTY_FRAC},
+	{ATTSTATS_RANGE_BOUNDS_HISTOGRAM, ATTIMPORT_SQL_RANGE_BOUNDS_HISTOGRAM},
+};
+
+static const char *attclear_sql =
+	"SELECT pg_catalog.pg_clear_attribute_stats($1, $2, $3, false)";
+
+enum AttClearSqlArgs
+{
+	ATTCLEAR_SQL_SCHEMANAME = 0,
+	ATTCLEAR_SQL_RELNAME,
+	ATTCLEAR_SQL_ATTNAME,
+	ATTCLEAR_SQL_NUM_FIELDS
+};
+
+static const Oid attclear_argtypes[ATTCLEAR_SQL_NUM_FIELDS] =
+{
+	TEXTOID, TEXTOID, TEXTOID,
+};
+
 
 /*
  * Foreign-data wrapper handler function: return a struct with pointers
@@ -596,6 +824,7 @@ postgres_fdw_handler(PG_FUNCTION_ARGS)
 
 	/* Support functions for ANALYZE */
 	routine->AnalyzeForeignTable = postgresAnalyzeForeignTable;
+	routine->ImportStatistics = postgresImportStatistics;
 
 	/* Support functions for IMPORT FOREIGN SCHEMA */
 	routine->ImportForeignSchema = postgresImportForeignSchema;
@@ -4936,6 +5165,660 @@ postgresAnalyzeForeignTable(Relation relation,
 	return true;
 }
 
+/*
+ * Test if an attribute name is in the list.
+ *
+ * An empty list means that all attribute names are in the list.
+ */
+static bool
+attname_in_list(const char *attname, List *va_cols)
+{
+	ListCell   *le;
+
+	if (va_cols == NIL)
+		return true;
+
+	foreach(le, va_cols)
+	{
+		char	   *col = strVal(lfirst(le));
+
+		if (strcmp(attname, col) == 0)
+			return true;
+	}
+	return false;
+}
+
+/*
+ * Move a string value from a result set to a Text value of a Datum array.
+ */
+static void
+map_field_to_arg(PGresult *res, int row, int field,
+				 int arg, Datum *values, char *nulls)
+{
+	if (PQgetisnull(res, row, field))
+	{
+		values[arg] = (Datum) 0;
+		nulls[arg] = 'n';
+	}
+	else
+	{
+		const char *s = PQgetvalue(res, row, field);
+
+		values[arg] = CStringGetTextDatum(s);
+		nulls[arg] = ' ';
+	}
+}
+
+/*
+ * Check the 1x1 result set of a pg_restore_*_stats() command for success.
+ */
+static bool
+import_spi_query_ok(void)
+{
+	char	   *s;
+	bool		ok;
+
+	Assert(SPI_tuptable != NULL);
+	Assert(SPI_processed == 1);
+	s = SPI_getvalue(SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 1);
+	ok = (s != NULL && s[0] == 't' && s[1] == '\0');
+	pfree(s);
+
+	return ok;
+}
+
+/*
+ * Import fetched statistics into the local statistics tables.
+ */
+static bool
+import_fetched_statistics(const char *schemaname,
+						  const char *relname,
+						  int server_version_num,
+						  int natts,
+						  const RemoteAttributeMapping *remattrmap,
+						  RemoteStatsResults *remstats)
+{
+	SPIPlanPtr	attimport_plan = NULL;
+	SPIPlanPtr	attclear_plan = NULL;
+	Datum		values[ATTIMPORT_SQL_NUM_FIELDS];
+	char		nulls[ATTIMPORT_SQL_NUM_FIELDS];
+	int			spirc;
+	bool		ok = false;
+
+	/* Assign all the invariant parameters */
+	values[ATTIMPORT_SQL_VERSION] = Int32GetDatum(server_version_num);
+	nulls[ATTIMPORT_SQL_VERSION] = ' ';
+
+	values[ATTIMPORT_SQL_SCHEMANAME] = CStringGetTextDatum(schemaname);
+	nulls[ATTIMPORT_SQL_SCHEMANAME] = ' ';
+
+	values[ATTIMPORT_SQL_RELNAME] = CStringGetTextDatum(relname);
+	nulls[ATTIMPORT_SQL_RELNAME] = ' ';
+
+	SPI_connect();
+
+	/*
+	 * We import the attribute statistics first, because those are more prone
+	 * to errors. This avoids making a modification of pg_class that will just
+	 * get rolled back by a failed attribute import.
+	 */
+	if (remstats->att != NULL)
+	{
+		Assert(PQnfields(remstats->att) == ATTSTATS_NUM_FIELDS);
+		Assert(PQntuples(remstats->att) >= 1);
+
+		attimport_plan = SPI_prepare(attimport_sql, ATTIMPORT_SQL_NUM_FIELDS,
+									 (Oid *) attimport_argtypes);
+		if (attimport_plan == NULL)
+			elog(ERROR, "failed to prepare attimport_sql query");
+
+		attclear_plan = SPI_prepare(attclear_sql, ATTCLEAR_SQL_NUM_FIELDS,
+									(Oid *) attclear_argtypes);
+		if (attclear_plan == NULL)
+			elog(ERROR, "failed to prepare attclear_sql query");
+
+		nulls[ATTIMPORT_SQL_ATTNUM] = ' ';
+
+		for (int mapidx = 0; mapidx < natts; mapidx++)
+		{
+			int			row = remattrmap[mapidx].res_index;
+			Datum		*values2 = values + 1;
+			char		*nulls2 = nulls + 1;
+
+			/* All mappings should have been assigned a result set row. */
+			Assert(row >= 0);
+
+			/*
+			 * Check for user-requested abort.
+			 */
+			CHECK_FOR_INTERRUPTS();
+
+			/*
+			 * First, clear existing attribute stats.
+			 *
+			 * We can re-use the values/nulls because the number of parameters
+			 * is less and the first two params are the same as the second and
+			 * third ones in attimport_sql.
+			 */
+			values2[ATTCLEAR_SQL_ATTNAME] =
+				CStringGetTextDatum(remattrmap[mapidx].local_attname);
+
+			spirc = SPI_execute_plan(attclear_plan, values2, nulls2, false, 1);
+			if (spirc != SPI_OK_SELECT)
+				elog(ERROR, "failed to execute attclear_sql query for column \"%s\" of foreign table \"%s.%s\"",
+					 remattrmap[mapidx].local_attname, schemaname, relname);
+
+			values[ATTIMPORT_SQL_ATTNUM] =
+				Int16GetDatum(remattrmap[mapidx].local_attnum);
+
+			/* Loop through all mappable columns to set remaining arguments */
+			for (int i = 0; i < NUM_MAPPED_ATTIMPORT_ARGS; i++)
+				map_field_to_arg(remstats->att, row,
+								 attr_result_arg_map[i].res_field,
+								 attr_result_arg_map[i].arg_num,
+								 values, nulls);
+
+			spirc = SPI_execute_plan(attimport_plan, values, nulls, false, 1);
+			if (spirc != SPI_OK_SELECT)
+				elog(ERROR, "failed to execute attimport_sql query for column \"%s\" of foreign table \"%s.%s\"",
+					 remattrmap[mapidx].local_attname, schemaname, relname);
+
+			if (!import_spi_query_ok())
+			{
+				ereport(WARNING,
+						errmsg("could not import statistics for foreign table \"%s.%s\" --- attribute statistics import failed for column \"%s\" of this foreign table",
+							   schemaname, relname,
+							   remattrmap[mapidx].local_attname));
+				goto import_cleanup;
+			}
+		}
+	}
+
+	/*
+	 * Import relstats.
+	 *
+	 * Import relation stats. We only perform this once, so there is no point
+	 * in preparing the statement.
+	 *
+	 * We can re-use the values/nulls because the number of parameters is less
+	 * and the first three params are the same as attimport_sql.
+	 */
+	Assert(remstats->rel != NULL);
+	Assert(PQnfields(remstats->rel) == RELSTATS_NUM_FIELDS);
+	Assert(PQntuples(remstats->rel) == 1);
+	map_field_to_arg(remstats->rel, 0, RELSTATS_RELPAGES,
+					 RELIMPORT_SQL_RELPAGES, values, nulls);
+	map_field_to_arg(remstats->rel, 0, RELSTATS_RELTUPLES,
+					 RELIMPORT_SQL_RELTUPLES, values, nulls);
+
+	spirc = SPI_execute_with_args(relimport_sql,
+								  RELIMPORT_SQL_NUM_FIELDS,
+								  (Oid *) relimport_argtypes,
+								  values, nulls, false, 1);
+	if (spirc != SPI_OK_SELECT)
+		elog(ERROR, "failed to execute relimport_sql query for foreign table \"%s.%s\"",
+			 schemaname, relname);
+
+	if (!import_spi_query_ok())
+	{
+		ereport(WARNING,
+				errmsg("could not import statistics for foreign table \"%s.%s\" --- relation statistics import failed for this foreign table",
+					   schemaname, relname));
+		goto import_cleanup;
+	}
+
+	ok = true;
+
+import_cleanup:
+	if (attimport_plan)
+		SPI_freeplan(attimport_plan);
+	if (attclear_plan)
+		SPI_freeplan(attclear_plan);
+	SPI_finish();
+	return ok;
+}
+
+/*
+ * Attempt to fetch remote relations stats.
+ */
+static PGresult *
+fetch_relstats(PGconn *conn,
+			   const char *remote_schemaname, const char *remote_relname)
+{
+	const char *params[2] = {remote_schemaname, remote_relname};
+	const char *sql = "SELECT c.relkind, c.relpages, c.reltuples "
+		"FROM pg_catalog.pg_class AS c "
+		"JOIN pg_catalog.pg_namespace AS n "
+		"ON n.oid = c.relnamespace "
+		"WHERE n.nspname = $1 AND c.relname = $2";
+	PGresult   *res = NULL;
+
+	if (!PQsendQueryParams(conn, sql, 2, NULL, params, NULL, NULL, 0))
+		pgfdw_report_error(NULL, conn, sql);
+
+	res = pgfdw_get_result(conn);
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		pgfdw_report_error(res, conn, sql);
+
+	if (PQntuples(res) != 1 ||
+		PQnfields(res) != RELSTATS_NUM_FIELDS ||
+		PQgetisnull(res, 0, RELSTATS_RELKIND))
+		elog(ERROR, "unexpected result from fetch_relstats query");
+
+	return res;
+}
+
+/*
+ * Attempt to fetch remote attribute stats.
+ */
+static PGresult *
+fetch_attstats(PGconn *conn, int server_version_num,
+			   const char *remote_schemaname, const char *remote_relname,
+			   const char *column_list)
+{
+	const char *params[3] = {remote_schemaname, remote_relname, column_list};
+	const char *sql;
+	PGresult   *res;
+
+	if (server_version_num >= 170000)
+		sql = attstats_query_17;
+	else if (server_version_num >= 90200)
+		sql = attstats_query_9_2;
+	else if (server_version_num >= 90000)
+		sql = attstats_query_9_0;
+	else
+		sql = attstats_query_default;
+
+	if (!PQsendQueryParams(conn, sql, 3, NULL, params, NULL, NULL, 0))
+		pgfdw_report_error(NULL, conn, sql);
+
+	res = pgfdw_get_result(conn);
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		pgfdw_report_error(res, conn, sql);
+
+	if (PQnfields(res) != ATTSTATS_NUM_FIELDS)
+		elog(ERROR, "unexpected result from fetch_attstats query");
+
+	return res;
+}
+
+/*
+ * Match local columns to result set rows.
+ *
+ * As the result set consists of the attribute stats for some/all of distinct
+ * mapped remote columns in the RemoteAttributeMapping, every entry in it
+ * should have at most one match in the result set; which is also ordered by
+ * attname, so we find such pairs by doing a merge join.
+ *
+ * Returns true if every entry in it has a match, and false if not.
+ */
+static bool
+match_attrmap(PGresult *res,
+			  const char *schemaname,
+			  const char *relname,
+			  const char *remote_schemaname,
+			  const char *remote_relname,
+			  int natts,
+			  RemoteAttributeMapping *remattrmap)
+{
+	int			numrows = PQntuples(res);
+	int			row = -1;
+
+	/* No work if there are no stats rows. */
+	if (numrows == 0)
+	{
+		ereport(WARNING,
+				errmsg("could not import statistics for foreign table \"%s.%s\" --- remote table \"%s.%s\" has no attribute statistics to import",
+					   schemaname, relname,
+					   remote_schemaname, remote_relname));
+		return false;
+	}
+
+	/* Scan all entries in the RemoteAttributeMapping. */
+	for (int i = 0; i < natts; i++)
+	{
+		/*
+		 * First, check whether the entry matches the current stats row, if it
+		 * is set.
+		 */
+		if (row >= 0 &&
+			strcmp(remattrmap[i].remote_attname,
+				   PQgetvalue(res, row, ATTSTATS_ATTNAME)) == 0)
+		{
+			remattrmap[i].res_index = row;
+			continue;
+		}
+
+		/*
+		 * If we've exhausted all stats rows, it means the stats for the entry
+		 * are missing.
+		 */
+		if (row >= numrows - 1)
+		{
+			ereport(WARNING,
+					errmsg("could not import statistics for foreign table \"%s.%s\" --- no attribute statistics found for column \"%s\" of remote table \"%s.%s\"",
+						   schemaname, relname,
+						   remattrmap[i].remote_attname,
+						   remote_schemaname, remote_relname));
+			return false;
+		}
+
+		/* Advance to the next stats row. */
+		row += 1;
+
+		/*
+		 * If the attname in the entry is less than that in the next stats row,
+		 * it means the stats for the entry are missing.
+		 */
+		if (strcmp(remattrmap[i].remote_attname,
+				   PQgetvalue(res, row, ATTSTATS_ATTNAME)) < 0)
+		{
+			ereport(WARNING,
+					errmsg("could not import statistics for foreign table \"%s.%s\" --- no attribute statistics found for column \"%s\" of remote table \"%s.%s\"",
+						   schemaname, relname,
+						   remattrmap[i].remote_attname,
+						   remote_schemaname, remote_relname));
+			return false;
+		}
+
+		/* We should not get a stats row we didn't expect. */
+		if (strcmp(remattrmap[i].remote_attname,
+				   PQgetvalue(res, row, ATTSTATS_ATTNAME)) > 0)
+			elog(ERROR, "unexpected result from fetch_attstats query");
+
+		/* We found a match. */
+		Assert(strcmp(remattrmap[i].remote_attname,
+					  PQgetvalue(res, row, ATTSTATS_ATTNAME)) == 0);
+		remattrmap[i].res_index = row;
+	}
+
+	/* We should have exhausted all stats rows. */
+	if (row < numrows - 1)
+		elog(ERROR, "unexpected result from fetch_attstats query");
+
+	return true;
+}
+
+/*
+ * Attempt to fetch statistics from a remote server.
+ */
+static bool
+fetch_remote_statistics(PGconn *conn,
+						const char *schemaname,
+						const char *relname,
+						const char *remote_schemaname,
+						const char *remote_relname,
+						int server_version_num,
+						int natts,
+						RemoteAttributeMapping *remattrmap,
+						const char *column_list,
+						RemoteStatsResults *remstats)
+{
+	PGresult   *relstats = NULL;
+	PGresult   *attstats = NULL;
+	char		relkind;
+	double		reltuples;
+
+	relstats = fetch_relstats(conn, remote_schemaname, remote_relname);
+
+	/*
+	 * Verify that the remote table is the sort that can have meaningful stats
+	 * in pg_stats.
+	 *
+	 * Note that while relations of kinds RELKIND_INDEX and
+	 * RELKIND_PARTITIONED_INDEX can have rows in pg_stats, they obviously
+	 * can't support a foreign table.
+	 */
+	relkind = *PQgetvalue(relstats, 0, RELSTATS_RELKIND);
+	switch (relkind)
+	{
+		case RELKIND_RELATION:
+		case RELKIND_FOREIGN_TABLE:
+		case RELKIND_MATVIEW:
+		case RELKIND_PARTITIONED_TABLE:
+			break;
+		default:
+			ereport(WARNING,
+					errmsg("could not import statistics for foreign table \"%s.%s\" --- remote table \"%s.%s\" is of relkind \"%c\" which cannot have statistics",
+						   schemaname, relname,
+						   remote_schemaname, remote_relname, relkind));
+			goto fetch_cleanup;
+	}
+
+	/*
+	 * If the reltuples value > 0, then then we can expect to find attribute
+	 * stats for the table.
+	 *
+	 * A reltuples value of -1 means the table has never been analyzed (v14+).
+	 *
+	 * In versions prior to v14, a value of 0 was ambiguous, it could mean
+	 * that the table had never been analyzed, or that it was empty at the
+	 * time that it was analyzed. Either way, we wouldn't expect to find
+	 * attstats for the relation.
+	 */
+	reltuples = strtod(PQgetvalue(relstats, 0, RELSTATS_RELTUPLES), NULL);
+	if (((server_version_num < 140000) && (reltuples == 0)) ||
+		((server_version_num >= 140000) && (reltuples == -1)))
+	{
+		ereport(WARNING,
+				errmsg("could not import statistics for foreign table \"%s.%s\" --- remote table \"%s.%s\" has no relation statistics to import",
+					   schemaname, relname,
+					   remote_schemaname, remote_relname));
+		goto fetch_cleanup;
+	}
+
+	if (reltuples > 0)
+	{
+		attstats = fetch_attstats(conn, server_version_num, remote_schemaname,
+								  remote_relname, column_list);
+
+		/* Reject the stats if any are missing or in excess. */
+		if (!match_attrmap(attstats, schemaname, relname,
+						   remote_schemaname, remote_relname,
+						   natts, remattrmap))
+			goto fetch_cleanup;
+	}
+
+	remstats->rel = relstats;
+	remstats->att = attstats;
+	return true;
+
+fetch_cleanup:
+	PQclear(attstats);
+	PQclear(relstats);
+	return false;
+}
+
+/*
+ * Compare two RemoteAttributeMappings for sorting.
+ */
+static int
+remattrmap_cmp(const void *v1, const void *v2)
+{
+	const		RemoteAttributeMapping *r1 = v1;
+	const		RemoteAttributeMapping *r2 = v2;
+
+	return strncmp(r1->remote_attname, r2->remote_attname, NAMEDATALEN);
+}
+
+/*
+ * postgresImportStatistics
+ * 		Attempt to fetch remote statistics and apply those instead of analyzing.
+ */
+static bool
+postgresImportStatistics(Relation relation, List *va_cols, int elevel)
+{
+	ForeignTable *table;
+	ForeignServer *server;
+	UserMapping *user;
+	PGconn	   *conn;
+	int			server_version_num;
+	const char *schemaname = NULL;
+	const char *relname = NULL;
+	const char *remote_schemaname = NULL;
+	const char *remote_relname = NULL;
+	TupleDesc	tupdesc;
+	RemoteStatsResults remstats = {.rel = NULL, .att = NULL};
+	RemoteAttributeMapping *remattrmap;
+	StringInfoData column_list;
+	int			natts = 0;
+	bool		fetch_stats = true;
+	bool		ok = false;
+	ListCell   *lc;
+
+	table = GetForeignTable(RelationGetRelid(relation));
+	server = GetForeignServer(table->serverid);
+
+	/*
+	 * Check whether the fetch_stats option is enabled on the foreign table.
+	 * If not, silently ignore the foreign table.
+	 *
+	 * Server-level options can be overridden by table-level options, so check
+	 * server-level first.
+	 */
+	foreach(lc, server->options)
+	{
+		DefElem    *def = (DefElem *) lfirst(lc);
+
+		if (strcmp(def->defname, "fetch_stats") == 0)
+		{
+			fetch_stats = defGetBoolean(def);
+			break;
+		}
+	}
+	foreach(lc, table->options)
+	{
+		DefElem    *def = (DefElem *) lfirst(lc);
+
+		if (strcmp(def->defname, "fetch_stats") == 0)
+		{
+			fetch_stats = defGetBoolean(def);
+			break;
+		}
+	}
+	if (!fetch_stats)
+		return false;
+
+	schemaname = get_namespace_name(RelationGetNamespace(relation));
+	relname = RelationGetRelationName(relation);
+
+	/*
+	 * We don't currently support statistics import for foreign tables with
+	 * extended statistics objects.
+	 */
+	if (HasRelationExtStatistics(relation))
+	{
+		ereport(WARNING,
+				errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				errmsg("cannot import statistics for foreign table \"%s.%s\" --- this foreign table has extended statistics objects",
+					   schemaname, relname));
+		return false;
+	}
+
+	/*
+	 * OK, let's do it.
+	 */
+	ereport(elevel,
+			(errmsg("importing statistics for foreign table \"%s.%s\"",
+					schemaname, relname)));
+
+	/*
+	 * Get connection to the foreign server.  Connection manager will
+	 * establish new connection if necessary.
+	 */
+	user = GetUserMapping(GetUserId(), table->serverid);
+	conn = GetConnection(user, false, NULL);
+	server_version_num = PQserverVersion(conn);
+
+	/*
+	 * Assume the remote schema/relation names are the same as the local name
+	 * unless the foreign table's options tell us otherwise.
+	 */
+	remote_schemaname = schemaname;
+	remote_relname = relname;
+	foreach(lc, table->options)
+	{
+		DefElem    *def = (DefElem *) lfirst(lc);
+
+		if (strcmp(def->defname, "schema_name") == 0)
+			remote_schemaname = defGetString(def);
+		else if (strcmp(def->defname, "table_name") == 0)
+			remote_relname = defGetString(def);
+	}
+
+	/*
+	 * Build local-attnum/remote-attname map and column list.
+	 */
+	tupdesc = RelationGetDescr(relation);
+	remattrmap = palloc_array(RemoteAttributeMapping, tupdesc->natts);
+	initStringInfo(&column_list);
+	appendStringInfoChar(&column_list, '{');
+	for (int i = 0; i < tupdesc->natts; i++)
+	{
+		Form_pg_attribute attr = TupleDescAttr(tupdesc, i);
+		char	   *attname = NameStr(attr->attname);
+		AttrNumber	attnum = attr->attnum;
+		char	   *remote_attname;
+		List	   *fc_options;
+
+		/* If a list is specified, exclude any attnames not in it. */
+		if (!attname_in_list(attname, va_cols))
+			continue;
+
+		if (!attribute_is_analyzable(relation, attnum, attr, NULL))
+			continue;
+
+		/* If column_name is not specified, go with attname. */
+		remote_attname = attname;
+		fc_options = GetForeignColumnOptions(RelationGetRelid(relation), attnum);
+		foreach(lc, fc_options)
+		{
+			DefElem    *def = (DefElem *) lfirst(lc);
+
+			if (strcmp(def->defname, "column_name") == 0)
+			{
+				remote_attname = defGetString(def);
+				break;
+			}
+		}
+
+		if (i > 0)
+			appendStringInfoChar(&column_list, ',');
+		appendStringInfoString(&column_list, quote_identifier(remote_attname));
+
+		remattrmap[natts].local_attnum = attnum;
+		strncpy(remattrmap[natts].local_attname, attname, NAMEDATALEN);
+		strncpy(remattrmap[natts].remote_attname, remote_attname, NAMEDATALEN);
+		remattrmap[natts].res_index = -1;
+		natts++;
+	}
+	appendStringInfoChar(&column_list, '}');
+
+	/* Sort mapping by remote attribute name. */
+	qsort(remattrmap, natts, sizeof(RemoteAttributeMapping), remattrmap_cmp);
+
+	ok = fetch_remote_statistics(conn, schemaname, relname,
+								 remote_schemaname, remote_relname,
+								 server_version_num, natts, remattrmap,
+								 column_list.data, &remstats);
+
+	if (ok)
+		ok = import_fetched_statistics(schemaname, relname, server_version_num,
+									   natts, remattrmap, &remstats);
+
+	if (ok)
+		ereport(elevel,
+				(errmsg("finished importing statistics for foreign table \"%s.%s\"",
+						schemaname, relname)));
+
+	pfree(remattrmap);
+	PQclear(remstats.rel);
+	PQclear(remstats.att);
+	ReleaseConnection(conn);
+
+	return ok;
+}
+
 /*
  * postgresGetAnalyzeInfoForForeignTable
  *		Count tuples in foreign table (just get pg_class.reltuples).
diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql
index 49ed797e8ef..a0f52e57dbc 100644
--- a/contrib/postgres_fdw/sql/postgres_fdw.sql
+++ b/contrib/postgres_fdw/sql/postgres_fdw.sql
@@ -241,6 +241,7 @@ SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1;  -- should work again
 -- Now we should be able to run ANALYZE.
 -- To exercise multiple code paths, we use local stats on ft1
 -- and remote-estimate mode on ft2.
+ALTER SERVER loopback OPTIONS (ADD fetch_stats 'false');
 ANALYZE ft1;
 ALTER FOREIGN TABLE ft2 OPTIONS (use_remote_estimate 'true');
 
@@ -1293,7 +1294,8 @@ REINDEX TABLE CONCURRENTLY reind_fdw_parent; -- ok
 DROP TABLE reind_fdw_parent;
 
 -- ===================================================================
--- conversion error
+-- conversion error, will generate a WARNING for imported stats and an
+-- error on locally computed stats.
 -- ===================================================================
 ALTER FOREIGN TABLE ft1 ALTER COLUMN c8 TYPE int;
 SELECT * FROM ft1 ftx(x1,x2,x3,x4,x5,x6,x7,x8) WHERE x1 = 1;  -- ERROR
@@ -3917,6 +3919,11 @@ CREATE FOREIGN TABLE async_p2 PARTITION OF async_pt FOR VALUES FROM (2000) TO (3
   SERVER loopback2 OPTIONS (table_name 'base_tbl2');
 INSERT INTO async_p1 SELECT 1000 + i, i, to_char(i, 'FM0000') FROM generate_series(0, 999, 5) i;
 INSERT INTO async_p2 SELECT 2000 + i, i, to_char(i, 'FM0000') FROM generate_series(0, 999, 5) i;
+-- Will failover to sampling on async_p2 because fetch_stats = true (the default) on
+-- loopback2, and is set to false on loopback
+ANALYZE async_pt;
+-- Turning off fetch_stats at the table level for async_p2 removes the warning.
+ALTER FOREIGN TABLE async_p2 OPTIONS (ADD fetch_stats 'false');
 ANALYZE async_pt;
 
 -- simple queries
@@ -3954,6 +3961,10 @@ CREATE TABLE base_tbl3 (a int, b int, c text);
 CREATE FOREIGN TABLE async_p3 PARTITION OF async_pt FOR VALUES FROM (3000) TO (4000)
   SERVER loopback2 OPTIONS (table_name 'base_tbl3');
 INSERT INTO async_p3 SELECT 3000 + i, i, to_char(i, 'FM0000') FROM generate_series(0, 999, 5) i;
+-- Will fail because fetch_stats = true (the default) on async_p3/loopback2
+ANALYZE async_pt;
+-- Turn off fetch_stats at the server level.
+ALTER SERVER loopback2 OPTIONS (ADD fetch_stats 'false');
 ANALYZE async_pt;
 
 EXPLAIN (VERBOSE, COSTS OFF)
diff --git a/doc/src/sgml/postgres-fdw.sgml b/doc/src/sgml/postgres-fdw.sgml
index de69ddcdebc..d7c0dc8ed14 100644
--- a/doc/src/sgml/postgres-fdw.sgml
+++ b/doc/src/sgml/postgres-fdw.sgml
@@ -332,7 +332,7 @@ OPTIONS (ADD password_required 'false');
    </para>
 
    <para>
-    The following option controls how such an <command>ANALYZE</command>
+    The following options control how such an <command>ANALYZE</command>
     operation behaves:
    </para>
 
@@ -364,6 +364,29 @@ OPTIONS (ADD password_required 'false');
      </listitem>
     </varlistentry>
 
+    <varlistentry>
+     <term><literal>fetch_stats</literal> (<type>boolean</type>)</term>
+     <listitem>
+      <para>
+       This option, which can be specified for a foreign table or a foreign
+       server, determines if <command>ANALYZE</command> on a foreign table
+       will instead attempt to fetch the existing relation and attribute
+       statistics from the remote table, and if all of the attributes being
+       analyzed have statistics in the remote table, then it will import
+       those statistics directly using
+       <function>pg_restore_relation_stats</function> and
+       <function>pg_restore_attribute_stats</function>. This option is only
+       useful if the remote relation is one that can have regular statistics
+       (tables and materialized views). If the foreign table is a partition
+       of a partitioned table, analyzing the partitioned table will still
+       result in row sampling on the foreign table regardless of this setting,
+       though direct analysis of the foreign table would have attempted to
+       fetch remote statistics first.
+       The default is <literal>true</literal>.
+      </para>
+     </listitem>
+    </varlistentry>
+
    </variablelist>
 
   </sect3>
diff --git a/src/backend/commands/analyze.c b/src/backend/commands/analyze.c
index eeed91be266..417a510e007 100644
--- a/src/backend/commands/analyze.c
+++ b/src/backend/commands/analyze.c
@@ -83,6 +83,7 @@ static void compute_index_stats(Relation onerel, double totalrows,
 								AnlIndexData *indexdata, int nindexes,
 								HeapTuple *rows, int numrows,
 								MemoryContext col_context);
+static void validate_va_cols_list(Relation onerel, List *va_cols);
 static VacAttrStats *examine_attribute(Relation onerel, int attnum,
 									   Node *index_expr);
 static int	acquire_sample_rows(Relation onerel, int elevel,
@@ -114,6 +115,7 @@ analyze_rel(Oid relid, RangeVar *relation,
 	int			elevel;
 	AcquireSampleRowsFunc acquirefunc = NULL;
 	BlockNumber relpages = 0;
+	bool		stats_imported = false;
 
 	/* Select logging level */
 	if (params.options & VACOPT_VERBOSE)
@@ -182,6 +184,27 @@ analyze_rel(Oid relid, RangeVar *relation,
 		return;
 	}
 
+	/*
+	 * Check the given list of columns
+	 */
+	if (va_cols != NIL)
+		validate_va_cols_list(onerel, va_cols);
+
+	/*
+	 * Initialize progress reporting before setup for regular/foreign tables.
+	 * (For the former, the time spent on it would be negligible, but for the
+	 * latter, if FDWs support statistics import or analysis, they'd do some
+	 * work that needs the remote access, so the time can be non-negligible.)
+	 */
+	pgstat_progress_start_command(PROGRESS_COMMAND_ANALYZE,
+								  RelationGetRelid(onerel));
+	if (AmAutoVacuumWorkerProcess())
+		pgstat_progress_update_param(PROGRESS_ANALYZE_STARTED_BY,
+									 PROGRESS_ANALYZE_STARTED_BY_AUTOVACUUM);
+	else
+		pgstat_progress_update_param(PROGRESS_ANALYZE_STARTED_BY,
+									 PROGRESS_ANALYZE_STARTED_BY_MANUAL);
+
 	/*
 	 * Check that it's of an analyzable relkind, and set up appropriately.
 	 */
@@ -196,26 +219,33 @@ analyze_rel(Oid relid, RangeVar *relation,
 	else if (onerel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
 	{
 		/*
-		 * For a foreign table, call the FDW's hook function to see whether it
-		 * supports analysis.
+		 * For a foreign table, call the FDW's hook functions to see whether
+		 * it supports statistics import or analysis.
 		 */
 		FdwRoutine *fdwroutine;
-		bool		ok = false;
 
 		fdwroutine = GetFdwRoutineForRelation(onerel, false);
 
-		if (fdwroutine->AnalyzeForeignTable != NULL)
-			ok = fdwroutine->AnalyzeForeignTable(onerel,
-												 &acquirefunc,
-												 &relpages);
-
-		if (!ok)
+		if (fdwroutine->ImportStatistics != NULL &&
+			fdwroutine->ImportStatistics(onerel, va_cols, elevel))
+			stats_imported = true;
+		else
 		{
-			ereport(WARNING,
-					(errmsg("skipping \"%s\" --- cannot analyze this foreign table",
-							RelationGetRelationName(onerel))));
-			relation_close(onerel, ShareUpdateExclusiveLock);
-			return;
+			bool		ok = false;
+
+			if (fdwroutine->AnalyzeForeignTable != NULL)
+				ok = fdwroutine->AnalyzeForeignTable(onerel,
+													 &acquirefunc,
+													 &relpages);
+
+			if (!ok)
+			{
+				ereport(WARNING,
+						errmsg("skipping \"%s\" -- cannot analyze this foreign table.",
+							   RelationGetRelationName(onerel)));
+				relation_close(onerel, ShareUpdateExclusiveLock);
+				goto out;
+			}
 		}
 	}
 	else if (onerel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
@@ -232,26 +262,16 @@ analyze_rel(Oid relid, RangeVar *relation,
 					(errmsg("skipping \"%s\" --- cannot analyze non-tables or special system tables",
 							RelationGetRelationName(onerel))));
 		relation_close(onerel, ShareUpdateExclusiveLock);
-		return;
+		goto out;
 	}
 
-	/*
-	 * OK, let's do it.  First, initialize progress reporting.
-	 */
-	pgstat_progress_start_command(PROGRESS_COMMAND_ANALYZE,
-								  RelationGetRelid(onerel));
-	if (AmAutoVacuumWorkerProcess())
-		pgstat_progress_update_param(PROGRESS_ANALYZE_STARTED_BY,
-									 PROGRESS_ANALYZE_STARTED_BY_AUTOVACUUM);
-	else
-		pgstat_progress_update_param(PROGRESS_ANALYZE_STARTED_BY,
-									 PROGRESS_ANALYZE_STARTED_BY_MANUAL);
-
 	/*
 	 * Do the normal non-recursive ANALYZE.  We can skip this for partitioned
-	 * tables, which don't contain any rows.
+	 * tables, which don't contain any rows, and foreign tables that
+	 * successfully imported statistics.
 	 */
-	if (onerel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
+	if ((onerel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
+		&& !stats_imported)
 		do_analyze_rel(onerel, params, va_cols, acquirefunc,
 					   relpages, false, in_outer_xact, elevel);
 
@@ -270,6 +290,7 @@ analyze_rel(Oid relid, RangeVar *relation,
 	 */
 	relation_close(onerel, NoLock);
 
+out:
 	pgstat_progress_end_command();
 }
 
@@ -368,16 +389,10 @@ do_analyze_rel(Relation onerel, const VacuumParams params,
 	starttime = GetCurrentTimestamp();
 
 	/*
-	 * Determine which columns to analyze
-	 *
-	 * Note that system attributes are never analyzed, so we just reject them
-	 * at the lookup stage.  We also reject duplicate column mentions.  (We
-	 * could alternatively ignore duplicates, but analyzing a column twice
-	 * won't work; we'd end up making a conflicting update in pg_statistic.)
+	 * Determine which columns to analyze.
 	 */
 	if (va_cols != NIL)
 	{
-		Bitmapset  *unique_cols = NULL;
 		ListCell   *le;
 
 		vacattrstats = (VacAttrStats **) palloc(list_length(va_cols) *
@@ -388,18 +403,7 @@ do_analyze_rel(Relation onerel, const VacuumParams params,
 			char	   *col = strVal(lfirst(le));
 
 			i = attnameAttNum(onerel, col, false);
-			if (i == InvalidAttrNumber)
-				ereport(ERROR,
-						(errcode(ERRCODE_UNDEFINED_COLUMN),
-						 errmsg("column \"%s\" of relation \"%s\" does not exist",
-								col, RelationGetRelationName(onerel))));
-			if (bms_is_member(i, unique_cols))
-				ereport(ERROR,
-						(errcode(ERRCODE_DUPLICATE_COLUMN),
-						 errmsg("column \"%s\" of relation \"%s\" appears more than once",
-								col, RelationGetRelationName(onerel))));
-			unique_cols = bms_add_member(unique_cols, i);
-
+			Assert(i != InvalidAttrNumber);
 			vacattrstats[tcnt] = examine_attribute(onerel, i, NULL);
 			if (vacattrstats[tcnt] != NULL)
 				tcnt++;
@@ -1030,6 +1034,40 @@ compute_index_stats(Relation onerel, double totalrows,
 	MemoryContextDelete(ind_context);
 }
 
+/*
+ * validate_va_cols_list -- validate the columns list given to analyze_rel
+ *
+ * Note that system attributes are never analyzed, so we just reject them at
+ * the lookup stage.  We also reject duplicate column mentions.  (We could
+ * alternatively ignore duplicates, but analyzing a column twice won't work;
+ * we'd end up making a conflicting update in pg_statistic.)
+ */
+static void
+validate_va_cols_list(Relation onerel, List *va_cols)
+{
+	Bitmapset  *unique_cols = NULL;
+	ListCell   *le;
+
+	Assert(va_cols != NIL);
+	foreach(le, va_cols)
+	{
+		char	   *col = strVal(lfirst(le));
+		int			i = attnameAttNum(onerel, col, false);
+
+		if (i == InvalidAttrNumber)
+			ereport(ERROR,
+					(errcode(ERRCODE_UNDEFINED_COLUMN),
+					 errmsg("column \"%s\" of relation \"%s\" does not exist",
+							col, RelationGetRelationName(onerel))));
+		if (bms_is_member(i, unique_cols))
+			ereport(ERROR,
+					(errcode(ERRCODE_DUPLICATE_COLUMN),
+					 errmsg("column \"%s\" of relation \"%s\" appears more than once",
+							col, RelationGetRelationName(onerel))));
+		unique_cols = bms_add_member(unique_cols, i);
+	}
+}
+
 /*
  * examine_attribute -- pre-analysis of a single column
  *
@@ -1044,37 +1082,15 @@ examine_attribute(Relation onerel, int attnum, Node *index_expr)
 {
 	Form_pg_attribute attr = TupleDescAttr(onerel->rd_att, attnum - 1);
 	int			attstattarget;
-	HeapTuple	atttuple;
-	Datum		dat;
-	bool		isnull;
 	HeapTuple	typtuple;
 	VacAttrStats *stats;
 	int			i;
 	bool		ok;
 
-	/* Never analyze dropped columns */
-	if (attr->attisdropped)
-		return NULL;
-
-	/* Don't analyze virtual generated columns */
-	if (attr->attgenerated == ATTRIBUTE_GENERATED_VIRTUAL)
-		return NULL;
-
 	/*
-	 * Get attstattarget value.  Set to -1 if null.  (Analyze functions expect
-	 * -1 to mean use default_statistics_target; see for example
-	 * std_typanalyze.)
+	 * Check if the column is analyzable.
 	 */
-	atttuple = SearchSysCache2(ATTNUM, ObjectIdGetDatum(RelationGetRelid(onerel)), Int16GetDatum(attnum));
-	if (!HeapTupleIsValid(atttuple))
-		elog(ERROR, "cache lookup failed for attribute %d of relation %u",
-			 attnum, RelationGetRelid(onerel));
-	dat = SysCacheGetAttr(ATTNUM, atttuple, Anum_pg_attribute_attstattarget, &isnull);
-	attstattarget = isnull ? -1 : DatumGetInt16(dat);
-	ReleaseSysCache(atttuple);
-
-	/* Don't analyze column if user has specified not to */
-	if (attstattarget == 0)
+	if (!attribute_is_analyzable(onerel, attnum, attr, &attstattarget))
 		return NULL;
 
 	/*
@@ -1155,6 +1171,45 @@ examine_attribute(Relation onerel, int attnum, Node *index_expr)
 	return stats;
 }
 
+bool
+attribute_is_analyzable(Relation onerel, int attnum, Form_pg_attribute attr,
+						int *p_attstattarget)
+{
+	int			attstattarget;
+	HeapTuple	atttuple;
+	Datum		dat;
+	bool		isnull;
+
+	/* Never analyze dropped columns */
+	if (attr->attisdropped)
+		return false;
+
+	/* Don't analyze virtual generated columns */
+	if (attr->attgenerated == ATTRIBUTE_GENERATED_VIRTUAL)
+		return false;
+
+	/*
+	 * Get attstattarget value.  Set to -1 if null.  (Analyze functions expect
+	 * -1 to mean use default_statistics_target; see for example
+	 * std_typanalyze.)
+	 */
+	atttuple = SearchSysCache2(ATTNUM, ObjectIdGetDatum(RelationGetRelid(onerel)), Int16GetDatum(attnum));
+	if (!HeapTupleIsValid(atttuple))
+		elog(ERROR, "cache lookup failed for attribute %d of relation %u",
+			 attnum, RelationGetRelid(onerel));
+	dat = SysCacheGetAttr(ATTNUM, atttuple, Anum_pg_attribute_attstattarget, &isnull);
+	attstattarget = isnull ? -1 : DatumGetInt16(dat);
+	ReleaseSysCache(atttuple);
+
+	/* Don't analyze column if user has specified not to */
+	if (attstattarget == 0)
+		return false;
+
+	if (p_attstattarget)
+		*p_attstattarget = attstattarget;
+	return true;
+}
+
 /*
  * Read stream callback returning the next BlockNumber as chosen by the
  * BlockSampling algorithm.
diff --git a/src/backend/statistics/extended_stats.c b/src/backend/statistics/extended_stats.c
index 334c6498581..a546f377f44 100644
--- a/src/backend/statistics/extended_stats.c
+++ b/src/backend/statistics/extended_stats.c
@@ -245,6 +245,40 @@ BuildRelationExtStatistics(Relation onerel, bool inh, double totalrows,
 	table_close(pg_stext, RowExclusiveLock);
 }
 
+/*
+ * Test if the given relation has extended statistics objects.
+ */
+bool
+HasRelationExtStatistics(Relation onerel)
+{
+	Relation	pg_statext;
+	SysScanDesc scan;
+	ScanKeyData skey;
+	bool		found;
+
+	pg_statext = table_open(StatisticExtRelationId, RowExclusiveLock);
+
+	/*
+	 * Prepare to scan pg_statistic_ext for entries having stxrelid = this
+	 * rel.
+	 */
+	ScanKeyInit(&skey,
+				Anum_pg_statistic_ext_stxrelid,
+				BTEqualStrategyNumber, F_OIDEQ,
+				ObjectIdGetDatum(RelationGetRelid(onerel)));
+
+	scan = systable_beginscan(pg_statext, StatisticExtRelidIndexId, true,
+							  NULL, 1, &skey);
+
+	found = HeapTupleIsValid(systable_getnext(scan));
+
+	systable_endscan(scan);
+
+	table_close(pg_statext, RowExclusiveLock);
+
+	return found;
+}
+
 /*
  * ComputeExtStatisticsRows
  *		Compute number of rows required by extended statistics on a table.
diff --git a/src/include/commands/vacuum.h b/src/include/commands/vacuum.h
index 1f45bca015c..f34c6bf9d5e 100644
--- a/src/include/commands/vacuum.h
+++ b/src/include/commands/vacuum.h
@@ -428,6 +428,8 @@ extern void parallel_vacuum_main(dsm_segment *seg, shm_toc *toc);
 extern void analyze_rel(Oid relid, RangeVar *relation,
 						const VacuumParams params, List *va_cols, bool in_outer_xact,
 						BufferAccessStrategy bstrategy);
+extern bool attribute_is_analyzable(Relation onerel, int attnum, Form_pg_attribute attr,
+									int *p_attstattarget);
 extern bool std_typanalyze(VacAttrStats *stats);
 
 /* in utils/misc/sampling.c --- duplicate of declarations in utils/sampling.h */
diff --git a/src/include/foreign/fdwapi.h b/src/include/foreign/fdwapi.h
index 96b6f692d2a..ad0107b2549 100644
--- a/src/include/foreign/fdwapi.h
+++ b/src/include/foreign/fdwapi.h
@@ -157,6 +157,10 @@ typedef bool (*AnalyzeForeignTable_function) (Relation relation,
 											  AcquireSampleRowsFunc *func,
 											  BlockNumber *totalpages);
 
+typedef bool (*ImportStatistics_function) (Relation relation,
+										   List *va_cols,
+										   int elevel);
+
 typedef List *(*ImportForeignSchema_function) (ImportForeignSchemaStmt *stmt,
 											   Oid serverOid);
 
@@ -255,6 +259,7 @@ typedef struct FdwRoutine
 
 	/* Support functions for ANALYZE */
 	AnalyzeForeignTable_function AnalyzeForeignTable;
+	ImportStatistics_function ImportStatistics;
 
 	/* Support functions for IMPORT FOREIGN SCHEMA */
 	ImportForeignSchema_function ImportForeignSchema;
diff --git a/src/include/statistics/statistics.h b/src/include/statistics/statistics.h
index 38a56f6ccb3..8f9b9d237fd 100644
--- a/src/include/statistics/statistics.h
+++ b/src/include/statistics/statistics.h
@@ -101,6 +101,7 @@ extern MCVList *statext_mcv_load(Oid mvoid, bool inh);
 extern void BuildRelationExtStatistics(Relation onerel, bool inh, double totalrows,
 									   int numrows, HeapTuple *rows,
 									   int natts, VacAttrStats **vacattrstats);
+extern bool HasRelationExtStatistics(Relation onerel);
 extern int	ComputeExtStatisticsRows(Relation onerel,
 									 int natts, VacAttrStats **vacattrstats);
 extern bool statext_is_kind_built(HeapTuple htup, char type);
