diff --git a/src/backend/access/zheap/zheapam.c b/src/backend/access/zheap/zheapam.c
index ee0b7f0c43..bf261c37d5 100644
--- a/src/backend/access/zheap/zheapam.c
+++ b/src/backend/access/zheap/zheapam.c
@@ -116,6 +116,8 @@ static void zheap_lock_tuple_guts(Relation rel, Buffer buf, ZHeapTuple zhtup,
 					  TransactionId single_locker_xid, int single_locker_trans_slot,
 					  UndoRecPtr prev_urecptr, CommandId cid,
 					  bool any_multi_locker_member_alive);
+static ZHeapTuple ZExtractReplicaIdentity(Relation relation, ZHeapTuple tp,
+						bool key_changed, bool *copy);
 static void compute_new_xid_infomask(ZHeapTuple zhtup, Buffer buf,
 						 TransactionId tup_xid, int tup_trans_slot,
 						 uint16 old_infomask, TransactionId add_to_xid,
@@ -1265,6 +1267,7 @@ zheap_delete(Relation relation, ItemPointer tid,
 	CommandId		tup_cid;
 	ItemId		lp;
 	ZHeapTupleData zheaptup;
+	ZHeapTuple	old_key_tuple = NULL;	/* replica identity of the tuple */
 	UnpackedUndoRecord	undorecord;
 	Page		page;
 	BlockNumber blkno;
@@ -1286,6 +1289,7 @@ zheap_delete(Relation relation, ItemPointer tid,
 	bool		lock_reacquired;
 	bool		hasSubXactLock = false;
 	bool		hasPayload = false;
+	bool		old_key_copied = false;
 	xl_undolog_meta undometa;
 	uint8		vm_status;
 
@@ -1958,6 +1962,13 @@ zheap_tuple_updated:
 	vm_status = visibilitymap_get_status(relation,
 								BufferGetBlockNumber(buffer), &vmbuffer);
 
+	/*
+	 * Compute replica identity tuple before entering the critical section so
+	 * we don't PANIC upon a memory allocation failure.
+	 */
+	old_key_tuple = ZExtractReplicaIdentity(relation, &zheaptup, true,
+											&old_key_copied);
+
 	START_CRIT_SECTION();
 
 	/*
@@ -2011,6 +2022,7 @@ zheap_tuple_updated:
 		XLogRecPtr	RedoRecPtr;
 		uint32		totalundotuplen = 0;
 		Size		dataoff;
+		int			bufflags = 0;
 		bool		doPageWrites;
 
 		/*
@@ -2031,6 +2043,15 @@ zheap_tuple_updated:
 			xlrec.flags |= XLZ_DELETE_IS_PARTITION_MOVE;
 		if (hasSubXactLock)
 			xlrec.flags |= XLZ_DELETE_CONTAINS_SUBXACT;
+		if (old_key_tuple != NULL)
+		{
+			bufflags |= REGBUF_KEEP_DATA;
+
+			if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
+				xlrec.flags |= XLZ_DELETE_CONTAINS_OLD_TUPLE;
+			else
+				xlrec.flags |= XLZ_DELETE_CONTAINS_OLD_KEY;
+		}
 
 		/*
 		 * If full_page_writes is enabled, and the buffer image is not
@@ -2076,7 +2097,27 @@ prepare_xlog:
 							totalundotuplen - SizeofZHeapTupleHeader);
 		}
 
-		XLogRegisterBuffer(0, buffer, REGBUF_STANDARD);
+		XLogRegisterBuffer(0, buffer, REGBUF_STANDARD | bufflags);
+
+		/*
+		 * Log replica identity of the deleted tuple if there is one
+		 */
+		if (old_key_tuple != NULL)
+		{
+			xl_zheap_header xlzhdr;
+
+			xlzhdr.t_infomask2 = old_key_tuple->t_data->t_infomask2;
+			xlzhdr.t_infomask = old_key_tuple->t_data->t_infomask;
+			xlzhdr.t_hoff = old_key_tuple->t_data->t_hoff;
+
+			XLogRegisterBufData(0, (char *) &xlzhdr, SizeOfZHeapHeader);
+			XLogRegisterBufData(0,
+								(char *) old_key_tuple->t_data +
+								SizeofZHeapTupleHeader,
+								old_key_tuple->t_len -
+								SizeofZHeapTupleHeader);
+		}
+
 		if (trans_slot_id > ZHEAP_PAGE_TRANS_SLOTS)
 			(void) RegisterTPDBuffer(page, 1);
 		RegisterUndoLogBuffers(2);
@@ -2135,6 +2176,9 @@ prepare_xlog:
 	if (have_tuple_lock)
 		UnlockTupleTuplock(relation, &(zheaptup.t_self), LockTupleExclusive);
 
+	if (old_key_tuple != NULL && old_key_copied)
+		zheap_freetuple(old_key_tuple);
+
 	pgstat_count_heap_delete(relation);
 
 	return HeapTupleMayBeUpdated;
@@ -5886,6 +5930,102 @@ prepare_xlog:
 	UnlockReleaseTPDBuffers();
 }
 
+/*
+ * Build a zheap tuple representing the configured REPLICA IDENTITY to represent
+ * the old tuple in a UPDATE or DELETE.
+ *
+ * Returns NULL if there's no need to log an identity or if there's no suitable
+ * key in the Relation relation.
+ */
+static ZHeapTuple
+ZExtractReplicaIdentity(Relation relation, ZHeapTuple tp, bool key_changed,
+						bool *copy)
+{
+	TupleDesc	desc = RelationGetDescr(relation);
+	Oid			replidindex;
+	Relation	idx_rel;
+	char		replident = relation->rd_rel->relreplident;
+	ZHeapTuple	key_tuple = NULL;
+	bool		nulls[MaxHeapAttributeNumber];
+	Datum		values[MaxHeapAttributeNumber];
+	int			natt;
+
+	*copy = false;
+
+	if (!RelationIsLogicallyLogged(relation))
+		return NULL;
+
+	if (replident == REPLICA_IDENTITY_NOTHING)
+		return NULL;
+
+	if (replident == REPLICA_IDENTITY_FULL)
+	{
+		/*
+		 * When logging the entire old tuple, it very well could contain
+		 * toasted columns. If so, force them to be inlined.
+		 */
+		if (ZHeapTupleHasExternal(tp))
+		{
+			elog(ERROR, "toast tables are not supported with replica identity");
+		}
+		return tp;
+	}
+
+	/* if the key hasn't changed and we're only logging the key, we're done */
+	if (!key_changed)
+		return NULL;
+
+	/* find the replica identity index */
+	replidindex = RelationGetReplicaIndex(relation);
+	if (!OidIsValid(replidindex))
+	{
+		elog(DEBUG4, "could not find configured replica identity for table \"%s\"",
+			 RelationGetRelationName(relation));
+		return NULL;
+	}
+
+	idx_rel = RelationIdGetRelation(replidindex);
+
+	Assert(CheckRelationLockedByMe(idx_rel, AccessShareLock, true));
+
+	/* deform tuple, so we have fast access to columns */
+	zheap_deform_tuple(tp, desc, values, nulls);
+
+	/* set all columns to NULL, regardless of whether they actually are */
+	memset(nulls, 1, sizeof(nulls));
+
+	/*
+	 * Now set all columns contained in the index to NOT NULL, they cannot
+	 * currently be NULL.
+	 */
+	for (natt = 0; natt < IndexRelationGetNumberOfKeyAttributes(idx_rel); natt++)
+	{
+		int			attno = idx_rel->rd_index->indkey.values[natt];
+
+		if (attno < 0)
+			elog(ERROR, "system column in index");
+		nulls[attno - 1] = false;
+	}
+
+	key_tuple = zheap_form_tuple(desc, values, nulls);
+	*copy = true;
+	RelationClose(idx_rel);
+
+	/*
+	 * If the tuple, which by here only contains indexed columns, still has
+	 * toasted columns, force them to be inlined. This is somewhat unlikely
+	 * since there's limits on the size of indexed columns, so we don't
+	 * duplicate toast_flatten_tuple()s functionality in the above loop over
+	 * the indexed columns, even if it would be more efficient.
+	 */
+	if (ZHeapTupleHasExternal(key_tuple))
+	{
+		elog(ERROR, "toast tables are not supported with replica identity");
+	}
+
+	return key_tuple;
+}
+
 /*
  * compute_new_xid_infomask - Given the old values of tuple header's infomask,
  * compute the new values for tuple header which includes lock mode, new
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index bafbbed50e..54800defc4 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -34,6 +34,10 @@
 #include "access/xlogutils.h"
 #include "access/xlogreader.h"
 #include "access/xlogrecord.h"
+#include "access/zheap.h"
+#include "access/zheapam_xlog.h"
+#include "access/zheaputils.h"
+#include "access/zhtup.h"
 
 #include "catalog/pg_control.h"
 
@@ -45,6 +49,8 @@
 #include "replication/snapbuild.h"
 
 #include "storage/standby.h"
+#include "utils/rel.h"
+#include "utils/relfilenodemap.h"
 
 typedef struct XLogRecordBuffer
 {
@@ -57,6 +63,7 @@ typedef struct XLogRecordBuffer
 static void DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 static void DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 static void DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+static void DecodeZHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 static void DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 static void DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 static void DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
@@ -74,6 +81,11 @@ static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 			xl_xact_parsed_abort *parsed, TransactionId xid);
 
+/* record handlers for zheap */
+static void DecodeZInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+static void DecodeZDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+static ZHeapTuple DecodeXLogZTuple(char *data, Size len);
+
 /* common function to decode tuples */
 static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup);
 
@@ -161,7 +173,7 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor
 			break;
 		case RM_ZHEAP_ID:
 			/* Logical decoding is not yet implemented for zheap. */
-			Assert(0);
+			DecodeZHeapOp(ctx, &buf);
 			break;
 		case RM_ZHEAP2_ID:
 			/* Logical decoding is not yet implemented for zheap. */
@@ -510,6 +522,48 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	}
 }
 
