From a128e79d5da7b70386fe0063db09812255827eb2 Mon Sep 17 00:00:00 2001
From: "Chao Li (Evan)" <lic@highgo.com>
Date: Mon, 2 Feb 2026 15:50:57 +0800
Subject: [PATCH v2] Add fallbackfull option to publication

When a publication's fallbackfull option is true, then tables in
the publication get an ability of fallback replica to FULL if a
table's replica identity is DEFAULT but has no PK.

The code change in currently at PoC quality level, dirty logs
are included, so please reviewers only focus on the design part.

Author: Chao Li <lic@highgo.com>
---
 src/backend/access/heap/heapam.c            | 61 ++++++++++++---------
 src/backend/catalog/pg_publication.c        |  1 +
 src/backend/commands/publicationcmds.c      | 58 +++++++++++++++++---
 src/backend/executor/execReplication.c      |  2 +-
 src/backend/replication/logical/proto.c     | 27 ++++++++-
 src/backend/replication/logical/relation.c  | 50 ++++++++++++++++-
 src/backend/replication/logical/worker.c    | 28 +++++++++-
 src/backend/replication/pgoutput/pgoutput.c | 33 ++++++++++-
 src/include/catalog/pg_publication.h        |  4 ++
 src/include/replication/logicalproto.h      | 13 ++++-
 src/include/replication/logicalrelation.h   |  1 +
 11 files changed, 233 insertions(+), 45 deletions(-)

diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 3004964ab7f..d4f78b9807e 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -60,7 +60,8 @@ static HeapTuple heap_prepare_insert(Relation relation, HeapTuple tup,
 static XLogRecPtr log_heap_update(Relation reln, Buffer oldbuf,
 								  Buffer newbuf, HeapTuple oldtup,
 								  HeapTuple newtup, HeapTuple old_key_tuple,
-								  bool all_visible_cleared, bool new_all_visible_cleared);
+								  bool all_visible_cleared, bool new_all_visible_cleared,
+								  bool logical_identity_is_full);
 #ifdef USE_ASSERT_CHECKING
 static void check_lock_if_inplace_updateable_rel(Relation relation,
 												 const ItemPointerData *otid,
@@ -107,7 +108,7 @@ static void index_delete_sort(TM_IndexDeleteOp *delstate);
 static int	bottomup_sort_and_shrink(TM_IndexDeleteOp *delstate);
 static XLogRecPtr log_heap_new_cid(Relation relation, HeapTuple tup);
 static HeapTuple ExtractReplicaIdentity(Relation relation, HeapTuple tp, bool key_required,
-										bool *copy);
+										bool *copy, bool *ri_is_full);
 
 
 /*
@@ -2859,6 +2860,7 @@ heap_delete(Relation relation, const ItemPointerData *tid,
 	bool		all_visible_cleared = false;
 	HeapTuple	old_key_tuple = NULL;	/* replica identity of the tuple */
 	bool		old_key_copied = false;
+	bool		logical_identity_is_full = false;
 
 	Assert(ItemPointerIsValid(tid));
 
@@ -3088,7 +3090,7 @@ l1:
 	 * Compute replica identity tuple before entering the critical section so
 	 * we don't PANIC upon a memory allocation failure.
 	 */
-	old_key_tuple = ExtractReplicaIdentity(relation, &tp, true, &old_key_copied);
+	old_key_tuple = ExtractReplicaIdentity(relation, &tp, true, &old_key_copied, &logical_identity_is_full);
 
 	/*
 	 * If this is the first possibly-multixact-able operation in the current
@@ -3170,13 +3172,13 @@ l1:
 		xlrec.offnum = ItemPointerGetOffsetNumber(&tp.t_self);
 		xlrec.xmax = new_xmax;
 
-		if (old_key_tuple != NULL)
-		{
-			if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
-				xlrec.flags |= XLH_DELETE_CONTAINS_OLD_TUPLE;
-			else
-				xlrec.flags |= XLH_DELETE_CONTAINS_OLD_KEY;
-		}
+			if (old_key_tuple != NULL)
+			{
+				if (logical_identity_is_full)
+					xlrec.flags |= XLH_DELETE_CONTAINS_OLD_TUPLE;
+				else
+					xlrec.flags |= XLH_DELETE_CONTAINS_OLD_KEY;
+			}
 
 		XLogBeginInsert();
 		XLogRegisterData(&xlrec, SizeOfHeapDelete);
@@ -3326,6 +3328,7 @@ heap_update(Relation relation, const ItemPointerData *otid, HeapTuple newtup,
 	HeapTuple	heaptup;
 	HeapTuple	old_key_tuple = NULL;
 	bool		old_key_copied = false;
+	bool		logical_identity_is_full = false;
 	Page		page;
 	BlockNumber block;
 	MultiXactStatus mxact_status;
@@ -4113,7 +4116,7 @@ l2:
 	old_key_tuple = ExtractReplicaIdentity(relation, &oldtup,
 										   bms_overlap(modified_attrs, id_attrs) ||
 										   id_has_external,
-										   &old_key_copied);
+										   &old_key_copied, &logical_identity_is_full);
 
 	/* NO EREPORT(ERROR) from here till changes are logged */
 	START_CRIT_SECTION();
