On 2/12/22 01:34, Tomas Vondra wrote:
> On 2/10/22 19:17, Tomas Vondra wrote:
>> I've polished & pushed the first part adding sequence decoding
>> infrastructure etc. Attached are the two remaining parts.
>>
>> I plan to wait a day or two and then push the test_decoding part. The
>> last part (for built-in replication) will need more work and maybe
>> rethinking the grammar etc.
>>
> 
> I've pushed the second part, adding sequences to test_decoding.
> 
> Here's the remaining part, rebased, with a small tweak in the TAP test
> to eliminate the issue with not waiting for sequence increments. I've
> kept the tweak in a separate patch, so that we can throw it away easily
> if we happen to resolve the issue.
> 

Hmm, cfbot was not happy about this, so here's a version fixing the
elog() format issue reported by CirrusCI/mingw by ditching the log
message. It was useful for debugging, but otherwise just noise.

I'm a bit puzzled about the macOS failure, though. It seems as if the
test does not wait for the subscriber long enough, but this is with the
tweaked test variant, so it should not have the rollback issue. And I
haven't seen this failure on any other machine.

Regarding adding decoding of sequences to the built-in replication,
there is a couple questions that we need to discuss first before
cleaning up the code etc. Most of them are related to syntax and
handling of various sequence variants.


1) Firstly, what about implicit sequences. That is, if you create a
table with SERIAL or BIGSERIAL column, that'll have a sequence attached.
Should those sequences be added to the publication when the table gets
added? Or should we require adding them separately? Or should that be
specified in the grammar, somehow? Should we have INCLUDING SEQUENCES
for ALTER PUBLICATION ... ADD TABLE ...?

I think we shouldn't require replicating the sequence, because who knows
what the schema is on the subscriber? We want to allow differences, so
maybe the sequence is not there. I'd start with just adding them
separately, because that just seems simpler, but maybe there are good
reasons to support adding them in ADD TABLE.


2) Should it be possible to add sequences that are also associated with
a serial column, without the table being replicated too? I'd say yes, if
people want to do that - I don't think it can cause any issues, and it's
possible to just use sequence directly for non-serial columns anyway.
Which is the same thing, but we can't detect that.


3) What about sequences for UNLOGGED tables? At the moment we don't
allow sequences to be UNLOGGED (Peter revived his patch [1], but that's
not committed yet). Again, I'd say it's up to the user to decide which
sequences are replicated - it's similar to (2).


4) I wonder if we actually want FOR ALL SEQUENCES. On the one hand it'd
be symmetrical with FOR ALL TABLES, which is the other object type we
can replicate. So it'd seem reasonable to handle them in a similar way.
But it's causing some shift/reduce error in the grammar, so it'll need
some changes.



regards


[1]
https://www.postgresql.org/message-id/8da92c1f-9117-41bc-731b-ce1477a77...@enterprisedb.com

-- 
Tomas Vondra
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
From 960c90c6849e27eabe09dd3582b6f844c5d5a6a5 Mon Sep 17 00:00:00 2001
From: Tomas Vondra <tomas.von...@postgresql.org>
Date: Thu, 10 Feb 2022 15:18:59 +0100
Subject: [PATCH 1/2] Add support for decoding sequences to built-in
 replication

---
 doc/src/sgml/catalogs.sgml                  |  71 ++++
 doc/src/sgml/ref/alter_publication.sgml     |  24 +-
 doc/src/sgml/ref/alter_subscription.sgml    |   4 +-
 src/backend/catalog/pg_publication.c        | 149 ++++++++-
 src/backend/catalog/system_views.sql        |  10 +
 src/backend/commands/publicationcmds.c      | 350 +++++++++++++++++++-
 src/backend/commands/sequence.c             |  79 +++++
 src/backend/commands/subscriptioncmds.c     | 272 +++++++++++++++
 src/backend/executor/execReplication.c      |   2 +-
 src/backend/nodes/copyfuncs.c               |   1 +
 src/backend/nodes/equalfuncs.c              |   1 +
 src/backend/parser/gram.y                   |  32 ++
 src/backend/replication/logical/proto.c     |  52 +++
 src/backend/replication/logical/tablesync.c | 114 ++++++-
 src/backend/replication/logical/worker.c    |  60 ++++
 src/backend/replication/pgoutput/pgoutput.c |  85 ++++-
 src/backend/utils/cache/relcache.c          |   4 +-
 src/bin/psql/tab-complete.c                 |  14 +-
 src/include/catalog/pg_proc.dat             |   5 +
 src/include/catalog/pg_publication.h        |  14 +
 src/include/commands/sequence.h             |   1 +
 src/include/nodes/parsenodes.h              |   6 +
 src/include/replication/logicalproto.h      |  19 ++
 src/include/replication/pgoutput.h          |   1 +
 src/test/regress/expected/rules.out         |   8 +
 src/test/subscription/t/028_sequences.pl    | 196 +++++++++++
 26 files changed, 1538 insertions(+), 36 deletions(-)
 create mode 100644 src/test/subscription/t/028_sequences.pl

diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml
index 879d2dbce03..271dc03e5a2 100644
--- a/doc/src/sgml/catalogs.sgml
+++ b/doc/src/sgml/catalogs.sgml
@@ -9540,6 +9540,11 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
       <entry>prepared transactions</entry>
      </row>
 
+     <row>
+      <entry><link linkend="view-pg-publication-sequences"><structname>pg_publication_sequences</structname></link></entry>
+      <entry>publications and their associated sequences</entry>
+     </row>
+
      <row>
       <entry><link linkend="view-pg-publication-tables"><structname>pg_publication_tables</structname></link></entry>
       <entry>publications and their associated tables</entry>
@@ -11375,6 +11380,72 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx
 
  </sect1>
 
+ <sect1 id="view-pg-publication-sequences">
+  <title><structname>pg_publication_sequences</structname></title>
+
+  <indexterm zone="view-pg-publication-sequences">
+   <primary>pg_publication_sequences</primary>
+  </indexterm>
+
+  <para>
+   The view <structname>pg_publication_sequences</structname> provides
+   information about the mapping between publications and the sequences they
+   contain.  Unlike the underlying catalog
+   <link linkend="catalog-pg-publication-rel"><structname>pg_publication_rel</structname></link>,
+   this view expands
+   publications defined as <literal>FOR ALL SEQUENCES</literal>, so for such
+   publications there will be a row for each eligible sequence.
+  </para>
+
+  <table>
+   <title><structname>pg_publication_sequences</structname> Columns</title>
+   <tgroup cols="1">
+    <thead>
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       Column Type
+      </para>
+      <para>
+       Description
+      </para></entry>
+     </row>
+    </thead>
+
+    <tbody>
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>pubname</structfield> <type>name</type>
+       (references <link linkend="catalog-pg-publication"><structname>pg_publication</structname></link>.<structfield>pubname</structfield>)
+      </para>
+      <para>
+       Name of publication
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>schemaname</structfield> <type>name</type>
+       (references <link linkend="catalog-pg-namespace"><structname>pg_namespace</structname></link>.<structfield>nspname</structfield>)
+      </para>
+      <para>
+       Name of schema containing sequence
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>sequencename</structfield> <type>name</type>
+       (references <link linkend="catalog-pg-class"><structname>pg_class</structname></link>.<structfield>relname</structfield>)
+      </para>
+      <para>
+       Name of sequence
+      </para></entry>
+     </row>
+    </tbody>
+   </tgroup>
+  </table>
+ </sect1>
+
  <sect1 id="view-pg-publication-tables">
   <title><structname>pg_publication_tables</structname></title>
 
diff --git a/doc/src/sgml/ref/alter_publication.sgml b/doc/src/sgml/ref/alter_publication.sgml
index 7c7c27bf7ce..9da8274ae2c 100644
--- a/doc/src/sgml/ref/alter_publication.sgml
+++ b/doc/src/sgml/ref/alter_publication.sgml
@@ -31,7 +31,9 @@ ALTER PUBLICATION <replaceable class="parameter">name</replaceable> RENAME TO <r
 <phrase>where <replaceable class="parameter">publication_object</replaceable> is one of:</phrase>
 
     TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [, ... ]
+    SEQUENCE <replaceable class="parameter">sequence_name</replaceable> [ * ] [, ... ]
     ALL TABLES IN SCHEMA { <replaceable class="parameter">schema_name</replaceable> | CURRENT_SCHEMA } [, ... ]
+    ALL SEQUENCES IN SCHEMA { <replaceable class="parameter">schema_name</replaceable> | CURRENT_SCHEMA } [, ... ]
 </synopsis>
  </refsynopsisdiv>
 
@@ -56,7 +58,18 @@ ALTER PUBLICATION <replaceable class="parameter">name</replaceable> RENAME TO <r
   </para>
 
   <para>
-   The fourth variant of this command listed in the synopsis can change
+   The next three variants change which sequences are part of the publication.
+   The <literal>SET SEQUENCE</literal> clause will replace the list of sequences
+   in the publication with the specified one.  The <literal>ADD SEQUENCE</literal>
+   and <literal>DROP SEQUENCE</literal> clauses will add and remove one or more
+   sequences from the publication.  Note that adding sequences to a publication
+   that is already subscribed to will require a <literal>ALTER SUBSCRIPTION
+   ... REFRESH PUBLICATION</literal> action on the subscribing side in order
+   to become effective.
+  </para>
+
+  <para>
+   The seventh variant of this command listed in the synopsis can change
    all of the publication properties specified in
    <xref linkend="sql-createpublication"/>.  Properties not mentioned in the
    command retain their previous settings.
@@ -123,6 +136,15 @@ ALTER PUBLICATION <replaceable class="parameter">name</replaceable> RENAME TO <r
     </listitem>
    </varlistentry>
 
+   <varlistentry>
+    <term><replaceable class="parameter">sequence_name</replaceable></term>
+    <listitem>
+     <para>
+      Name of an existing sequence.
+     </para>
+    </listitem>
+   </varlistentry>
+
    <varlistentry>
     <term><literal>SET ( <replaceable class="parameter">publication_parameter</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] )</literal></term>
     <listitem>
diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml
index 0b027cc3462..8f28cf03f40 100644
--- a/doc/src/sgml/ref/alter_subscription.sgml
+++ b/doc/src/sgml/ref/alter_subscription.sgml
@@ -147,7 +147,7 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
     <listitem>
      <para>
       Fetch missing table information from publisher.  This will start
-      replication of tables that were added to the subscribed-to publications
+      replication of tables and sequences that were added to the subscribed-to publications
       since <command>CREATE SUBSCRIPTION</command> or
       the last invocation of <command>REFRESH PUBLICATION</command>.
      </para>
@@ -164,7 +164,7 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
           Specifies whether to copy pre-existing data in the publications
           that are being subscribed to when the replication starts.
           The default is <literal>true</literal>.  (Previously-subscribed
-          tables are not copied.)
+          tables and sequences are not copied.)
          </para>
         </listitem>
        </varlistentry>
diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c
index e14ca2f5630..1a9e05ba98b 100644
--- a/src/backend/catalog/pg_publication.c
+++ b/src/backend/catalog/pg_publication.c
@@ -54,7 +54,8 @@ check_publication_add_relation(Relation targetrel)
 {
 	/* Must be a regular or partitioned table */
 	if (RelationGetForm(targetrel)->relkind != RELKIND_RELATION &&
-		RelationGetForm(targetrel)->relkind != RELKIND_PARTITIONED_TABLE)
+		RelationGetForm(targetrel)->relkind != RELKIND_PARTITIONED_TABLE &&
+		RelationGetForm(targetrel)->relkind != RELKIND_SEQUENCE)
 		ereport(ERROR,
 				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
 				 errmsg("cannot add relation \"%s\" to publication",
@@ -131,7 +132,8 @@ static bool
 is_publishable_class(Oid relid, Form_pg_class reltuple)
 {
 	return (reltuple->relkind == RELKIND_RELATION ||
-			reltuple->relkind == RELKIND_PARTITIONED_TABLE) &&
+			reltuple->relkind == RELKIND_PARTITIONED_TABLE ||
+			reltuple->relkind == RELKIND_SEQUENCE) &&
 		!IsCatalogRelationOid(relid) &&
 		reltuple->relpersistence == RELPERSISTENCE_PERMANENT &&
 		relid >= FirstNormalObjectId;
@@ -503,6 +505,11 @@ GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
 		Form_pg_publication_rel pubrel;
 
 		pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
+
+		/* skip sequences here */
+		if (get_rel_relkind(pubrel->prrelid) == RELKIND_SEQUENCE)
+			continue;
+
 		result = GetPubPartitionOptionRelations(result, pub_partopt,
 												pubrel->prrelid);
 	}
@@ -517,6 +524,49 @@ GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
 	return result;
 }
 
+/*
+ * Gets list of relation oids for a publication (sequences only).
+ *
+ * This should only be used for normal publications, the FOR ALL TABLES
+ * should use GetAllSequencesPublicationRelations().
+ */
+List *
+GetPublicationSequenceRelations(Oid pubid)
+{
+	List	   *result;
+	Relation	pubrelsrel;
+	ScanKeyData scankey;
+	SysScanDesc scan;
+	HeapTuple	tup;
+
+	/* Find all publications associated with the relation. */
+	pubrelsrel = table_open(PublicationRelRelationId, AccessShareLock);
+
+	ScanKeyInit(&scankey,
+				Anum_pg_publication_rel_prpubid,
+				BTEqualStrategyNumber, F_OIDEQ,
+				ObjectIdGetDatum(pubid));
+
+	scan = systable_beginscan(pubrelsrel, PublicationRelPrrelidPrpubidIndexId,
+							  true, NULL, 1, &scankey);
+
+	result = NIL;
+	while (HeapTupleIsValid(tup = systable_getnext(scan)))
+	{
+		Form_pg_publication_rel pubrel;
+
+		pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
+
+		if (get_rel_relkind(pubrel->prrelid) == RELKIND_SEQUENCE)
+			result = lappend_oid(result, pubrel->prrelid);
+	}
+
+	systable_endscan(scan);
+	table_close(pubrelsrel, AccessShareLock);
+
+	return result;
+}
+
 /*
  * Gets list of publication oids for publications marked as FOR ALL TABLES.
  */
@@ -762,6 +812,46 @@ GetAllSchemaPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
 	return result;
 }
 
+/*
+ * Gets list of all relation published by FOR ALL TABLES publication(s).
+ *
+ * If the publication publishes partition changes via their respective root
+ * partitioned tables, we must exclude partitions in favor of including the
+ * root partitioned tables.
+ */
+List *
+GetAllSequencesPublicationRelations(void)
+{
+	Relation	classRel;
+	ScanKeyData key[1];
+	TableScanDesc scan;
+	HeapTuple	tuple;
+	List	   *result = NIL;
+
+	classRel = table_open(RelationRelationId, AccessShareLock);
+
+	ScanKeyInit(&key[0],
+				Anum_pg_class_relkind,
+				BTEqualStrategyNumber, F_CHAREQ,
+				CharGetDatum(RELKIND_SEQUENCE));
+
+	scan = table_beginscan_catalog(classRel, 1, key);
+
+	while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
+	{
+		Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
+		Oid			relid = relForm->oid;
+
+		if (is_publishable_class(relid, relForm))
+			result = lappend_oid(result, relid);
+	}
+
+	table_endscan(scan);
+
+	table_close(classRel, AccessShareLock);
+	return result;
+}
+
 /*
  * Get publication using oid
  *
@@ -784,10 +874,12 @@ GetPublication(Oid pubid)
 	pub->oid = pubid;
 	pub->name = pstrdup(NameStr(pubform->pubname));
 	pub->alltables = pubform->puballtables;
+	pub->allsequences = pubform->puballsequences;
 	pub->pubactions.pubinsert = pubform->pubinsert;
 	pub->pubactions.pubupdate = pubform->pubupdate;
 	pub->pubactions.pubdelete = pubform->pubdelete;
 	pub->pubactions.pubtruncate = pubform->pubtruncate;
+	pub->pubactions.pubsequence = pubform->pubsequence;
 	pub->pubviaroot = pubform->pubviaroot;
 
 	ReleaseSysCache(tup);
@@ -937,3 +1029,56 @@ pg_get_publication_tables(PG_FUNCTION_ARGS)
 
 	SRF_RETURN_DONE(funcctx);
 }
+
+/*
+ * Returns Oids of sequences in a publication.
+ */
+Datum
+pg_get_publication_sequences(PG_FUNCTION_ARGS)
+{
+	FuncCallContext *funcctx;
+	char	   *pubname = text_to_cstring(PG_GETARG_TEXT_PP(0));
+	Publication *publication;
+	List	   *sequences;
+
+	/* stuff done only on the first call of the function */
+	if (SRF_IS_FIRSTCALL())
+	{
+		MemoryContext oldcontext;
+
+		/* create a function context for cross-call persistence */
+		funcctx = SRF_FIRSTCALL_INIT();
+
+		/* switch to memory context appropriate for multiple function calls */
+		oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
+
+		publication = GetPublicationByName(pubname, false);
+
+		/*
+		 * Publications support partitioned tables, although all changes are
+		 * replicated using leaf partition identity and schema, so we only
+		 * need those.
+		 */
+		if (publication->allsequences)
+			sequences = GetAllSequencesPublicationRelations();
+		else
+			sequences = GetPublicationSequenceRelations(publication->oid);
+
+		funcctx->user_fctx = (void *) sequences;
+
+		MemoryContextSwitchTo(oldcontext);
+	}
+
+	/* stuff done on every call of the function */
+	funcctx = SRF_PERCALL_SETUP();
+	sequences = (List *) funcctx->user_fctx;
+
+	if (funcctx->call_cntr < list_length(sequences))
+	{
+		Oid			relid = list_nth_oid(sequences, funcctx->call_cntr);
+
+		SRF_RETURN_NEXT(funcctx, ObjectIdGetDatum(relid));
+	}
+
+	SRF_RETURN_DONE(funcctx);
+}
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 3cb69b1f87b..b5cc33aca34 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -374,6 +374,16 @@ CREATE VIEW pg_publication_tables AS
          pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)
     WHERE C.oid = GPT.relid;
 
+CREATE VIEW pg_publication_sequences AS
+    SELECT
+        P.pubname AS pubname,
+        N.nspname AS schemaname,
+        C.relname AS sequencename
+    FROM pg_publication P,
+         LATERAL pg_get_publication_sequences(P.pubname) GPT,
+         pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)
+    WHERE C.oid = GPT.relid;
+
 CREATE VIEW pg_locks AS
     SELECT * FROM pg_lock_status() AS L;
 
diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c
index 0e4bb97fb73..3bc2e8ccb66 100644
--- a/src/backend/commands/publicationcmds.c
+++ b/src/backend/commands/publicationcmds.c
@@ -16,6 +16,7 @@
 
 #include "access/genam.h"
 #include "access/htup_details.h"
+#include "access/relation.h"
 #include "access/table.h"
 #include "access/xact.h"
 #include "catalog/catalog.h"
@@ -59,6 +60,12 @@ static void PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists,
 								  AlterPublicationStmt *stmt);
 static void PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok);
 