+/*
+ * Handle rmgr ZHEAP_ID records for DecodeRecordIntoReorderBuffer().
+ */
+static void
+DecodeZHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+	uint8		info = XLogRecGetInfo(buf->record) & XLOG_ZHEAP_OPMASK;
+	TransactionId xid = XLogRecGetXid(buf->record);
+	SnapBuild  *builder = ctx->snapshot_builder;
+
+	ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr);
+
+	/*
+	 * If we don't have snapshot or we are just fast-forwarding, there is no
+	 * point in decoding data changes.
+	 */
+	if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT ||
+		ctx->fast_forward)
+		return;
+
+	switch (info)
+	{
+		case XLOG_ZHEAP_INSERT:
+			if (SnapBuildProcessChange(builder, xid, buf->origptr))
+				DecodeZInsert(ctx, buf);
+			break;
+
+		case XLOG_ZHEAP_DELETE:
+			if (SnapBuildProcessChange(builder, xid, buf->origptr))
+				DecodeZDelete(ctx, buf);
+			break;
+
+		case XLOG_ZHEAP_LOCK:
+			/* we don't care about row level locks for now */
+			break;
+
+		default:
+			elog(ERROR, "unexpected RM_ZHEAP_ID record type: %u", info);
+			break;
+	}
+}
+
 static inline bool
 FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id)
 {
@@ -1068,3 +1122,212 @@ DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tuple)
 	header->t_infomask2 = xlhdr.t_infomask2;
 	header->t_hoff = xlhdr.t_hoff;
 }