@@ -4200,11 +4203,12 @@ l2:
 			log_heap_new_cid(relation, heaptup);
 		}
 
-		recptr = log_heap_update(relation, buffer,
-								 newbuf, &oldtup, heaptup,
-								 old_key_tuple,
-								 all_visible_cleared,
-								 all_visible_cleared_new);
+			recptr = log_heap_update(relation, buffer,
+									 newbuf, &oldtup, heaptup,
+									 old_key_tuple,
+									 all_visible_cleared,
+									 all_visible_cleared_new,
+									 logical_identity_is_full);
 		if (newbuf != buffer)
 		{
 			PageSetLSN(BufferGetPage(newbuf), recptr);
@@ -8918,7 +8922,8 @@ static XLogRecPtr
 log_heap_update(Relation reln, Buffer oldbuf,
 				Buffer newbuf, HeapTuple oldtup, HeapTuple newtup,
 				HeapTuple old_key_tuple,
-				bool all_visible_cleared, bool new_all_visible_cleared)
+				bool all_visible_cleared, bool new_all_visible_cleared,
+				bool logical_identity_is_full)
 {
 	xl_heap_update xlrec;
 	xl_heap_header xlhdr;
@@ -9007,14 +9012,14 @@ log_heap_update(Relation reln, Buffer oldbuf,
 		xlrec.flags |= XLH_UPDATE_SUFFIX_FROM_OLD;
 	if (need_tuple_data)
 	{
-		xlrec.flags |= XLH_UPDATE_CONTAINS_NEW_TUPLE;
-		if (old_key_tuple)
-		{
-			if (reln->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
-				xlrec.flags |= XLH_UPDATE_CONTAINS_OLD_TUPLE;
-			else
-				xlrec.flags |= XLH_UPDATE_CONTAINS_OLD_KEY;
-		}
+			xlrec.flags |= XLH_UPDATE_CONTAINS_NEW_TUPLE;
+			if (old_key_tuple)
+			{
+				if (logical_identity_is_full)
+					xlrec.flags |= XLH_UPDATE_CONTAINS_OLD_TUPLE;
+				else
+					xlrec.flags |= XLH_UPDATE_CONTAINS_OLD_KEY;
+			}
 	}
 
 	/* If new tuple is the single and first tuple on page... */
@@ -9219,7 +9224,7 @@ log_heap_new_cid(Relation relation, HeapTuple tup)
  */
 static HeapTuple
 ExtractReplicaIdentity(Relation relation, HeapTuple tp, bool key_required,
-					   bool *copy)
+					   bool *copy, bool *ri_is_full)
 {
 	TupleDesc	desc = RelationGetDescr(relation);
 	char		replident = relation->rd_rel->relreplident;
@@ -9229,6 +9234,7 @@ ExtractReplicaIdentity(Relation relation, HeapTuple tp, bool key_required,
 	Datum		values[MaxHeapAttributeNumber];
 
 	*copy = false;
+	*ri_is_full = false;
 
 	if (!RelationIsLogicallyLogged(relation))
 		return NULL;
@@ -9236,7 +9242,7 @@ ExtractReplicaIdentity(Relation relation, HeapTuple tp, bool key_required,
 	if (replident == REPLICA_IDENTITY_NOTHING)
 		return NULL;
 
-	if (replident == REPLICA_IDENTITY_FULL)
+	if (logicalrep_identity_is_full(relation))
 	{
 		/*
 		 * When logging the entire old tuple, it very well could contain
@@ -9247,6 +9253,7 @@ ExtractReplicaIdentity(Relation relation, HeapTuple tp, bool key_required,
 			*copy = true;
 			tp = toast_flatten_tuple(tp, desc);
 		}
+		*ri_is_full = true;
 		return tp;
 	}
 
diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c
index 9a4791c573e..8e17ead4f84 100644
--- a/src/backend/catalog/pg_publication.c
+++ b/src/backend/catalog/pg_publication.c
@@ -1097,6 +1097,7 @@ GetPublication(Oid pubid)
 	pub->pubactions.pubtruncate = pubform->pubtruncate;
 	pub->pubviaroot = pubform->pubviaroot;
 	pub->pubgencols_type = pubform->pubgencols;
+	pub->pubfallbackfull = pubform->pubfallbackfull;
 
 	ReleaseSysCache(tup);
 
diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c
index fc3a4c19e65..de0245530fe 100644
--- a/src/backend/commands/publicationcmds.c
+++ b/src/backend/commands/publicationcmds.c
@@ -81,13 +81,16 @@ parse_publication_options(ParseState *pstate,
 						  bool *publish_via_partition_root_given,
 						  bool *publish_via_partition_root,
 						  bool *publish_generated_columns_given,
-						  char *publish_generated_columns)
+						  char *publish_generated_columns,
+						  bool *fallbackfull_given,
+						  bool *fallbackfull)
 {
 	ListCell   *lc;
 
 	*publish_given = false;
 	*publish_via_partition_root_given = false;
 	*publish_generated_columns_given = false;
+	*fallbackfull_given = false;
 
 	/* defaults */
 	pubactions->pubinsert = true;
@@ -96,6 +99,7 @@ parse_publication_options(ParseState *pstate,
 	pubactions->pubtruncate = true;
 	*publish_via_partition_root = false;
 	*publish_generated_columns = PUBLISH_GENCOLS_NONE;
+	*fallbackfull = false;
 
 	/* Parse options */
 	foreach(lc, options)
@@ -168,6 +172,13 @@ parse_publication_options(ParseState *pstate,
 			*publish_generated_columns_given = true;
 			*publish_generated_columns = defGetGeneratedColsOption(defel);
 		}
+		else if (strcmp(defel->defname, "fallbackfull") == 0)
+		{
+			if (*fallbackfull_given)
+				errorConflictingDefElem(defel, pstate);
+			*fallbackfull_given = true;
+			*fallbackfull = defGetBoolean(defel);
+		}
 		else
 			ereport(ERROR,
 					(errcode(ERRCODE_SYNTAX_ERROR),
@@ -281,6 +292,7 @@ pub_rf_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
 	bool		result = false;
 	Datum		rfdatum;
 	bool		rfisnull;
+	Publication *pub;
 
 	/*
 	 * FULL means all columns are in the REPLICA IDENTITY, so all columns are
@@ -289,6 +301,20 @@ pub_rf_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
 	if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
 		return false;
 
+	/* 
+	 * If REPLICA INDENTITY is DEFAULT and no replica index exists, see if we 
+	 * should fallback to FULL.
+	 */
+	if (relation->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT && 
+		!OidIsValid(RelationGetReplicaIndex(relation)))
+	{
+		Publication * pub = GetPublication(pubid);
+		if (pub->pubfallbackfull)
+		{
+			return false;	
+		}
+	}
+
 	/*
 	 * For a partition, if pubviaroot is true, find the topmost ancestor that
 	 * is published via this publication as we need to use its row filter
@@ -394,9 +420,12 @@ pub_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
 	pub = GetPublication(pubid);
 	check_and_fetch_column_list(pub, publish_as_relid, NULL, &columns);
 
-	if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
+	if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL
+		|| (relation->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT &&
+			!OidIsValid(RelationGetReplicaIndex(relation)) && 
+			pub->pubfallbackfull))
 	{
-		/* With REPLICA IDENTITY FULL, no column list is allowed. */
+		/* With REPLICA IDENTITY FULL or fallback to FULL, no column list is allowed. */
 		*invalid_column_list = (columns != NULL);
 
 		/*
@@ -842,6 +871,8 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
 	bool		publish_via_partition_root;
 	bool		publish_generated_columns_given;
 	char		publish_generated_columns;
+	bool		fallbackfull_given;
+	bool		fallbackfull;
 	AclResult	aclresult;
 	List	   *relations = NIL;
 	List	   *schemaidlist = NIL;
@@ -886,11 +917,13 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
 							  &publish_via_partition_root_given,
 							  &publish_via_partition_root,
 							  &publish_generated_columns_given,
-							  &publish_generated_columns);
+							  &publish_generated_columns,
+							  &fallbackfull_given,
+							  &fallbackfull);
 
 	if (stmt->for_all_sequences &&
 		(publish_given || publish_via_partition_root_given ||
-		 publish_generated_columns_given))
+		 publish_generated_columns_given || fallbackfull_given))
 		ereport(NOTICE,
 				errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 				errmsg("publication parameters are not applicable to sequence synchronization and will be ignored for sequences"));
@@ -914,6 +947,8 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
 		BoolGetDatum(publish_via_partition_root);
 	values[Anum_pg_publication_pubgencols - 1] =
 		CharGetDatum(publish_generated_columns);
+	values[Anum_pg_publication_pubfallbackfull - 1] =
+		BoolGetDatum(fallbackfull);
 
 	tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
 
@@ -1010,6 +1045,8 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt,
 	bool		publish_via_partition_root;
 	bool		publish_generated_columns_given;
 	char		publish_generated_columns;
+	bool		fallbackfull_given;
+	bool		fallbackfull;
 	ObjectAddress obj;
 	Form_pg_publication pubform;
 	List	   *root_relids = NIL;
@@ -1023,11 +1060,13 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt,
 							  &publish_via_partition_root_given,
 							  &publish_via_partition_root,
 							  &publish_generated_columns_given,
-							  &publish_generated_columns);
+							  &publish_generated_columns,
+							  &fallbackfull_given,
+							  &fallbackfull);
 
 	if (pubform->puballsequences &&
 		(publish_given || publish_via_partition_root_given ||
-		 publish_generated_columns_given))
+		 publish_generated_columns_given || fallbackfull_given))
 		ereport(NOTICE,
 				errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 				errmsg("publication parameters are not applicable to sequence synchronization and will be ignored for sequences"));
@@ -1143,6 +1182,11 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt,
 		values[Anum_pg_publication_pubgencols - 1] = CharGetDatum(publish_generated_columns);
 		replaces[Anum_pg_publication_pubgencols - 1] = true;
 	}
+	if (fallbackfull_given)
+	{
+		values[Anum_pg_publication_pubfallbackfull - 1] = BoolGetDatum(fallbackfull);
+		replaces[Anum_pg_publication_pubfallbackfull - 1] = true;
+	}
 
 	tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
 							replaces);
diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c
index 743b1ee2b28..0605f18e0e9 100644
--- a/src/backend/executor/execReplication.c
+++ b/src/backend/executor/execReplication.c
@@ -1089,7 +1089,7 @@ CheckCmdReplicaIdentity(Relation rel, CmdType cmd)
 		return;
 
 	/* REPLICA IDENTITY FULL is also good for UPDATE/DELETE. */
-	if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
+	if (logicalrep_identity_is_full(rel))
 		return;
 
 	/*
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index 3950dd0cf46..36fed2c686b 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -452,6 +452,10 @@ logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel,
 						bool binary, Bitmapset *columns,
 						PublishGencolsType include_gencols_type)
 {
+	ereport(LOG,
+			(errmsg("EVAN: logical replication: send UPDATE for relation \"%s\" (oid %u)",
+					RelationGetRelationName(rel), RelationGetRelid(rel))));
+
 	pq_sendbyte(out, LOGICAL_REP_MSG_UPDATE);
 
 	Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
@@ -534,6 +538,10 @@ logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel,
 		   rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
 		   rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
 
+	ereport(LOG,
+			(errmsg("EVAN: logical replication: send DELETE for relation \"%s\" (oid %u)",
+					RelationGetRelationName(rel), RelationGetRelid(rel))));
+
 	pq_sendbyte(out, LOGICAL_REP_MSG_DELETE);
 
 	/* transaction ID (if not valid, we're not streaming) */
@@ -666,10 +674,15 @@ logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn,
 void
 logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel,
 					 Bitmapset *columns,
-					 PublishGencolsType include_gencols_type)
+					 PublishGencolsType include_gencols_type,
+					 bool fallbackfull, uint32 proto_version)
 {
 	char	   *relname;
 
+	ereport(LOG,
+			(errmsg("EVAN: logical replication: send RELATION for \"%s\" (oid %u), fallbackfull=%s",
+					RelationGetRelationName(rel), RelationGetRelid(rel), fallbackfull ? "true" : "false")));
+
 	pq_sendbyte(out, LOGICAL_REP_MSG_RELATION);
 
 	/* transaction ID (if not valid, we're not streaming) */
@@ -687,6 +700,10 @@ logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel,
 	/* send replica identity */
 	pq_sendbyte(out, rel->rd_rel->relreplident);
 
+	/* send publication fallbackfull flag if supported */
+	if (proto_version >= LOGICALREP_PROTO_FALLBACKFULL_VERSION_NUM)
+		pq_sendbyte(out, fallbackfull ? 1 : 0);
+
 	/* send the attribute info */
 	logicalrep_write_attrs(out, rel, columns, include_gencols_type);
 }
@@ -695,7 +712,7 @@ logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel,
  * Read the relation info from stream and return as LogicalRepRelation.
  */
 LogicalRepRelation *
-logicalrep_read_rel(StringInfo in)
+logicalrep_read_rel(StringInfo in, uint32 proto_version)
 {
 	LogicalRepRelation *rel = palloc_object(LogicalRepRelation);
 
@@ -708,6 +725,12 @@ logicalrep_read_rel(StringInfo in)
 	/* Read the replica identity. */
 	rel->replident = pq_getmsgbyte(in);
 
+	/* Read publication fallbackfull flag if present. */
+	if (proto_version >= LOGICALREP_PROTO_FALLBACKFULL_VERSION_NUM)
+		rel->fallbackfull = (pq_getmsgbyte(in) != 0);
+	else
+		rel->fallbackfull = false;
+
 	/* relkind is not sent */
 	rel->relkind = 0;
 
diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c
index 0b1d80b5b0f..28b43e901f3 100644
--- a/src/backend/replication/logical/relation.c
+++ b/src/backend/replication/logical/relation.c
@@ -21,6 +21,7 @@
 #include "access/genam.h"
 #include "access/table.h"
 #include "catalog/namespace.h"
+#include "catalog/pg_publication.h"
 #include "catalog/pg_subscription_rel.h"
 #include "executor/executor.h"
 #include "nodes/makefuncs.h"
@@ -209,6 +210,7 @@ logicalrep_relmap_update(LogicalRepRelation *remoterel)
 		(remoterel->relkind == 0) ? RELKIND_RELATION : remoterel->relkind;
 
 	entry->remoterel.attkeys = bms_copy(remoterel->attkeys);
+	entry->remoterel.fallbackfull = remoterel->fallbackfull;
 	MemoryContextSwitchTo(oldctx);
 }
 
@@ -326,7 +328,10 @@ logicalrep_rel_mark_updatable(LogicalRepRelMapEntry *entry)
 		 * If no replica identity index and no PK, the published table must
 		 * have replica identity FULL.
 		 */
-		if (idkey == NULL && remoterel->replident != REPLICA_IDENTITY_FULL)
+		if (idkey == NULL &&
+			remoterel->replident != REPLICA_IDENTITY_FULL &&
+			!(remoterel->replident == REPLICA_IDENTITY_DEFAULT &&
+			  remoterel->fallbackfull))
 			entry->updatable = false;
 	}
 
@@ -713,6 +718,7 @@ logicalrep_partition_open(LogicalRepRelMapEntry *root,
 			entry->remoterel.atttyps[i] = remoterel->atttyps[i];
 		}
 		entry->remoterel.replident = remoterel->replident;
+		entry->remoterel.fallbackfull = remoterel->fallbackfull;
 		entry->remoterel.attkeys = bms_copy(remoterel->attkeys);
 	}
 
@@ -777,6 +783,44 @@ logicalrep_partition_open(LogicalRepRelMapEntry *root,
 	return entry;
 }
 
+/*
+ * logicalrep_identity_is_full
+ *
+ * Check whether the replica identity of the relation is full or not.
+ * When a table's replica identity is default, but there is no primary key,
+ * if any publication the relation is in has fallbackfull enabled, we consider
+ * the replica identity as full. This function should only be called on the
+ * publisher.
+ */
+bool
+logicalrep_identity_is_full(Relation relation)
+{
+    Form_pg_class relform = RelationGetForm(relation);
+
+    if (relform->relreplident == REPLICA_IDENTITY_FULL)
+        return true;
+
+    if (relform->relreplident == REPLICA_IDENTITY_DEFAULT &&
+        !OidIsValid(RelationGetReplicaIndex(relation)))
+    {
+		/* relreplident is default, but no primary key, check if we can fallback to full. */	
+		List *pubids = GetRelationPublications(RelationGetRelid(relation));
+		foreach_oid(pubid, pubids)
+		{
+			Publication *pub = GetPublication(pubid);
+
+			if (pub->pubfallbackfull)
+			{
+				list_free(pubids);
+				return true;
+			}
+		}
+		list_free(pubids);
+	}
+
+    return false;
+}
+
 /*
  * Returns the oid of an index that can be used by the apply worker to scan
  * the relation.
@@ -938,7 +982,9 @@ FindLogicalRepLocalIndex(Relation localrel, LogicalRepRelation *remoterel,
 	if (OidIsValid(idxoid))
 		return idxoid;
 
-	if (remoterel->replident == REPLICA_IDENTITY_FULL)
+	if (remoterel->replident == REPLICA_IDENTITY_FULL ||
+		(remoterel->replident == REPLICA_IDENTITY_DEFAULT &&
+		 remoterel->fallbackfull))
 	{
 		/*
 		 * We are looking for one more opportunity for using an index. If
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 32725c48623..ad683eb822d 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -486,6 +486,7 @@ static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;
 
 /* fields valid only when processing streamed transaction */
 static bool in_streamed_transaction = false;
+static uint32 LogicalRepProtoVersion = LOGICALREP_PROTO_FALLBACKFULL_VERSION_NUM;
 
 static TransactionId stream_xid = InvalidTransactionId;
 
@@ -2567,7 +2568,7 @@ apply_handle_relation(StringInfo s)
 	if (handle_streamed_transaction(LOGICAL_REP_MSG_RELATION, s))
 		return;
 
-	rel = logicalrep_read_rel(s);
+	rel = logicalrep_read_rel(s, LogicalRepProtoVersion);
 	logicalrep_relmap_update(rel);
 
 	/* Also reset all entries in the partition map that refer to remoterel. */
@@ -3050,6 +3051,24 @@ apply_handle_delete(StringInfo s)
 	/* Check if we can do the delete. */
 	check_relation_updatable(rel);
 
+	/*
+	 * Before fallbackfull was added as an option to publication, if a table's
+	 * replica identity is default but without a PK, it cannot be updated, so
+	 * check_relation_updatable() treated unexpected update as error.
+	 *
+	 * However, with fallbackfull, such table can be updated because the table
+	 * might belong to anther publication with fallbackfull enabled. So here
+	 * we just skip the update if the table in current publication is not
+	 * updatable.
+	 */
+	if (!rel->updatable)
+	{
+		/* XXX: should close with NoLock or RowExclusiveLock??? */
+		logicalrep_rel_close(rel, NoLock);
+		end_replication_step();
+		return;
+	}
+
 	/*
 	 * Make sure that any user-supplied code runs as the table owner, unless
 	 * the user has opted out of that behavior.
@@ -3189,7 +3208,8 @@ FindReplTupleInLocalRel(ApplyExecutionData *edata, Relation localrel,
 	*localslot = table_slot_create(localrel, &estate->es_tupleTable);
 
 	Assert(OidIsValid(localidxoid) ||
-		   (remoterel->replident == REPLICA_IDENTITY_FULL));
+		   (remoterel->replident == REPLICA_IDENTITY_FULL) ||
+		   (remoterel->replident == REPLICA_IDENTITY_DEFAULT && remoterel->fallbackfull));
 
 	if (OidIsValid(localidxoid))
 	{
@@ -5523,11 +5543,15 @@ set_stream_options(WalRcvStreamOptions *options,
 
 	server_version = walrcv_server_version(LogRepWorkerWalRcvConn);
 	options->proto.logical.proto_version =
+		server_version >= 190000 ? LOGICALREP_PROTO_FALLBACKFULL_VERSION_NUM :
 		server_version >= 160000 ? LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM :
 		server_version >= 150000 ? LOGICALREP_PROTO_TWOPHASE_VERSION_NUM :
 		server_version >= 140000 ? LOGICALREP_PROTO_STREAM_VERSION_NUM :
 		LOGICALREP_PROTO_VERSION_NUM;
 
+	/* Cache the chosen protocol version for RELATION message parsing. */
+	LogicalRepProtoVersion = options->proto.logical.proto_version;
+
 	options->proto.logical.publication_names = MySubscription->publications;
 	options->proto.logical.binary = MySubscription->binary;
 
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index e016f64e0b3..09344d63a85 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -145,6 +145,19 @@ typedef struct RelationSyncEntry
 	/* are we publishing this rel? */
 	PublicationActions pubactions;
 
+	/* when replica identity is default but no pk, should we fallback to full? */
+	bool		fallbackfull;
+
+	/*
+	 * when replica identity is defaut and no pk and fallbackfull is false, we
+	 * should not send any updates/deletes for this relation. But if the table
+	 * belong to another publication that has falllbackfull enabled, then the
+	 * talbe can still be updated/deleted, thus still has WAL generated, but
+	 * we should skip sending those changes to downstream. This flag is used
+	 * to indicate that.
+	 */
+	bool		block_update_delete;
+
 	/*
 	 * ExprState array for row filter. Different publication actions don't
 	 * allow multiple expressions to always be combined into one, because
@@ -799,6 +812,7 @@ send_relation_and_attrs(Relation relation, TransactionId xid,
 						LogicalDecodingContext *ctx,
 						RelationSyncEntry *relentry)
 {
+	PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
 	TupleDesc	desc = RelationGetDescr(relation);
 	Bitmapset  *columns = relentry->columns;
 	PublishGencolsType include_gencols_type = relentry->include_gencols_type;
@@ -830,7 +844,9 @@ send_relation_and_attrs(Relation relation, TransactionId xid,
 
 	OutputPluginPrepareWrite(ctx, false);
 	logicalrep_write_rel(ctx->out, xid, relation, columns,
-						 include_gencols_type);
+						 include_gencols_type,
+						 relentry->fallbackfull,
+						 data->protocol_version);
 	OutputPluginWrite(ctx, false);
 }
 
@@ -1519,10 +1535,14 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 		case REORDER_BUFFER_CHANGE_UPDATE:
 			if (!relentry->pubactions.pubupdate)
 				return;
+			if (relentry->block_update_delete)
+				return;
 			break;
 		case REORDER_BUFFER_CHANGE_DELETE:
 			if (!relentry->pubactions.pubdelete)
 				return;
+			if (relentry->block_update_delete)
+				return;
 
 			/*
 			 * This is only possible if deletes are allowed even when replica
@@ -2057,6 +2077,7 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 	bool		found;
 	MemoryContext oldctx;
 	Oid			relid = RelationGetRelid(relation);
+	Form_pg_class relform = RelationGetForm(relation);
 
 	Assert(RelationSyncCache != NULL);
 
@@ -2075,6 +2096,7 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 		entry->streamed_txns = NIL;
 		entry->pubactions.pubinsert = entry->pubactions.pubupdate =
 			entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
+		entry->fallbackfull = false;
 		entry->new_slot = NULL;
 		entry->old_slot = NULL;
 		memset(entry->exprstate, 0, sizeof(entry->exprstate));
@@ -2130,6 +2152,7 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 		entry->pubactions.pubupdate = false;
 		entry->pubactions.pubdelete = false;
 		entry->pubactions.pubtruncate = false;
+		entry->fallbackfull = false;
 
 		/*
 		 * Tuple slots cleanups. (Will be rebuilt later if needed).
@@ -2267,6 +2290,7 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 				entry->pubactions.pubupdate |= pub->pubactions.pubupdate;
 				entry->pubactions.pubdelete |= pub->pubactions.pubdelete;
 				entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate;
+				entry->fallbackfull |= pub->pubfallbackfull;
 
 				/*
 				 * We want to publish the changes as the top-most ancestor
@@ -2302,6 +2326,13 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 			}
 		}
 
+		if (!entry->fallbackfull &&
+			relform->relreplident == REPLICA_IDENTITY_DEFAULT &&
+			!OidIsValid(RelationGetReplicaIndex(relation)))
+			entry->block_update_delete = true;
+		else
+			entry->block_update_delete = false;
+
 		entry->publish_as_relid = publish_as_relid;
 
 		/*
diff --git a/src/include/catalog/pg_publication.h b/src/include/catalog/pg_publication.h
index 368becca899..f58418c2131 100644
--- a/src/include/catalog/pg_publication.h
+++ b/src/include/catalog/pg_publication.h
@@ -66,6 +66,9 @@ CATALOG(pg_publication,6104,PublicationRelationId)
 	 * if stored generated column data should be published.
 	 */
 	char		pubgencols;
+
+	/* true if fallbackfull is enabled */
+	bool		pubfallbackfull;
 } FormData_pg_publication;
 
 /* ----------------
@@ -138,6 +141,7 @@ typedef struct Publication
 	bool		allsequences;
 	bool		pubviaroot;
 	PublishGencolsType pubgencols_type;
+	bool		pubfallbackfull;
 	PublicationActions pubactions;
 } Publication;
 
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index 058a955e20c..1898ed2120c 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -36,13 +36,17 @@
  * LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM is the minimum protocol version
  * where we support applying large streaming transactions in parallel.
  * Introduced in PG16.
+ *
+ * LOGICALREP_PROTO_FALLBACKFULL_VERSION_NUM is the minimum protocol version
+ * that includes the fallbackfull flag in RELATION messages.
  */
 #define LOGICALREP_PROTO_MIN_VERSION_NUM 1
 #define LOGICALREP_PROTO_VERSION_NUM 1
 #define LOGICALREP_PROTO_STREAM_VERSION_NUM 2
 #define LOGICALREP_PROTO_TWOPHASE_VERSION_NUM 3
 #define LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM 4
-#define LOGICALREP_PROTO_MAX_VERSION_NUM LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM
+#define LOGICALREP_PROTO_FALLBACKFULL_VERSION_NUM 5
+#define LOGICALREP_PROTO_MAX_VERSION_NUM LOGICALREP_PROTO_FALLBACKFULL_VERSION_NUM
 
 /*
  * Logical message types
@@ -111,6 +115,7 @@ typedef struct LogicalRepRelation
 	char	  **attnames;		/* column names */
 	Oid		   *atttyps;		/* column types */
 	char		replident;		/* replica identity */
+	bool		fallbackfull;	/* publication fallback to full identity */
 	char		relkind;		/* remote relation kind */
 	Bitmapset  *attkeys;		/* Bitmap of key columns */
 } LogicalRepRelation;
@@ -250,8 +255,10 @@ extern void logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecP
 									 bool transactional, const char *prefix, Size sz, const char *message);
 extern void logicalrep_write_rel(StringInfo out, TransactionId xid,
 								 Relation rel, Bitmapset *columns,
-								 PublishGencolsType include_gencols_type);
-extern LogicalRepRelation *logicalrep_read_rel(StringInfo in);
+								 PublishGencolsType include_gencols_type,
+								 bool fallbackfull, uint32 proto_version);
+extern LogicalRepRelation *logicalrep_read_rel(StringInfo in,
+											   uint32 proto_version);
 extern void logicalrep_write_typ(StringInfo out, TransactionId xid,
 								 Oid typoid);
 extern void logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp);
diff --git a/src/include/replication/logicalrelation.h b/src/include/replication/logicalrelation.h
index efe0f9d6031..a4f5ecfddc5 100644
--- a/src/include/replication/logicalrelation.h
+++ b/src/include/replication/logicalrelation.h
@@ -48,6 +48,7 @@ extern LogicalRepRelMapEntry *logicalrep_partition_open(LogicalRepRelMapEntry *r
 														Relation partrel, AttrMap *map);
 extern void logicalrep_rel_close(LogicalRepRelMapEntry *rel,
 								 LOCKMODE lockmode);
+extern bool logicalrep_identity_is_full(Relation relation);
 extern bool IsIndexUsableForReplicaIdentityFull(Relation idxrel, AttrMap *attrmap);
 extern Oid	GetRelationIdentityOrPK(Relation rel);
 
-- 
2.50.1 (Apple Git-155)