+static List *OpenSequenceList(List *sequences);
+static void CloseSequenceList(List *rels);
+static void PublicationAddSequences(Oid pubid, List *rels, bool if_not_exists,
+								 AlterPublicationStmt *stmt);
+static void PublicationDropSequences(Oid pubid, List *rels, bool missing_ok);
+
 static void
 parse_publication_options(ParseState *pstate,
 						  List *options,
@@ -77,6 +84,7 @@ parse_publication_options(ParseState *pstate,
 	pubactions->pubupdate = true;
 	pubactions->pubdelete = true;
 	pubactions->pubtruncate = true;
+	pubactions->pubsequence = true;
 	*publish_via_partition_root = false;
 
 	/* Parse options */
@@ -101,6 +109,7 @@ parse_publication_options(ParseState *pstate,
 			pubactions->pubupdate = false;
 			pubactions->pubdelete = false;
 			pubactions->pubtruncate = false;
+			pubactions->pubsequence = false;
 
 			*publish_given = true;
 			publish = defGetString(defel);
@@ -123,6 +132,8 @@ parse_publication_options(ParseState *pstate,
 					pubactions->pubdelete = true;
 				else if (strcmp(publish_opt, "truncate") == 0)
 					pubactions->pubtruncate = true;
+				else if (strcmp(publish_opt, "sequence") == 0)
+					pubactions->pubsequence = true;
 				else
 					ereport(ERROR,
 							(errcode(ERRCODE_SYNTAX_ERROR),
@@ -149,7 +160,9 @@ parse_publication_options(ParseState *pstate,
  */
 static void
 ObjectsInPublicationToOids(List *pubobjspec_list, ParseState *pstate,
-						   List **rels, List **schemas)
+						   List **tables, List **sequences,
+						   List **tables_schemas, List **sequences_schemas,
+						   List **schemas)
 {
 	ListCell   *cell;
 	PublicationObjSpec *pubobj;
@@ -167,12 +180,23 @@ ObjectsInPublicationToOids(List *pubobjspec_list, ParseState *pstate,
 		switch (pubobj->pubobjtype)
 		{
 			case PUBLICATIONOBJ_TABLE:
-				*rels = lappend(*rels, pubobj->pubtable);
+				*tables = lappend(*tables, pubobj->pubtable);
+				break;
+			case PUBLICATIONOBJ_SEQUENCE:
+				*sequences = lappend(*sequences, pubobj->pubtable);
 				break;
 			case PUBLICATIONOBJ_TABLES_IN_SCHEMA:
 				schemaid = get_namespace_oid(pubobj->name, false);
 
 				/* Filter out duplicates if user specifies "sch1, sch1" */
+				*tables_schemas = list_append_unique_oid(*tables_schemas, schemaid);
+				*schemas = list_append_unique_oid(*schemas, schemaid);
+				break;
+			case PUBLICATIONOBJ_SEQUENCES_IN_SCHEMA:
+				schemaid = get_namespace_oid(pubobj->name, false);
+
+				/* Filter out duplicates if user specifies "sch1, sch1" */
+				*sequences_schemas = list_append_unique_oid(*sequences_schemas, schemaid);
 				*schemas = list_append_unique_oid(*schemas, schemaid);
 				break;
 			case PUBLICATIONOBJ_TABLES_IN_CUR_SCHEMA:
@@ -186,6 +210,21 @@ ObjectsInPublicationToOids(List *pubobjspec_list, ParseState *pstate,
 				list_free(search_path);
 
 				/* Filter out duplicates if user specifies "sch1, sch1" */
+				*tables_schemas = list_append_unique_oid(*tables_schemas, schemaid);
+				*schemas = list_append_unique_oid(*schemas, schemaid);
+				break;
+			case PUBLICATIONOBJ_SEQUENCES_IN_CUR_SCHEMA:
+				search_path = fetch_search_path(false);
+				if (search_path == NIL) /* nothing valid in search_path? */
+					ereport(ERROR,
+							errcode(ERRCODE_UNDEFINED_SCHEMA),
+							errmsg("no schema has been selected for CURRENT_SCHEMA"));
+
+				schemaid = linitial_oid(search_path);
+				list_free(search_path);
+
+				/* Filter out duplicates if user specifies "sch1, sch1" */
+				*sequences_schemas = list_append_unique_oid(*sequences_schemas, schemaid);
 				*schemas = list_append_unique_oid(*schemas, schemaid);
 				break;
 			default:
@@ -251,7 +290,10 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
 	bool		publish_via_partition_root_given;
 	bool		publish_via_partition_root;
 	AclResult	aclresult;
-	List	   *relations = NIL;
+	List	   *tables = NIL;
+	List	   *sequences = NIL;
+	List	   *tables_schemaidlist = NIL;
+	List	   *sequences_schemaidlist = NIL;
 	List	   *schemaidlist = NIL;
 
 	/* must have CREATE privilege on database */
@@ -306,6 +348,8 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
 		BoolGetDatum(pubactions.pubdelete);
 	values[Anum_pg_publication_pubtruncate - 1] =
 		BoolGetDatum(pubactions.pubtruncate);
+	values[Anum_pg_publication_pubsequence - 1] =
+		BoolGetDatum(pubactions.pubsequence);
 	values[Anum_pg_publication_pubviaroot - 1] =
 		BoolGetDatum(publish_via_partition_root);
 
@@ -330,26 +374,40 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
 	}
 	else
 	{
-		ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations,
+		ObjectsInPublicationToOids(stmt->pubobjects, pstate,
+								   &tables, &sequences,
+								   &tables_schemaidlist,
+								   &sequences_schemaidlist,
 								   &schemaidlist);
 
 		/* FOR ALL TABLES IN SCHEMA requires superuser */
-		if (list_length(schemaidlist) > 0 && !superuser())
+		if (list_length(tables_schemaidlist) > 0 && !superuser())
 			ereport(ERROR,
 					errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
 					errmsg("must be superuser to create FOR ALL TABLES IN SCHEMA publication"));
 
-		if (list_length(relations) > 0)
+		if (list_length(tables) > 0)
 		{
 			List	   *rels;
 
-			rels = OpenTableList(relations);
-			CheckObjSchemaNotAlreadyInPublication(rels, schemaidlist,
+			rels = OpenTableList(tables);
+			CheckObjSchemaNotAlreadyInPublication(rels, tables_schemaidlist,
 												  PUBLICATIONOBJ_TABLE);
 			PublicationAddTables(puboid, rels, true, NULL);
 			CloseTableList(rels);
 		}
 
+		if (list_length(sequences) > 0)
+		{
+			List	   *rels;
+
+			rels = OpenSequenceList(sequences);
+			CheckObjSchemaNotAlreadyInPublication(rels, sequences_schemaidlist,
+												  PUBLICATIONOBJ_SEQUENCE);
+			PublicationAddTables(puboid, rels, true, NULL);
+			CloseSequenceList(rels);
+		}
+
 		if (list_length(schemaidlist) > 0)
 		{
 			/*
@@ -653,12 +711,13 @@ AlterPublicationSchemas(AlterPublicationStmt *stmt,
  */
 static void
 CheckAlterPublication(AlterPublicationStmt *stmt, HeapTuple tup,
-					  List *tables, List *schemaidlist)
+					  List *tables, List *tables_schemaidlist,
+					  List *sequences, List *sequences_schemaidlist)
 {
 	Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
 
 	if ((stmt->action == AP_AddObjects || stmt->action == AP_SetObjects) &&
-		schemaidlist && !superuser())
+		(tables_schemaidlist || sequences_schemaidlist) && !superuser())
 		ereport(ERROR,
 				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
 				 errmsg("must be superuser to add or set schemas")));
@@ -667,13 +726,24 @@ CheckAlterPublication(AlterPublicationStmt *stmt, HeapTuple tup,
 	 * Check that user is allowed to manipulate the publication tables in
 	 * schema
 	 */
-	if (schemaidlist && pubform->puballtables)
+	if (tables_schemaidlist && pubform->puballtables)
 		ereport(ERROR,
 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 				 errmsg("publication \"%s\" is defined as FOR ALL TABLES",
 						NameStr(pubform->pubname)),
 				 errdetail("Tables from schema cannot be added to, dropped from, or set on FOR ALL TABLES publications.")));
 
+	/*
+	 * Check that user is allowed to manipulate the publication sequences in
+	 * schema
+	 */
+	if (sequences_schemaidlist && pubform->puballsequences)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("publication \"%s\" is defined as FOR ALL SEQUENCES",
+						NameStr(pubform->pubname)),
+				 errdetail("Sequences from schema cannot be added to, dropped from, or set on FOR ALL SEQUENCES publications.")));
+
 	/* Check that user is allowed to manipulate the publication tables. */
 	if (tables && pubform->puballtables)
 		ereport(ERROR,
@@ -681,6 +751,108 @@ CheckAlterPublication(AlterPublicationStmt *stmt, HeapTuple tup,
 				 errmsg("publication \"%s\" is defined as FOR ALL TABLES",
 						NameStr(pubform->pubname)),
 				 errdetail("Tables cannot be added to or dropped from FOR ALL TABLES publications.")));
+
+	/* Check that user is allowed to manipulate the publication tables. */
+	if (sequences && pubform->puballsequences)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("publication \"%s\" is defined as FOR ALL SEQUENCES",
+						NameStr(pubform->pubname)),
+				 errdetail("Sequences cannot be added to or dropped from FOR ALL SEQUENCES publications.")));
+}
+
+/*
+ * Add or remove sequence to/from publication.
+ */
+static void
+AlterPublicationSequences(AlterPublicationStmt *stmt, HeapTuple tup,
+						  List *sequences, List *schemaidlist)
+{
+	List	   *rels = NIL;
+	Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
+	Oid			pubid = pubform->oid;
+
+	/*
+	 * It is quite possible that for the SET case user has not specified any
+	 * tables in which case we need to remove all the existing tables.
+	 */
+	if (!sequences && stmt->action != AP_SetObjects)
+		return;
+
+	rels = OpenSequenceList(sequences);
+
+	if (stmt->action == AP_AddObjects)
+	{
+		List	   *schemas = NIL;
+
+		/*
+		 * Check if the relation is member of the existing schema in the
+		 * publication or member of the schema list specified.
+		 */
+		schemas = list_concat_copy(schemaidlist, GetPublicationSchemas(pubid));
+		CheckObjSchemaNotAlreadyInPublication(rels, schemas,
+											  PUBLICATIONOBJ_SEQUENCE);
+		PublicationAddSequences(pubid, rels, false, stmt);
+	}
+	else if (stmt->action == AP_DropObjects)
+		PublicationDropSequences(pubid, rels, false);
+	else						/* DEFELEM_SET */
+	{
+		List	   *oldrelids = GetPublicationRelations(pubid,
+														PUBLICATION_PART_ROOT);
+		List	   *delrels = NIL;
+		ListCell   *oldlc;
+
+		CheckObjSchemaNotAlreadyInPublication(rels, schemaidlist,
+											  PUBLICATIONOBJ_SEQUENCE);
+
+		/* Calculate which relations to drop. */
+		foreach(oldlc, oldrelids)
+		{
+			Oid			oldrelid = lfirst_oid(oldlc);
+			ListCell   *newlc;
+			bool		found = false;
+
+			foreach(newlc, rels)
+			{
+				PublicationRelInfo *newpubrel;
+
+				newpubrel = (PublicationRelInfo *) lfirst(newlc);
+				if (RelationGetRelid(newpubrel->relation) == oldrelid)
+				{
+					found = true;
+					break;
+				}
+			}
+			/* Not yet in the list, open it and add to the list */
+			if (!found)
+			{
+				Relation	oldrel;
+				PublicationRelInfo *pubrel;
+
+				/* Wrap relation into PublicationRelInfo */
+				oldrel = table_open(oldrelid, ShareUpdateExclusiveLock);
+
+				pubrel = palloc(sizeof(PublicationRelInfo));
+				pubrel->relation = oldrel;
+
+				delrels = lappend(delrels, pubrel);
+			}
+		}
+
+		/* And drop them. */
+		PublicationDropSequences(pubid, delrels, true);
+
+		/*
+		 * Don't bother calculating the difference for adding, we'll catch and
+		 * skip existing ones when doing catalog update.
+		 */
+		PublicationAddSequences(pubid, rels, true, stmt);
+
+		CloseSequenceList(delrels);
+	}
+
+	CloseSequenceList(rels);
 }
 
 /*
@@ -718,13 +890,21 @@ AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt)
 		AlterPublicationOptions(pstate, stmt, rel, tup);
 	else
 	{
-		List	   *relations = NIL;
+		List	   *tables = NIL;
+		List	   *sequences = NIL;
+		List	   *tables_schemaidlist = NIL;
+		List	   *sequences_schemaidlist = NIL;
 		List	   *schemaidlist = NIL;
 
-		ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations,
+		ObjectsInPublicationToOids(stmt->pubobjects, pstate,
+								   &tables, &sequences,
+								   &tables_schemaidlist,
+								   &sequences_schemaidlist,
 								   &schemaidlist);
 
-		CheckAlterPublication(stmt, tup, relations, schemaidlist);
+		CheckAlterPublication(stmt, tup,
+							  tables, tables_schemaidlist,
+							  sequences, sequences_schemaidlist);
 
 		/*
 		 * Lock the publication so nobody else can do anything with it. This
@@ -749,7 +929,9 @@ AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt)
 					errmsg("publication \"%s\" does not exist",
 						   stmt->pubname));
 
-		AlterPublicationTables(stmt, tup, relations, schemaidlist);
+		AlterPublicationTables(stmt, tup, tables, tables_schemaidlist);
+		AlterPublicationSequences(stmt, tup, sequences, sequences_schemaidlist);
+
 		AlterPublicationSchemas(stmt, tup, schemaidlist);
 	}
 
@@ -1157,6 +1339,144 @@ PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok)
 	}
 }
 
+/*
+ * Open relations specified by a PublicationTable list.
+ * In the returned list of PublicationRelInfo, tables are locked
+ * in ShareUpdateExclusiveLock mode in order to add them to a publication.
+ */
+static List *
+OpenSequenceList(List *sequences)
+{
+	List	   *relids = NIL;
+	List	   *rels = NIL;
+	ListCell   *lc;
+
+	/*
+	 * Open, share-lock, and check all the explicitly-specified relations
+	 */
+	foreach(lc, sequences)
+	{
+		PublicationTable *s = lfirst_node(PublicationTable, lc);
+		Relation	rel;
+		Oid			myrelid;
+		PublicationRelInfo *pub_rel;
+
+		/* Allow query cancel in case this takes a long time */
+		CHECK_FOR_INTERRUPTS();
+
+		rel = table_openrv(s->relation, ShareUpdateExclusiveLock);
+		myrelid = RelationGetRelid(rel);
+
+		/*
+		 * Filter out duplicates if user specifies "foo, foo".
+		 *
+		 * Note that this algorithm is known to not be very efficient (O(N^2))
+		 * but given that it only works on list of tables given to us by user
+		 * it's deemed acceptable.
+		 */
+		if (list_member_oid(relids, myrelid))
+		{
+			table_close(rel, ShareUpdateExclusiveLock);
+			continue;
+		}
+
+		pub_rel = palloc(sizeof(PublicationRelInfo));
+		pub_rel->relation = rel;
+		rels = lappend(rels, pub_rel);
+		relids = lappend_oid(relids, myrelid);
+	}
+
+	list_free(relids);
+
+	return rels;
+}
+
+/*
+ * Close all relations in the list.
+ */
+static void
+CloseSequenceList(List *rels)
+{
+	ListCell   *lc;
+
+	foreach(lc, rels)
+	{
+		PublicationRelInfo *pub_rel;
+
+		pub_rel = (PublicationRelInfo *) lfirst(lc);
+		table_close(pub_rel->relation, NoLock);
+	}
+}
+
+/*
+ * Add listed tables to the publication.
+ */
+static void
+PublicationAddSequences(Oid pubid, List *rels, bool if_not_exists,
+						AlterPublicationStmt *stmt)
+{
+	ListCell   *lc;
+
+	Assert(!stmt || !stmt->for_all_sequences);
+
+	foreach(lc, rels)
+	{
+		PublicationRelInfo *pub_rel = (PublicationRelInfo *) lfirst(lc);
+		Relation	rel = pub_rel->relation;
+		ObjectAddress obj;
+
+		/* Must be owner of the sequence or superuser. */
+		if (!pg_class_ownercheck(RelationGetRelid(rel), GetUserId()))
+			aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(rel->rd_rel->relkind),
+						   RelationGetRelationName(rel));
+
+		obj = publication_add_relation(pubid, pub_rel, if_not_exists);
+		if (stmt)
+		{
+			EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
+											 (Node *) stmt);
+
+			InvokeObjectPostCreateHook(PublicationRelRelationId,
+									   obj.objectId, 0);
+		}
+	}
+}
+
+/*
+ * Remove listed sequences from the publication.
+ */
+static void
+PublicationDropSequences(Oid pubid, List *rels, bool missing_ok)
+{
+	ObjectAddress obj;
+	ListCell   *lc;
+	Oid			prid;
+
+	foreach(lc, rels)
+	{
+		Relation	rel = (Relation) lfirst(lc);
+		Oid			relid = RelationGetRelid(rel);
+
+		prid = GetSysCacheOid2(PUBLICATIONRELMAP, Anum_pg_publication_rel_oid,
+							   ObjectIdGetDatum(relid),
+							   ObjectIdGetDatum(pubid));
+		if (!OidIsValid(prid))
+		{
+			if (missing_ok)
+				continue;
+
+			ereport(ERROR,
+					(errcode(ERRCODE_UNDEFINED_OBJECT),
+					 errmsg("relation \"%s\" is not part of the publication",
+							RelationGetRelationName(rel))));
+		}
+
+		ObjectAddressSet(obj, PublicationRelRelationId, prid);
+		performDeletion(&obj, DROP_CASCADE, 0);
+	}
+}
+
+
 /*
  * Internal workhorse for changing a publication owner
  */
diff --git a/src/backend/commands/sequence.c b/src/backend/commands/sequence.c
index ab592ce2f15..fe4f21ec438 100644
--- a/src/backend/commands/sequence.c
+++ b/src/backend/commands/sequence.c
@@ -336,6 +336,85 @@ ResetSequence(Oid seq_relid)
 	relation_close(seq_rel, NoLock);
 }
 
+/*
+ * Reset a sequence to its initial value.
+ *
+ * The change is made transactionally, so that on failure of the current
+ * transaction, the sequence will be restored to its previous state.
+ * We do that by creating a whole new relfilenode for the sequence; so this
+ * works much like the rewriting forms of ALTER TABLE.
+ *
+ * Caller is assumed to have acquired AccessExclusiveLock on the sequence,
+ * which must not be released until end of transaction.  Caller is also
+ * responsible for permissions checking.
+ */
+void
+ResetSequence2(Oid seq_relid, int64 last_value, int64 log_cnt, bool is_called)
+{
+	Relation	seq_rel;
+	SeqTable	elm;
+	Form_pg_sequence_data seq;
+	Buffer		buf;
+	HeapTupleData seqdatatuple;
+	HeapTuple	tuple;
+
+	/*
+	 * Read the old sequence.  This does a bit more work than really
+	 * necessary, but it's simple, and we do want to double-check that it's
+	 * indeed a sequence.
+	 */
+	init_sequence(seq_relid, &elm, &seq_rel);
+	(void) read_seq_tuple(seq_rel, &buf, &seqdatatuple);
+
+	/*
+	 * Copy the existing sequence tuple.
+	 */
+	tuple = heap_copytuple(&seqdatatuple);
+
+	/* Now we're done with the old page */
+	UnlockReleaseBuffer(buf);
+
+	/*
+	 * Modify the copied tuple to execute the restart (compare the RESTART
+	 * action in AlterSequence)
+	 */
+	seq = (Form_pg_sequence_data) GETSTRUCT(tuple);
+	seq->last_value = last_value;
+	seq->is_called = is_called;
+	seq->log_cnt = log_cnt;
+
+	/*
+	 * Create a new storage file for the sequence.
+	 */
+	RelationSetNewRelfilenode(seq_rel, seq_rel->rd_rel->relpersistence);
+
+	/*
+	 * Ensure sequence's relfrozenxid is at 0, since it won't contain any
+	 * unfrozen XIDs.  Same with relminmxid, since a sequence will never
+	 * contain multixacts.
+	 */
+	Assert(seq_rel->rd_rel->relfrozenxid == InvalidTransactionId);
+	Assert(seq_rel->rd_rel->relminmxid == InvalidMultiXactId);
+
+	/*
+	 * Insert the modified tuple into the new storage file.
+	 *
+	 * XXX Maybe this should also use created=true, just like the other places
+	 * calling fill_seq_with_data. That's probably needed for correct cascading
+	 * replication.
+	 *
+	 * XXX That'd mean all fill_seq_with_data callers use created=true, making
+	 * the parameter unnecessary.
+	 */
+	fill_seq_with_data(seq_rel, tuple);
+
+	/* Clear local cache so that we don't think we have cached numbers */
+	/* Note that we do not change the currval() state */
+	elm->cached = elm->last;
+
+	relation_close(seq_rel, NoLock);
+}
+
 /*
  * Initialize a sequence's relation with the specified tuple as content
  */
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 3ef6607d246..5beb67e7652 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -85,6 +85,7 @@ typedef struct SubOpts
 } SubOpts;
 
 static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
+static List *fetch_sequence_list(WalReceiverConn *wrconn, List *publications);
 static void check_duplicates_in_publist(List *publist, Datum *datums);
 static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname);
 static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err);