+
+/*
+ * Parse XLOG_ZHEAP_INSERT (not ZMULTI_INSERT!) records into tuplebufs.
+ *
+ * Here we retrieve zheap tuple, convert it to heap tuple format so
+ * reorder buffer stream can understand the tuple format.
+ */
+static void
+DecodeZInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+	Size		datalen;
+	char	   *tupledata;
+	XLogReaderState *r = buf->record;
+	xl_zheap_insert *xlrec;
+	ReorderBufferChange *change;
+	RelFileNode target_node;
+	Relation	relation = NULL;
+	Oid			reloid;
+	ZHeapTuple	zhtup;
+	HeapTuple	htup;
+
+	xlrec = (xl_zheap_insert *) ((char *) XLogRecGetData(r) + SizeOfUndoHeader);
+
+	/*
+	 * Ignore insert records without new tuples (this does happen when
+	 * raw_zheap_insert marks the TOAST record as HEAP_INSERT_NO_LOGICAL).
+	 */
+	if (!(xlrec->flags & XLZ_INSERT_CONTAINS_NEW_TUPLE))
+		return;
+
+	/* only interested in our database */
+	XLogRecGetBlockTag(r, 0, &target_node, NULL, NULL);
+	if (target_node.dbNode != ctx->slot->data.database)
+		return;
+
+	/* output plugin doesn't look for this origin, no need to queue */
+	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
+		return;
+
+	change = ReorderBufferGetChange(ctx->reorder);
+	if (!(xlrec->flags & XLZ_INSERT_IS_SPECULATIVE))
+		change->action = REORDER_BUFFER_CHANGE_INSERT;
+	else
+		change->action = REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT;
+	change->origin_id = XLogRecGetOrigin(r);
+
+	memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode));
+
+	tupledata = XLogRecGetBlockData(r, 0, &datalen);
+
+	/*
+	 * Get the zheap tuple from WAL, convert it to heap tuple and store the
+	 * same as change stream.
+	 */
+	zhtup = DecodeXLogZTuple(tupledata, datalen);
+	reloid = RelidByRelfilenode(change->data.tp.relnode.spcNode,
+								change->data.tp.relnode.relNode);
+	relation = RelationIdGetRelation(reloid);
+	htup = zheap_to_heap(zhtup, RelationGetDescr(relation));
+
+	change->data.tp.newtuple =
+		ReorderBufferGetTupleBuf(ctx->reorder,
+								 htup->t_len - SizeofHeapTupleHeader);
+	change->data.tp.newtuple->tuple.t_len = htup->t_len;
+	change->data.tp.newtuple->tuple.t_self = htup->t_self;
+	change->data.tp.newtuple->tuple.t_tableOid = htup->t_tableOid;
+	memcpy((char *) change->data.tp.newtuple->tuple.t_data,
+		   (char *) htup->t_data,
+		   htup->t_len);
+
+	/* be tidy */
+	pfree(zhtup);
+	pfree(htup);
+
+	if (relation != NULL)
+	{
+		RelationClose(relation);
+		relation = NULL;
+	}
+
+	change->data.tp.clear_toast_afterwards = true;
+
+	ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change);
+}
+
+/*
+ * Parse XLOG_ZHEAP_DELETE from wal into proper tuplebufs.
+ *
+ * Deletes can possibly contain the old primary key.
+ */
+static void
+DecodeZDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+	XLogReaderState *r = buf->record;
+	xl_zheap_delete *xlrec;
+	ReorderBufferChange *change;
+	RelFileNode target_node;
+
+	xlrec = (xl_zheap_delete *) ((char *) XLogRecGetData(r) + SizeOfUndoHeader);
+
+	/* only interested in our database */
+	XLogRecGetBlockTag(r, 0, &target_node, NULL, NULL);
+	if (target_node.dbNode != ctx->slot->data.database)
+		return;
+
+	/* output plugin doesn't look for this origin, no need to queue */
+	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
+		return;
+
+	change = ReorderBufferGetChange(ctx->reorder);
+	change->action = REORDER_BUFFER_CHANGE_DELETE;
+	change->origin_id = XLogRecGetOrigin(r);
+
+	memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode));
+
+	/* old primary key stored */
+	if (xlrec->flags & XLZ_DELETE_CONTAINS_OLD)
+	{
+		Relation	relation = NULL;
+		ZHeapTuple	zhtup;
+		HeapTuple	htup;
+		char	*tupledata;
+		Oid		reloid;
+		Size	datalen;
+
+		tupledata = XLogRecGetBlockData(r, 0, &datalen);
+
+		/*
+		 * Get the zheap tuple from WAL, convert it to heap tuple and store the
+		 * same as change stream.
+		 */
+		zhtup = DecodeXLogZTuple(tupledata, datalen);
+		reloid = RelidByRelfilenode(change->data.tp.relnode.spcNode,
+									change->data.tp.relnode.relNode);
+		relation = RelationIdGetRelation(reloid);
+		htup = zheap_to_heap(zhtup, RelationGetDescr(relation));
+
+		change->data.tp.oldtuple =
+			ReorderBufferGetTupleBuf(ctx->reorder,
+				htup->t_len - SizeofHeapTupleHeader);
+		change->data.tp.oldtuple->tuple.t_len = htup->t_len;
+		change->data.tp.oldtuple->tuple.t_self = htup->t_self;
+		change->data.tp.oldtuple->tuple.t_tableOid = htup->t_tableOid;
+		memcpy((char *) change->data.tp.oldtuple->tuple.t_data,
+			   (char *) htup->t_data,
+			   htup->t_len);
+
+		/* be tidy */
+		pfree(zhtup);
+		pfree(htup);
+
+		if (relation != NULL)
+		{
+			RelationClose(relation);
+			relation = NULL;
+		}
+	}
+
+	change->data.tp.clear_toast_afterwards = true;
+
+	ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change);
+}
+
+/*
+ * Read a ZHeapTuple as WAL logged by zheap_insert, zheap_update and
+ * zheap_delete (but not by zheap_multi_insert) and for in-memory tuple.
+ *
+ * The size 'len' and the pointer 'data' in the record need to be
+ * computed outside as they are record specific.
+ *
+ * The caller is responsible to free the memory for tuple allocated by
+ * this function.
+ */
+static ZHeapTuple
+DecodeXLogZTuple(char *data, Size len)
+{
+	ZHeapTuple	zhtup;
+	xl_zheap_header xlhdr;
+	int			datalen = len - SizeOfZHeapHeader;
+	int			tuplelen = datalen + SizeofZHeapTupleHeader;
+	ZHeapTupleHeader header;
+
+	Assert(datalen >= 0);
+
+	zhtup = palloc(tuplelen + ZHEAPTUPLESIZE);
+	header = zhtup->t_data = (ZHeapTupleHeader)((char *) zhtup + ZHEAPTUPLESIZE);
+
+	zhtup->t_len = tuplelen;
+	/* not a disk based tuple */
+	ItemPointerSetInvalid(&zhtup->t_self);
+
+	/* we can only figure this out after reassembling the transactions */
+	zhtup->t_tableOid = InvalidOid;
+
+	/* data is not stored aligned, copy to aligned storage */
+	memcpy((char *) &xlhdr, data, SizeOfZHeapHeader);
+
+	memset(header, 0, SizeofZHeapTupleHeader);
+
+	memcpy(((char *) zhtup->t_data) + SizeofZHeapTupleHeader,
+		   data + SizeOfZHeapHeader,
+		   datalen);
+
+	header->t_infomask = xlhdr.t_infomask;
+	header->t_infomask2 = xlhdr.t_infomask2;
+	header->t_hoff = xlhdr.t_hoff;
+
+	return zhtup;
+}
diff --git a/src/include/access/zheapam_xlog.h b/src/include/access/zheapam_xlog.h
index 6d031dcaa4..1654718263 100644
--- a/src/include/access/zheapam_xlog.h
+++ b/src/include/access/zheapam_xlog.h
@@ -124,6 +124,12 @@ typedef struct xl_zheap_insert
 #define XLZ_DELETE_CONTAINS_TPD_SLOT			(1<<2)
 #define XLZ_DELETE_CONTAINS_SUBXACT				(1<<3)
 #define XLZ_DELETE_IS_PARTITION_MOVE			(1<<4)
+#define XLZ_DELETE_CONTAINS_OLD_TUPLE			(1<<5)
+#define XLZ_DELETE_CONTAINS_OLD_KEY				(1<<6)
+
+/* convenience macro for checking whether any form of old tuple was logged */
+#define XLZ_DELETE_CONTAINS_OLD						\
+	(XLZ_DELETE_CONTAINS_OLD_TUPLE | XLZ_DELETE_CONTAINS_OLD_KEY)
 
 /* This is what we need to know about delete */
 typedef struct xl_zheap_delete