@@ -496,6 +497,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 		char	   *err;
 		WalReceiverConn *wrconn;
 		List	   *tables;
+		List	   *sequences;
 		ListCell   *lc;
 		char		table_state;
 
@@ -534,6 +536,26 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 										InvalidXLogRecPtr);
 			}
 
+			/*
+			 * Get the sequence list from publisher and build local sequence
+			 * status info.
+			 */
+			sequences = fetch_sequence_list(wrconn, publications);
+			foreach(lc, sequences)
+			{
+				RangeVar   *rv = (RangeVar *) lfirst(lc);
+				Oid			relid;
+
+				relid = RangeVarGetRelid(rv, AccessShareLock, false);
+
+				/* Check for supported relkind. */
+				CheckSubscriptionRelkind(get_rel_relkind(relid),
+										 rv->schemaname, rv->relname);
+
+				AddSubscriptionRelState(subid, relid, table_state,
+										InvalidXLogRecPtr);
+			}
+
 			/*
 			 * If requested, create permanent slot for the subscription. We
 			 * won't use the initial snapshot for anything, so no need to
@@ -706,6 +728,10 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
 		{
 			Oid			relid = subrel_local_oids[off];
 
+			/* XXX ignore sequences - maybe do this in GetSubscriptionRelations? */
+			if (get_rel_relkind(relid) == RELKIND_SEQUENCE)
+				continue;
+
 			if (!bsearch(&relid, pubrel_local_oids,
 						 list_length(pubrel_names), sizeof(Oid), oid_cmp))
 			{
@@ -797,6 +823,183 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
 				ReplicationSlotDropAtPubNode(wrconn, syncslotname, true);
 			}
 		}
+
+		/*
+		 * XXX now do the same thing for sequences, maybe before the preceding
+		 * block, or earlier?
+		 */
+
+		/* Get the table list from publisher. */
+		pubrel_names = fetch_sequence_list(wrconn, sub->publications);
+
+		/* Get local table list. */
+		subrel_states = GetSubscriptionRelations(sub->oid);
+
+		/*
+		 * Build qsorted array of local table oids for faster lookup. This can
+		 * potentially contain all tables in the database so speed of lookup
+		 * is important.
+		 */
+		subrel_local_oids = palloc(list_length(subrel_states) * sizeof(Oid));
+		off = 0;
+		foreach(lc, subrel_states)
+		{
+			SubscriptionRelState *relstate = (SubscriptionRelState *) lfirst(lc);
+
+			subrel_local_oids[off++] = relstate->relid;
+		}
+		qsort(subrel_local_oids, list_length(subrel_states),
+			  sizeof(Oid), oid_cmp);
+
+		/*
+		 * Rels that we want to remove from subscription and drop any slots
+		 * and origins corresponding to them.
+		 */
+		sub_remove_rels = palloc(list_length(subrel_states) * sizeof(SubRemoveRels));
+
+		/*
+		 * Walk over the remote tables and try to match them to locally known
+		 * tables. If the table is not known locally create a new state for
+		 * it.
+		 *
+		 * Also builds array of local oids of remote tables for the next step.
+		 */
+		off = 0;
+		pubrel_local_oids = palloc(list_length(pubrel_names) * sizeof(Oid));
+
+		foreach(lc, pubrel_names)
+		{
+			RangeVar   *rv = (RangeVar *) lfirst(lc);
+			Oid			relid;
+
+			relid = RangeVarGetRelid(rv, AccessShareLock, false);
+
+			/* Check for supported relkind. */
+			CheckSubscriptionRelkind(get_rel_relkind(relid),
+									 rv->schemaname, rv->relname);
+
+			pubrel_local_oids[off++] = relid;
+
+			if (!bsearch(&relid, subrel_local_oids,
+						 list_length(subrel_states), sizeof(Oid), oid_cmp))
+			{
+				AddSubscriptionRelState(sub->oid, relid,
+										copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
+										InvalidXLogRecPtr);
+				ereport(DEBUG1,
+						(errmsg_internal("table \"%s.%s\" added to subscription \"%s\"",
+										 rv->schemaname, rv->relname, sub->name)));
+			}
+		}
+
+		/*
+		 * Next remove state for tables we should not care about anymore using
+		 * the data we collected above
+		 */
+		qsort(pubrel_local_oids, list_length(pubrel_names),
+			  sizeof(Oid), oid_cmp);
+
+		remove_rel_len = 0;
+		for (off = 0; off < list_length(subrel_states); off++)
+		{
+			Oid			relid = subrel_local_oids[off];
+
+			/* XXX ignore non-sequences - maybe do this in GetSubscriptionRelations? */
+			if (get_rel_relkind(relid) != RELKIND_SEQUENCE)
+				continue;
+
+			if (!bsearch(&relid, pubrel_local_oids,
+						 list_length(pubrel_names), sizeof(Oid), oid_cmp))
+			{
+				char		state;
+				XLogRecPtr	statelsn;
+
+				/*
+				 * Lock pg_subscription_rel with AccessExclusiveLock to
+				 * prevent any race conditions with the apply worker
+				 * re-launching workers at the same time this code is trying
+				 * to remove those tables.
+				 *
+				 * Even if new worker for this particular rel is restarted it
+				 * won't be able to make any progress as we hold exclusive
+				 * lock on subscription_rel till the transaction end. It will
+				 * simply exit as there is no corresponding rel entry.
+				 *
+				 * This locking also ensures that the state of rels won't
+				 * change till we are done with this refresh operation.
+				 */
+				if (!rel)
+					rel = table_open(SubscriptionRelRelationId, AccessExclusiveLock);
+
+				/* Last known rel state. */
+				state = GetSubscriptionRelState(sub->oid, relid, &statelsn);
+
+				sub_remove_rels[remove_rel_len].relid = relid;
+				sub_remove_rels[remove_rel_len++].state = state;
+
+				RemoveSubscriptionRel(sub->oid, relid);
+
+				logicalrep_worker_stop(sub->oid, relid);
+
+				/*
+				 * For READY state, we would have already dropped the
+				 * tablesync origin.
+				 */
+				if (state != SUBREL_STATE_READY)
+				{
+					char		originname[NAMEDATALEN];
+
+					/*
+					 * Drop the tablesync's origin tracking if exists.
+					 *
+					 * It is possible that the origin is not yet created for
+					 * tablesync worker, this can happen for the states before
+					 * SUBREL_STATE_FINISHEDCOPY. The apply worker can also
+					 * concurrently try to drop the origin and by this time
+					 * the origin might be already removed. For these reasons,
+					 * passing missing_ok = true.
+					 */
+					ReplicationOriginNameForTablesync(sub->oid, relid, originname,
+													  sizeof(originname));
+					replorigin_drop_by_name(originname, true, false);
+				}
+
+				ereport(DEBUG1,
+						(errmsg_internal("table \"%s.%s\" removed from subscription \"%s\"",
+										 get_namespace_name(get_rel_namespace(relid)),
+										 get_rel_name(relid),
+										 sub->name)));
+			}
+		}
+
+		/*
+		 * Drop the tablesync slots associated with removed tables. This has
+		 * to be at the end because otherwise if there is an error while doing
+		 * the database operations we won't be able to rollback dropped slots.
+		 */
+		for (off = 0; off < remove_rel_len; off++)
+		{
+			if (sub_remove_rels[off].state != SUBREL_STATE_READY &&
+				sub_remove_rels[off].state != SUBREL_STATE_SYNCDONE)
+			{
+				char		syncslotname[NAMEDATALEN] = {0};
+
+				/*
+				 * For READY/SYNCDONE states we know the tablesync slot has
+				 * already been dropped by the tablesync worker.
+				 *
+				 * For other states, there is no certainty, maybe the slot
+				 * does not exist yet. Also, if we fail after removing some of
+				 * the slots, next time, it will again try to drop already
+				 * dropped slots and fail. For these reasons, we allow
+				 * missing_ok = true for the drop.
+				 */
+				ReplicationSlotNameForTablesync(sub->oid, sub_remove_rels[off].relid,
+												syncslotname, sizeof(syncslotname));
+				ReplicationSlotDropAtPubNode(wrconn, syncslotname, true);
+			}
+		}
+
 	}
 	PG_FINALLY();
 	{
@@ -1616,6 +1819,75 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications)
 	return tablelist;
 }
 
+/*
+ * Get the list of sequences which belong to specified publications on the
+ * publisher connection.
+ */
+static List *
+fetch_sequence_list(WalReceiverConn *wrconn, List *publications)
+{
+	WalRcvExecResult *res;
+	StringInfoData cmd;
+	TupleTableSlot *slot;
+	Oid			tableRow[2] = {TEXTOID, TEXTOID};
+	ListCell   *lc;
+	bool		first;
+	List	   *tablelist = NIL;
+
+	Assert(list_length(publications) > 0);
+
+	initStringInfo(&cmd);
+	appendStringInfoString(&cmd, "SELECT DISTINCT s.schemaname, s.sequencename\n"
+						   "  FROM pg_catalog.pg_publication_sequences s\n"
+						   " WHERE s.pubname IN (");
+	first = true;
+	foreach(lc, publications)
+	{
+		char	   *pubname = strVal(lfirst(lc));
+
+		if (first)
+			first = false;
+		else
+			appendStringInfoString(&cmd, ", ");
+
+		appendStringInfoString(&cmd, quote_literal_cstr(pubname));
+	}
+	appendStringInfoChar(&cmd, ')');
+
+	res = walrcv_exec(wrconn, cmd.data, 2, tableRow);
+	pfree(cmd.data);
+
+	if (res->status != WALRCV_OK_TUPLES)
+		ereport(ERROR,
+				(errmsg("could not receive list of replicated tables from the publisher: %s",
+						res->err)));
+
+	/* Process tables. */
+	slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
+	while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
+	{
+		char	   *nspname;
+		char	   *relname;
+		bool		isnull;
+		RangeVar   *rv;
+
+		nspname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
+		Assert(!isnull);
+		relname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
+		Assert(!isnull);
+
+		rv = makeRangeVar(nspname, relname, -1);
+		tablelist = lappend(tablelist, rv);
+
+		ExecClearTuple(slot);
+	}
+	ExecDropSingleTupleTableSlot(slot);
+
+	walrcv_clear_result(res);
+
+	return tablelist;
+}
+
 /*
  * This is to report the connection failure while dropping replication slots.
  * Here, we report the WARNING for all tablesync slots so that user can drop
diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c
index 313c87398b2..78f14119d98 100644
--- a/src/backend/executor/execReplication.c
+++ b/src/backend/executor/execReplication.c
@@ -608,7 +608,7 @@ void
 CheckSubscriptionRelkind(char relkind, const char *nspname,
 						 const char *relname)
 {
-	if (relkind != RELKIND_RELATION && relkind != RELKIND_PARTITIONED_TABLE)
+	if (relkind != RELKIND_RELATION && relkind != RELKIND_PARTITIONED_TABLE && relkind != RELKIND_SEQUENCE)
 		ereport(ERROR,
 				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
 				 errmsg("cannot use relation \"%s.%s\" as logical replication target",
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index 6bd95bbce24..8b7e9710401 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -4852,6 +4852,7 @@ _copyCreatePublicationStmt(const CreatePublicationStmt *from)
 	COPY_NODE_FIELD(options);
 	COPY_NODE_FIELD(pubobjects);
 	COPY_SCALAR_FIELD(for_all_tables);
+	COPY_SCALAR_FIELD(for_all_sequences);
 
 	return newnode;
 }
diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c
index 4126516222b..55a7dbbddf3 100644
--- a/src/backend/nodes/equalfuncs.c
+++ b/src/backend/nodes/equalfuncs.c
@@ -2337,6 +2337,7 @@ _equalAlterPublicationStmt(const AlterPublicationStmt *a,
 	COMPARE_NODE_FIELD(options);
 	COMPARE_NODE_FIELD(pubobjects);
 	COMPARE_SCALAR_FIELD(for_all_tables);
+	COMPARE_SCALAR_FIELD(for_all_sequences);
 	COMPARE_SCALAR_FIELD(action);
 
 	return true;
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index c4f32425060..bbde765cf56 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -9771,6 +9771,26 @@ PublicationObjSpec:
 					$$->pubobjtype = PUBLICATIONOBJ_TABLES_IN_CUR_SCHEMA;
 					$$->location = @5;
 				}
+			| SEQUENCE relation_expr
+				{
+					$$ = makeNode(PublicationObjSpec);
+					$$->pubobjtype = PUBLICATIONOBJ_SEQUENCE;
+					$$->pubtable = makeNode(PublicationTable);
+					$$->pubtable->relation = $2;
+				}
+			| ALL SEQUENCES IN_P SCHEMA ColId
+				{
+					$$ = makeNode(PublicationObjSpec);
+					$$->pubobjtype = PUBLICATIONOBJ_SEQUENCES_IN_SCHEMA;
+					$$->name = $5;
+					$$->location = @5;
+				}
+			| ALL SEQUENCES IN_P SCHEMA CURRENT_SCHEMA
+				{
+					$$ = makeNode(PublicationObjSpec);
+					$$->pubobjtype = PUBLICATIONOBJ_SEQUENCES_IN_CUR_SCHEMA;
+					$$->location = @5;
+				}
 			| ColId
 				{
 					$$ = makeNode(PublicationObjSpec);
@@ -10106,6 +10126,12 @@ UnlistenStmt:
 				}
 		;
 
+/*
+ * FIXME
+ *
+ * opt_publication_for_sequences and publication_for_sequences should be
+ * copies for sequences
+ */
 
 /*****************************************************************************
  *
@@ -10114,6 +10140,12 @@ UnlistenStmt:
  *		BEGIN / COMMIT / ROLLBACK
  *		(also older versions END / ABORT)
  *
+ * ALTER PUBLICATION name ADD SEQUENCE sequence [, sequence2]
+ *
+ * ALTER PUBLICATION name DROP SEQUENCE sequence [, sequence2]
+ *
+ * ALTER PUBLICATION name SET SEQUENCE sequence [, sequence2]
+ *
  *****************************************************************************/
 
 TransactionStmt:
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index 953942692ce..e8ead1387ae 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -647,6 +647,56 @@ logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn,
 	pq_sendbytes(out, message, sz);
 }
 
+/*
+ * Write SEQUENCE to stream
+ */
+void
+logicalrep_write_sequence(StringInfo out, Relation rel, TransactionId xid,
+						  XLogRecPtr lsn, bool transactional,
+						  int64 last_value, int64 log_cnt, bool is_called)
+{
+	uint8		flags = 0;
+	char	   *relname;
+
+	pq_sendbyte(out, LOGICAL_REP_MSG_SEQUENCE);
+
+	/* transaction ID (if not valid, we're not streaming) */
+	if (TransactionIdIsValid(xid))
+		pq_sendint32(out, xid);
+
+	pq_sendint8(out, flags);
+	pq_sendint64(out, lsn);
+
+	logicalrep_write_namespace(out, RelationGetNamespace(rel));
+	relname = RelationGetRelationName(rel);
+	pq_sendstring(out, relname);
+
+	pq_sendint8(out, transactional);
+	pq_sendint64(out, last_value);
+	pq_sendint64(out, log_cnt);
+	pq_sendint8(out, is_called);
+}
+
+/*
+ * Read SEQUENCE from the stream.
+ */
+void
+logicalrep_read_sequence(StringInfo in, LogicalRepSequence *seqdata)
+{
+	/* XXX skipping flags and lsn */
+	pq_getmsgint(in, 1);
+	pq_getmsgint64(in);
+
+	/* Read relation name from stream */
+	seqdata->nspname = pstrdup(logicalrep_read_namespace(in));
+	seqdata->seqname = pstrdup(pq_getmsgstring(in));
+
+	seqdata->transactional = pq_getmsgint(in, 1);
+	seqdata->last_value = pq_getmsgint64(in);
+	seqdata->log_cnt = pq_getmsgint64(in);
+	seqdata->is_called = pq_getmsgint(in, 1);
+}
+
 /*
  * Write relation description to the output stream.
  */
@@ -1203,6 +1253,8 @@ logicalrep_message_type(LogicalRepMsgType action)
 			return "STREAM ABORT";
 		case LOGICAL_REP_MSG_STREAM_PREPARE:
 			return "STREAM PREPARE";
+		case LOGICAL_REP_MSG_SEQUENCE:
+			return "SEQUENCE";
 	}
 
 	elog(ERROR, "invalid logical replication message type \"%c\"", action);
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index e596b69d466..f746c13ca44 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -100,6 +100,7 @@
 #include "catalog/pg_subscription_rel.h"
 #include "catalog/pg_type.h"
 #include "commands/copy.h"
+#include "commands/sequence.h"
 #include "miscadmin.h"
 #include "parser/parse_relation.h"
 #include "pgstat.h"
@@ -359,6 +360,12 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
  *
  * If the synchronization position is reached (SYNCDONE), then the table can
  * be marked as READY and is no longer tracked.
+ *
+ * XXX This needs to handle sequences too - after AlterSubscription_refresh
+ * starts caring about sequences, GetSubscriptionNotReadyRelations won't
+ * return just tables, and we'll have to sync them here. Not sure it's worth
+ * creating a new "sync" worker per sequence, maybe we should just sync them
+ * in the current process (it's pretty light-weight).
  */
 static void
 process_syncing_tables_for_apply(XLogRecPtr current_lsn)
@@ -873,6 +880,95 @@ copy_table(Relation rel)
 	logicalrep_rel_close(relmapentry, NoLock);
 }
 
+
+
+/*
+ * FIXME add comment
+ */
+static void
+fetch_sequence_data(char *nspname, char *relname,
+					int64 *last_value, int64 *log_cnt, bool *is_called)
+{
+	WalRcvExecResult *res;
+	StringInfoData cmd;
+	TupleTableSlot *slot;
+	Oid			tableRow[3] = {INT8OID, INT8OID, BOOLOID};
+
+	initStringInfo(&cmd);
+	appendStringInfo(&cmd, "SELECT last_value, log_cnt, is_called\n"
+					   "  FROM %s", quote_qualified_identifier(nspname, relname));
+
+	res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 3, tableRow);
+	pfree(cmd.data);
+
+	if (res->status != WALRCV_OK_TUPLES)
+		ereport(ERROR,
+				(errmsg("could not receive list of replicated tables from the publisher: %s",
+						res->err)));
+
+	/* Process the sequence. */
+	slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
+	while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
+	{
+		bool		isnull;
+
+		*last_value = DatumGetInt64(slot_getattr(slot, 1, &isnull));
+		Assert(!isnull);
+
+		*log_cnt = DatumGetInt64(slot_getattr(slot, 2, &isnull));
+		Assert(!isnull);
+
+		*is_called = DatumGetBool(slot_getattr(slot, 3, &isnull));
+		Assert(!isnull);
+
+		ExecClearTuple(slot);
+	}
+	ExecDropSingleTupleTableSlot(slot);
+
+	walrcv_clear_result(res);
+}
+
+/*
+ * Copy existing data of a sequence from publisher.
+ *
+ * Caller is responsible for locking the local relation.
+ */
+static void
+copy_sequence(Relation rel)
+{
+	LogicalRepRelMapEntry *relmapentry;
+	LogicalRepRelation lrel;
+	StringInfoData cmd;
+	int64		last_value = 0,
+				log_cnt = 0;
+	bool		is_called = 0;
+
+	/* Get the publisher relation info. */
+	fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)),
+							RelationGetRelationName(rel), &lrel);
+
+	/* Put the relation into relmap. */
+	logicalrep_relmap_update(&lrel);
+
+	/* Map the publisher relation to local one. */
+	relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock);
+	Assert(rel == relmapentry->localrel);
+
+	/* Start copy on the publisher. */
+	initStringInfo(&cmd);
+
+	Assert(lrel.relkind == RELKIND_SEQUENCE);
+
+	fetch_sequence_data(lrel.nspname, lrel.relname, &last_value, &log_cnt, &is_called);
+
+	ResetSequence2(RelationGetRelid(rel), last_value, log_cnt, is_called);
+
+	logicalrep_rel_close(relmapentry, NoLock);
+}
+
+
+
+
 /*
  * Determine the tablesync slot name.
  *
@@ -1134,10 +1230,20 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 						originname)));
 	}
 
-	/* Now do the initial data copy */
-	PushActiveSnapshot(GetTransactionSnapshot());
-	copy_table(rel);
-	PopActiveSnapshot();
+	if (get_rel_relkind(RelationGetRelid(rel)) == RELKIND_SEQUENCE)
+	{
+		/* Now do the initial sequence copy */
+		PushActiveSnapshot(GetTransactionSnapshot());
+		copy_sequence(rel);
+		PopActiveSnapshot();
+	}
+	else
+	{
+		/* Now do the initial data copy */
+		PushActiveSnapshot(GetTransactionSnapshot());
+		copy_table(rel);
+		PopActiveSnapshot();
+	}
 
 	res = walrcv_exec(LogRepWorkerWalRcvConn, "COMMIT", 0, NULL);
 	if (res->status != WALRCV_OK_COMMAND)
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index d77bb32bb9e..68708d3907f 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -144,6 +144,7 @@
 #include "catalog/pg_tablespace.h"
 #include "commands/tablecmds.h"
 #include "commands/tablespace.h"
+#include "commands/sequence.h"
 #include "commands/trigger.h"
 #include "executor/executor.h"
 #include "executor/execPartition.h"
@@ -1093,6 +1094,61 @@ apply_handle_origin(StringInfo s)
 				 errmsg_internal("ORIGIN message sent out of order")));
 }
 
+/*
+ * Handle SEQUENCE message.
+ */
+static void
+apply_handle_sequence(StringInfo s)
+{
+	LogicalRepSequence	seq;
+	Oid					relid;
+
+	if (handle_streamed_transaction(LOGICAL_REP_MSG_SEQUENCE, s))
+		return;
+
+	logicalrep_read_sequence(s, &seq);
+
+	/*
+	 * Non-transactional sequence updates should not be part of a remote
+	 * transaction. There should not be any running transaction.
+	 */
+	Assert((!seq.transactional) || in_remote_transaction);
+	Assert(!(!seq.transactional && in_remote_transaction));
+	Assert(!(!seq.transactional && IsTransactionState()));
+
+	/*
+	 * Make sure we're in a transaction (needed by ResetSequence2). For
+	 * non-transactional updates we're guaranteed to start a new one,
+	 * and we'll commit it at the end.
+	 */
+	if (!IsTransactionState())
+	{
+		StartTransactionCommand();
+		maybe_reread_subscription();
+	}
+
+	relid = RangeVarGetRelid(makeRangeVar(seq.nspname,
+										  seq.seqname, -1),
+							 RowExclusiveLock, false);
+
+	/* lock the sequence in AccessExclusiveLock, as expected by ResetSequence2 */
+	elog(WARNING, "locking sequence %d in exclusive mode", relid);
+	LockRelationOid(relid, AccessExclusiveLock);
+
+	elog(WARNING, "applying sequence %s.%s transactional %d last_value %ld log_cnt %ld is_called %d",
+		 seq.nspname, seq.seqname, seq.transactional, seq.last_value, seq.log_cnt, seq.is_called);
+
+	/* apply the sequence change */
+	ResetSequence2(relid, seq.last_value, seq.log_cnt, seq.is_called);
+
+	/*
+	 * Commit the per-stream transaction (we only do this when not in
+	 * remote transaction, i.e. for non-transactional sequence updates.
+	 */
+	if (!in_remote_transaction)
+		CommitTransactionCommand();
+}
+
 /*
  * Handle STREAM START message.
  */
@@ -2421,6 +2477,10 @@ apply_dispatch(StringInfo s)
 			 */
 			break;
 
+		case LOGICAL_REP_MSG_SEQUENCE:
+			apply_handle_sequence(s);
+			return;
+
 		case LOGICAL_REP_MSG_STREAM_START:
 			apply_handle_stream_start(s);
 			break;
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 6df705f90ff..b82c6b10305 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -49,6 +49,10 @@ static void pgoutput_message(LogicalDecodingContext *ctx,
 							 ReorderBufferTXN *txn, XLogRecPtr message_lsn,
 							 bool transactional, const char *prefix,
 							 Size sz, const char *message);
+static void pgoutput_sequence(LogicalDecodingContext *ctx,
+							  ReorderBufferTXN *txn, XLogRecPtr sequence_lsn,
+							  Relation rel, bool transactional,
+							  int64 last_value, int64 log_cnt, bool is_called);
 static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
 								   RepOriginId origin_id);
 static void pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx,
@@ -161,6 +165,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
 	cb->change_cb = pgoutput_change;
 	cb->truncate_cb = pgoutput_truncate;
 	cb->message_cb = pgoutput_message;
+	cb->sequence_cb = pgoutput_sequence;
 	cb->commit_cb = pgoutput_commit_txn;
 
 	cb->begin_prepare_cb = pgoutput_begin_prepare_txn;
@@ -177,6 +182,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
 	cb->stream_commit_cb = pgoutput_stream_commit;
 	cb->stream_change_cb = pgoutput_change;
 	cb->stream_message_cb = pgoutput_message;
+	cb->stream_sequence_cb = pgoutput_sequence;
 	cb->stream_truncate_cb = pgoutput_truncate;
 	/* transaction streaming - two-phase commit */
 	cb->stream_prepare_cb = pgoutput_stream_prepare_txn;
@@ -190,6 +196,7 @@ parse_output_parameters(List *options, PGOutputData *data)
 	bool		publication_names_given = false;
 	bool		binary_option_given = false;
 	bool		messages_option_given = false;
+	bool		sequences_option_given = false;
 	bool		streaming_given = false;
 	bool		two_phase_option_given = false;
 
@@ -197,6 +204,7 @@ parse_output_parameters(List *options, PGOutputData *data)
 	data->streaming = false;
 	data->messages = false;
 	data->two_phase = false;
+	data->sequences = true;
 
 	foreach(lc, options)
 	{
@@ -262,6 +270,16 @@ parse_output_parameters(List *options, PGOutputData *data)
 
 			data->messages = defGetBoolean(defel);
 		}
+		else if (strcmp(defel->defname, "sequences") == 0)
+		{
+			if (sequences_option_given)
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("conflicting or redundant options")));
+			sequences_option_given = true;
+
+			data->sequences = defGetBoolean(defel);
+		}
 		else if (strcmp(defel->defname, "streaming") == 0)
 		{
 			if (streaming_given)
@@ -858,6 +876,51 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	OutputPluginWrite(ctx, true);
 }
 
+static void
+pgoutput_sequence(LogicalDecodingContext *ctx,
+				  ReorderBufferTXN *txn, XLogRecPtr sequence_lsn,
+				  Relation rel, bool transactional,
+				  int64 last_value, int64 log_cnt, bool is_called)
+{
+	PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+	TransactionId xid = InvalidTransactionId;
+	RelationSyncEntry *relentry;
+
+	if (!data->sequences)
+		return;
+
+	if (!is_publishable_relation(rel))
+		return;
+
+	/*
+	 * Remember the xid for the message in streaming mode. See
+	 * pgoutput_change.
+	 */
+	if (in_streaming)
+		xid = txn->xid;
+
+	relentry = get_rel_sync_entry(data, RelationGetRelid(rel));
+
+	/*
+	 * First check the sequence filter.
+	 *
+	 * We handle just REORDER_BUFFER_CHANGE_SEQUENCE here.
+	 */
+	if (!relentry->pubactions.pubsequence)
+		return;
+
+	OutputPluginPrepareWrite(ctx, true);
+	logicalrep_write_sequence(ctx->out,
+							  rel,
+							  xid,
+							  sequence_lsn,
+							  transactional,
+							  last_value,
+							  log_cnt,
+							  is_called);
+	OutputPluginWrite(ctx, true);
+}
+
 /*
  * Currently we always forward.
  */
@@ -1141,7 +1204,8 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 		entry->schema_sent = false;
 		entry->streamed_txns = NIL;
 		entry->pubactions.pubinsert = entry->pubactions.pubupdate =
-			entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
+			entry->pubactions.pubdelete = entry->pubactions.pubtruncate =
+			entry->pubactions.pubsequence = false;
 		entry->publish_as_relid = InvalidOid;
 		entry->map = NULL;		/* will be set by maybe_send_schema() if
 								 * needed */
@@ -1163,6 +1227,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 		Oid			publish_as_relid = relid;
 		bool		am_partition = get_rel_relispartition(relid);
 		char		relkind = get_rel_relkind(relid);
+		bool		is_sequence = (get_rel_relkind(relid) == RELKIND_SEQUENCE);
 
 		/* Reload publications if needed before use. */
 		if (!publications_valid)
@@ -1191,6 +1256,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 		entry->pubactions.pubupdate = false;
 		entry->pubactions.pubdelete = false;
 		entry->pubactions.pubtruncate = false;
+		entry->pubactions.pubsequence = false;
 		if (entry->map)
 		{
 			/*
@@ -1213,12 +1279,23 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 			Publication *pub = lfirst(lc);
 			bool		publish = false;
 
-			if (pub->alltables)
+			if (pub->alltables && (!is_sequence))
 			{
 				publish = true;
 				if (pub->pubviaroot && am_partition)
 					publish_as_relid = llast_oid(get_partition_ancestors(relid));
 			}
+			else if (pub->allsequences && is_sequence)
+			{
+				publish = true;
+			}
+
+			/* if a sequence, just cross-check the list of publications */
+			if (!publish && is_sequence)
+			{
+				if (list_member_oid(pubids, pub->oid))
+					publish = true;
+			}
 
 			if (!publish)
 			{
@@ -1275,10 +1352,12 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 				entry->pubactions.pubupdate |= pub->pubactions.pubupdate;
 				entry->pubactions.pubdelete |= pub->pubactions.pubdelete;
 				entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate;
+				entry->pubactions.pubsequence |= pub->pubactions.pubsequence;
 			}
 
 			if (entry->pubactions.pubinsert && entry->pubactions.pubupdate &&
-				entry->pubactions.pubdelete && entry->pubactions.pubtruncate)
+				entry->pubactions.pubdelete && entry->pubactions.pubtruncate &&
+				entry->pubactions.pubsequence)
 				break;
 		}
 
diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c
index 2707fed12f4..45a8b3e490a 100644
--- a/src/backend/utils/cache/relcache.c
+++ b/src/backend/utils/cache/relcache.c
@@ -5586,6 +5586,7 @@ GetRelationPublicationActions(Relation relation)
 		pubactions->pubupdate |= pubform->pubupdate;
 		pubactions->pubdelete |= pubform->pubdelete;
 		pubactions->pubtruncate |= pubform->pubtruncate;
+		pubactions->pubsequence |= pubform->pubsequence;
 
 		ReleaseSysCache(tup);
 
@@ -5594,7 +5595,8 @@ GetRelationPublicationActions(Relation relation)
 		 * other publications.
 		 */
 		if (pubactions->pubinsert && pubactions->pubupdate &&
-			pubactions->pubdelete && pubactions->pubtruncate)
+			pubactions->pubdelete && pubactions->pubtruncate &&
+			pubactions->pubsequence)
 			break;
 	}
 
diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c
index 98882272130..4900c48ff19 100644
--- a/src/bin/psql/tab-complete.c
+++ b/src/bin/psql/tab-complete.c
@@ -1782,20 +1782,20 @@ psql_completion(const char *text, int start, int end)
 		COMPLETE_WITH("ADD", "DROP", "OWNER TO", "RENAME TO", "SET");
 	/* ALTER PUBLICATION <name> ADD */
 	else if (Matches("ALTER", "PUBLICATION", MatchAny, "ADD"))
-		COMPLETE_WITH("ALL TABLES IN SCHEMA", "TABLE");
-	else if (Matches("ALTER", "PUBLICATION", MatchAny, "ADD|SET", "TABLE") ||
-			 (HeadMatches("ALTER", "PUBLICATION", MatchAny, "ADD|SET", "TABLE") &&
+		COMPLETE_WITH("ALL TABLES IN SCHEMA", "TABLE|SEQUENCE");
+	else if (Matches("ALTER", "PUBLICATION", MatchAny, "ADD|SET", "TABLE|SEQUENCE") ||
+			 (HeadMatches("ALTER", "PUBLICATION", MatchAny, "ADD|SET", "TABLE|SEQUENCE") &&
 			  ends_with(prev_wd, ',')))
 		COMPLETE_WITH_SCHEMA_QUERY(Query_for_list_of_tables);
-	else if (HeadMatches("ALTER", "PUBLICATION", MatchAny, "ADD|SET", "TABLE"))
+	else if (HeadMatches("ALTER", "PUBLICATION", MatchAny, "ADD|SET", "TABLE|SEQUENCE"))
 		COMPLETE_WITH(",");
 	/* ALTER PUBLICATION <name> DROP */
 	else if (Matches("ALTER", "PUBLICATION", MatchAny, "DROP"))
-		COMPLETE_WITH("ALL TABLES IN SCHEMA", "TABLE");
+		COMPLETE_WITH("ALL TABLES IN SCHEMA", "TABLE|SEQUENCE");
 	/* ALTER PUBLICATION <name> SET */
 	else if (Matches("ALTER", "PUBLICATION", MatchAny, "SET"))
-		COMPLETE_WITH("(", "ALL TABLES IN SCHEMA", "TABLE");
-	else if (Matches("ALTER", "PUBLICATION", MatchAny, "ADD|DROP|SET", "ALL", "TABLES", "IN", "SCHEMA"))
+		COMPLETE_WITH("(", "ALL TABLES IN SCHEMA", "TABLE|SEQUENCE");
+	else if (Matches("ALTER", "PUBLICATION", MatchAny, "ADD|DROP|SET", "ALL", "TABLES|SEQUENCES", "IN", "SCHEMA"))
 		COMPLETE_WITH_QUERY_PLUS(Query_for_list_of_schemas
 								 " AND nspname NOT LIKE E'pg\\\\_%'",
 								 "CURRENT_SCHEMA");
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 62f36daa981..a66eb8109d0 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -11546,6 +11546,11 @@
   provolatile => 's', prorettype => 'oid', proargtypes => 'text',
   proallargtypes => '{text,oid}', proargmodes => '{i,o}',
   proargnames => '{pubname,relid}', prosrc => 'pg_get_publication_tables' },
+{ oid => '8000', descr => 'get OIDs of sequences in a publication',
+  proname => 'pg_get_publication_sequences', prorows => '1000', proretset => 't',
+  provolatile => 's', prorettype => 'oid', proargtypes => 'text',
+  proallargtypes => '{text,oid}', proargmodes => '{i,o}',
+  proargnames => '{pubname,relid}', prosrc => 'pg_get_publication_sequences' },
 { oid => '6121',
   descr => 'returns whether a relation can be part of a publication',
   proname => 'pg_relation_is_publishable', provolatile => 's',
diff --git a/src/include/catalog/pg_publication.h b/src/include/catalog/pg_publication.h
index 841b9b6c253..e56286772f4 100644
--- a/src/include/catalog/pg_publication.h
+++ b/src/include/catalog/pg_publication.h
@@ -40,6 +40,12 @@ CATALOG(pg_publication,6104,PublicationRelationId)
 	 */
 	bool		puballtables;
 
+	/*
+	 * indicates that this is special publication which should encompass all
+	 * sequences in the database (except for the unlogged and temp ones)
+	 */
+	bool		puballsequences;
+
 	/* true if inserts are published */
 	bool		pubinsert;
 
@@ -52,6 +58,9 @@ CATALOG(pg_publication,6104,PublicationRelationId)
 	/* true if truncates are published */
 	bool		pubtruncate;
 
+	/* true if sequences are published */
+	bool		pubsequence;
+
 	/* true if partition changes are published using root schema */
 	bool		pubviaroot;
 } FormData_pg_publication;
@@ -72,6 +81,7 @@ typedef struct PublicationActions
 	bool		pubupdate;
 	bool		pubdelete;
 	bool		pubtruncate;
+	bool		pubsequence;
 } PublicationActions;
 
 typedef struct Publication
@@ -79,6 +89,7 @@ typedef struct Publication
 	Oid			oid;
 	char	   *name;
 	bool		alltables;
+	bool		allsequences;
 	bool		pubviaroot;
 	PublicationActions pubactions;
 } Publication;
@@ -121,6 +132,9 @@ extern List *GetPubPartitionOptionRelations(List *result,
 											PublicationPartOpt pub_partopt,
 											Oid relid);
 
+extern List *GetAllSequencesPublicationRelations(void);
+extern List *GetPublicationSequenceRelations(Oid pubid);
+
 extern bool is_publishable_relation(Relation rel);
 extern bool is_schema_publication(Oid pubid);
 extern ObjectAddress publication_add_relation(Oid pubid, PublicationRelInfo *targetrel,
diff --git a/src/include/commands/sequence.h b/src/include/commands/sequence.h
index 9fecc41954e..d8c255a7af5 100644
--- a/src/include/commands/sequence.h
+++ b/src/include/commands/sequence.h
@@ -60,6 +60,7 @@ extern ObjectAddress DefineSequence(ParseState *pstate, CreateSeqStmt *stmt);
 extern ObjectAddress AlterSequence(ParseState *pstate, AlterSeqStmt *stmt);
 extern void DeleteSequenceTuple(Oid relid);
 extern void ResetSequence(Oid seq_relid);
+extern void ResetSequence2(Oid seq_relid, int64 last_value, int64 log_cnt, bool is_called);
 extern void ResetSequenceCaches(void);
 
 extern void seq_redo(XLogReaderState *rptr);
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index 37fcc4c9b5a..4a990364e4a 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -3656,6 +3656,10 @@ typedef enum PublicationObjSpecType
 	PUBLICATIONOBJ_TABLES_IN_SCHEMA,	/* All tables in schema */
 	PUBLICATIONOBJ_TABLES_IN_CUR_SCHEMA,	/* All tables in first element of
 											 * search_path */
+	PUBLICATIONOBJ_SEQUENCE,		/* Sequence type */
+	PUBLICATIONOBJ_SEQUENCES_IN_SCHEMA, /* Sequences in schema type */
+	PUBLICATIONOBJ_SEQUENCES_IN_CUR_SCHEMA, /* Get the first element of
+											 * search_path */
 	PUBLICATIONOBJ_CONTINUATION /* Continuation of previous type */
 } PublicationObjSpecType;
 
@@ -3675,6 +3679,7 @@ typedef struct CreatePublicationStmt
 	List	   *options;		/* List of DefElem nodes */
 	List	   *pubobjects;		/* Optional list of publication objects */
 	bool		for_all_tables; /* Special publication for all tables in db */
+	bool		for_all_sequences; /* Special publication for all sequences in db */
 } CreatePublicationStmt;
 
 typedef enum AlterPublicationAction
@@ -3698,6 +3703,7 @@ typedef struct AlterPublicationStmt
 	 */
 	List	   *pubobjects;		/* Optional list of publication objects */
 	bool		for_all_tables; /* Special publication for all tables in db */
+	bool		for_all_sequences; /* Special publication for all sequences in db */
 	AlterPublicationAction action;	/* What action to perform with the given
 									 * objects */
 } AlterPublicationStmt;
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index 22fffaca62d..8f8c325522d 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -60,6 +60,7 @@ typedef enum LogicalRepMsgType
 	LOGICAL_REP_MSG_RELATION = 'R',
 	LOGICAL_REP_MSG_TYPE = 'Y',
 	LOGICAL_REP_MSG_MESSAGE = 'M',
+	LOGICAL_REP_MSG_SEQUENCE = 'X',	/* FIXME change */
 	LOGICAL_REP_MSG_BEGIN_PREPARE = 'b',
 	LOGICAL_REP_MSG_PREPARE = 'P',
 	LOGICAL_REP_MSG_COMMIT_PREPARED = 'K',
@@ -117,6 +118,18 @@ typedef struct LogicalRepTyp
 	char	   *typname;		/* name of the remote type */
 } LogicalRepTyp;
 
+/* Sequence info */
+typedef struct LogicalRepSequence
+{
+	Oid			remoteid;		/* unique id of the remote sequence */
+	char	   *nspname;		/* schema name of remote sequence */
+	char	   *seqname;		/* name of the remote sequence */
+	bool		transactional;
+	int64		last_value;
+	int64		log_cnt;
+	bool		is_called;
+} LogicalRepSequence;
+
 /* Transaction info */
 typedef struct LogicalRepBeginData
 {
@@ -227,6 +240,12 @@ extern List *logicalrep_read_truncate(StringInfo in,
 									  bool *cascade, bool *restart_seqs);
 extern void logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn,
 									 bool transactional, const char *prefix, Size sz, const char *message);
+extern void logicalrep_write_sequence(StringInfo out, Relation rel,
+									  TransactionId xid, XLogRecPtr lsn,
+									  bool transactional,
+									  int64 last_value, int64 log_cnt,
+									  bool is_called);
+extern void logicalrep_read_sequence(StringInfo in, LogicalRepSequence *seqdata);
 extern void logicalrep_write_rel(StringInfo out, TransactionId xid,
 								 Relation rel);
 extern LogicalRepRelation *logicalrep_read_rel(StringInfo in);
diff --git a/src/include/replication/pgoutput.h b/src/include/replication/pgoutput.h
index 78aa9151ef5..a6f6843ada6 100644
--- a/src/include/replication/pgoutput.h
+++ b/src/include/replication/pgoutput.h
@@ -28,6 +28,7 @@ typedef struct PGOutputData
 	bool		streaming;
 	bool		messages;
 	bool		two_phase;
+	bool		sequences;
 } PGOutputData;
 
 #endif							/* PGOUTPUT_H */
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 1420288d67b..3f50e100f8d 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1429,6 +1429,14 @@ pg_prepared_xacts| SELECT p.transaction,
    FROM ((pg_prepared_xact() p(transaction, gid, prepared, ownerid, dbid)
      LEFT JOIN pg_authid u ON ((p.ownerid = u.oid)))
      LEFT JOIN pg_database d ON ((p.dbid = d.oid)));
+pg_publication_sequences| SELECT p.pubname,
+    n.nspname AS schemaname,
+    c.relname AS sequencename
+   FROM pg_publication p,
+    LATERAL pg_get_publication_sequences((p.pubname)::text) gpt(relid),
+    (pg_class c
+     JOIN pg_namespace n ON ((n.oid = c.relnamespace)))
+  WHERE (c.oid = gpt.relid);
 pg_publication_tables| SELECT p.pubname,
     n.nspname AS schemaname,
     c.relname AS tablename
diff --git a/src/test/subscription/t/028_sequences.pl b/src/test/subscription/t/028_sequences.pl
new file mode 100644
index 00000000000..58775769cdc
--- /dev/null
+++ b/src/test/subscription/t/028_sequences.pl
@@ -0,0 +1,196 @@
+
+# Copyright (c) 2021, PostgreSQL Global Development Group
+
+# This tests that sequences are replicated correctly by logical replication
+use strict;
+use warnings;
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More tests => 6;
+
+# Initialize publisher node
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+# Create subscriber node
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->start;
+
+# Create some preexisting content on publisher
+my $ddl = qq(
+	CREATE SEQUENCE s;
+);
+
+# Setup structure on the publisher
+$node_publisher->safe_psql('postgres', $ddl);
+
+# Create some the same structure on subscriber, and an extra sequence that
+# we'll create on the publisher later
+$ddl = qq(
+	CREATE SEQUENCE s;
+	CREATE SEQUENCE s2;
+);
+
+$node_subscriber->safe_psql('postgres', $ddl);
+
+# Setup logical replication
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION seq_pub");
+
+$node_publisher->safe_psql('postgres',
+	"ALTER PUBLICATION seq_pub ADD SEQUENCE s");
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION seq_sub CONNECTION '$publisher_connstr' PUBLICATION seq_pub WITH (slot_name = seq_sub_slot)"
+);
+
+$node_publisher->wait_for_catchup('seq_sub');
+
+# Wait for initial sync to finish as well
+my $synced_query =
+  "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('s', 'r');";
+$node_subscriber->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+# Insert initial test data
+$node_publisher->safe_psql(
+	'postgres', qq(
+	-- generate a number of values using the sequence
+	SELECT nextval('s') FROM generate_series(1,100);
+));
+
+$node_publisher->wait_for_catchup('seq_sub');
+
+# Check the data on subscriber
+my $result = $node_subscriber->safe_psql(
+	'postgres', qq(
+	SELECT * FROM s;
+));
+
+is( $result, '132|0|t',
+	'check replicated sequence values on subscriber');
+
+
+# advance the sequence in a rolled-back transaction - should not be replicated
+$node_publisher->safe_psql(
+	'postgres', qq(
+	BEGIN;
+	SELECT nextval('s') FROM generate_series(1,100);
+	ROLLBACK;
+));
+
+$node_publisher->wait_for_catchup('seq_sub');
+
+# Check the data on subscriber
+$result = $node_subscriber->safe_psql(
+	'postgres', qq(
+	SELECT * FROM s;
+));
+
+is( $result, '231|0|t',
+	'check replicated sequence values on subscriber');
+
+
+# create a new sequence and roll it back - should not be replicated, due to
+# the transactional behavior
+$node_publisher->safe_psql(
+	'postgres', qq(
+	BEGIN;
+	CREATE SEQUENCE s2;
+	ALTER PUBLICATION seq_pub ADD SEQUENCE s2;
+	SELECT nextval('s2') FROM generate_series(1,100);
+	ROLLBACK;
+));
+
+$node_publisher->wait_for_catchup('seq_sub');
+
+# Check the data on subscriber
+$result = $node_subscriber->safe_psql(
+	'postgres', qq(
+	SELECT * FROM s2;
+));
+
+is( $result, '1|0|f',
+	'check replicated sequence values on subscriber');
+
+
+# create a new sequence, advance it in a rolled-back transaction, but commit
+# the create - the advance should be replicated nevertheless
+$node_publisher->safe_psql(
+	'postgres', qq(
+	BEGIN;
+	CREATE SEQUENCE s2;
+	ALTER PUBLICATION seq_pub ADD SEQUENCE s2;
+	SAVEPOINT sp1;
+	SELECT nextval('s2') FROM generate_series(1,100);
+	ROLLBACK TO sp1;
+	COMMIT;
+));
+
+$node_publisher->wait_for_catchup('seq_sub');
+
+# Wait for sync of the second sequence we just added to finish
+$synced_query =
+  "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('s', 'r');";
+$node_subscriber->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+# Check the data on subscriber
+$result = $node_subscriber->safe_psql(
+	'postgres', qq(
+	SELECT * FROM s2;
+));
+
+is( $result, '132|0|t',
+	'check replicated sequence values on subscriber');
+
+
+# advance the new sequence in a transaction, and roll it back - in this case
+# it should not be replicated at commit
+$node_publisher->safe_psql(
+	'postgres', qq(
+	BEGIN;
+	SELECT nextval('s2') FROM generate_series(1,100);
+	ROLLBACK;
+));
+
+$node_publisher->wait_for_catchup('seq_sub');
+
+# Check the data on subscriber
+$result = $node_subscriber->safe_psql(
+	'postgres', qq(
+	SELECT * FROM s2;
+));
+
+is( $result, '231|0|t',
+	'check replicated sequence values on subscriber');
+
+
+# advance the sequence in a subtransaction - the subtransaction gets rolled
+# back, but commit the main one - the changes should still be replicated
+$node_publisher->safe_psql(
+	'postgres', qq(
+	BEGIN;
+	SAVEPOINT s1;
+	SELECT nextval('s2') FROM generate_series(1,100);
+	ROLLBACK TO s1;
+	COMMIT;
+));
+
+$node_publisher->wait_for_catchup('seq_sub');
+
+# Check the data on subscriber
+$result = $node_subscriber->safe_psql(
+	'postgres', qq(
+	SELECT * FROM s2;
+));
+
+is( $result, '330|0|t',
+	'check replicated sequence values on subscriber');
+
+
+$node_subscriber->stop('fast');
+$node_publisher->stop('fast');
-- 
2.34.1

From bd37ce394dc14d0d2962b28251e058e12a789614 Mon Sep 17 00:00:00 2001
From: Tomas Vondra <to...@2ndquadrant.com>
Date: Sat, 12 Feb 2022 01:24:47 +0100
Subject: [PATCH 2/2] tweak test

---
 src/test/subscription/t/028_sequences.pl | 14 ++++++++------
 1 file changed, 8 insertions(+), 6 deletions(-)

diff --git a/src/test/subscription/t/028_sequences.pl b/src/test/subscription/t/028_sequences.pl
index 58775769cdc..58ed02462d8 100644
--- a/src/test/subscription/t/028_sequences.pl
+++ b/src/test/subscription/t/028_sequences.pl
@@ -20,6 +20,7 @@ $node_subscriber->start;
 
 # Create some preexisting content on publisher
 my $ddl = qq(
+	CREATE TABLE seq_test (v BIGINT);
 	CREATE SEQUENCE s;
 );
 
@@ -29,6 +30,7 @@ $node_publisher->safe_psql('postgres', $ddl);
 # Create some the same structure on subscriber, and an extra sequence that
 # we'll create on the publisher later
 $ddl = qq(
+	CREATE TABLE seq_test (v BIGINT);
 	CREATE SEQUENCE s;
 	CREATE SEQUENCE s2;
 );
@@ -59,7 +61,7 @@ $node_subscriber->poll_query_until('postgres', $synced_query)
 $node_publisher->safe_psql(
 	'postgres', qq(
 	-- generate a number of values using the sequence
-	SELECT nextval('s') FROM generate_series(1,100);
+	INSERT INTO seq_test SELECT nextval('s') FROM generate_series(1,100);
 ));
 
 $node_publisher->wait_for_catchup('seq_sub');
@@ -78,7 +80,7 @@ is( $result, '132|0|t',
 $node_publisher->safe_psql(
 	'postgres', qq(
 	BEGIN;
-	SELECT nextval('s') FROM generate_series(1,100);
+	INSERT INTO seq_test SELECT nextval('s') FROM generate_series(1,100);
 	ROLLBACK;
 ));
 
@@ -101,7 +103,7 @@ $node_publisher->safe_psql(
 	BEGIN;
 	CREATE SEQUENCE s2;
 	ALTER PUBLICATION seq_pub ADD SEQUENCE s2;
-	SELECT nextval('s2') FROM generate_series(1,100);
+	INSERT INTO seq_test SELECT nextval('s2') FROM generate_series(1,100);
 	ROLLBACK;
 ));
 
@@ -125,7 +127,7 @@ $node_publisher->safe_psql(
 	CREATE SEQUENCE s2;
 	ALTER PUBLICATION seq_pub ADD SEQUENCE s2;
 	SAVEPOINT sp1;
-	SELECT nextval('s2') FROM generate_series(1,100);
+	INSERT INTO seq_test SELECT nextval('s2') FROM generate_series(1,100);
 	ROLLBACK TO sp1;
 	COMMIT;
 ));
@@ -153,7 +155,7 @@ is( $result, '132|0|t',
 $node_publisher->safe_psql(
 	'postgres', qq(
 	BEGIN;
-	SELECT nextval('s2') FROM generate_series(1,100);
+	INSERT INTO seq_test SELECT nextval('s2') FROM generate_series(1,100);
 	ROLLBACK;
 ));
 
@@ -175,7 +177,7 @@ $node_publisher->safe_psql(
 	'postgres', qq(
 	BEGIN;
 	SAVEPOINT s1;
-	SELECT nextval('s2') FROM generate_series(1,100);
+	INSERT INTO seq_test SELECT nextval('s2') FROM generate_series(1,100);
 	ROLLBACK TO s1;
 	COMMIT;
 ));
-- 
2.34.1

Reply via email to