diff --git a/src/backend/access/zheap/zheapam.c b/src/backend/access/zheap/zheapam.c
index 14bcb34..c5870d8 100644
--- a/src/backend/access/zheap/zheapam.c
+++ b/src/backend/access/zheap/zheapam.c
@@ -56,6 +56,7 @@
 #include "access/zheapam_xlog.h"
 #include "access/zheap.h"
 #include "access/zheapscan.h"
+#include "access/zheaputils.h"
 #include "access/zmultilocker.h"
 #include "catalog/catalog.h"
 #include "executor/tuptable.h"
@@ -88,10 +89,10 @@ static void log_zheap_update(Relation reln, UnpackedUndoRecord undorecord,
 					UnpackedUndoRecord newundorecord, UndoRecPtr urecptr,
 					UndoRecPtr newurecptr, Buffer oldbuf, Buffer newbuf,
 					ZHeapTuple oldtup, ZHeapTuple newtup,
-					int old_tup_trans_slot_id, int trans_slot_id,
-					int new_trans_slot_id, bool inplace_update,
-					bool all_visible_cleared, bool new_all_visible_cleared,
-					xl_undolog_meta *undometa);
+					ZHeapTuple old_key_tuple, int old_tup_trans_slot_id,
+					int trans_slot_id, int new_trans_slot_id,
+					bool inplace_update, bool all_visible_cleared,
+					bool new_all_visible_cleared, xl_undolog_meta *undometa);
 static HTSU_Result
 zheap_lock_updated_tuple(Relation rel, ZHeapTuple tuple, ItemPointer ctid,
 						 TransactionId xid, LockTupleMode mode, LockOper lockopr,
@@ -103,6 +104,8 @@ static void zheap_lock_tuple_guts(Relation rel, Buffer buf, ZHeapTuple zhtup,
 					  TransactionId single_locker_xid, int single_locker_trans_slot,
 					  UndoRecPtr prev_urecptr, CommandId cid,
 					  bool any_multi_locker_member_alive);
+static ZHeapTuple ZExtractReplicaIdentity(Relation relation, ZHeapTuple tp,
+						bool key_changed, bool *copy);
 static void compute_new_xid_infomask(ZHeapTuple zhtup, Buffer buf,
 						 TransactionId tup_xid, int tup_trans_slot,
 						 uint16 old_infomask, TransactionId add_to_xid,
@@ -1253,6 +1256,7 @@ zheap_delete(Relation relation, ItemPointer tid,
 	CommandId		tup_cid;
 	ItemId		lp;
 	ZHeapTupleData zheaptup;
+	ZHeapTuple	old_key_tuple = NULL;	/* replica identity of the tuple */
 	UnpackedUndoRecord	undorecord;
 	Page		page;
 	BlockNumber blkno;
@@ -1274,6 +1278,7 @@ zheap_delete(Relation relation, ItemPointer tid,
 	bool		lock_reacquired;
 	bool		hasSubXactLock = false;
 	bool		hasPayload = false;
+	bool		old_key_copied = false;
 	xl_undolog_meta undometa;
 	uint8		vm_status;
 
@@ -1956,6 +1961,13 @@ zheap_tuple_updated:
 	vm_status = visibilitymap_get_status(relation,
 								BufferGetBlockNumber(buffer), &vmbuffer);
 
+	/*
+	 * Compute replica identity tuple before entering the critical section so
+	 * we don't PANIC upon a memory allocation failure.
+	 */
+	old_key_tuple = ZExtractReplicaIdentity(relation, &zheaptup, true,
+											&old_key_copied);
+
 	START_CRIT_SECTION();
 
 	if ((vm_status & VISIBILITYMAP_ALL_VISIBLE) ||
@@ -2000,6 +2012,7 @@ zheap_tuple_updated:
 		XLogRecPtr	RedoRecPtr;
 		uint32		totalundotuplen = 0;
 		Size		dataoff;
+		int			bufflags = 0;
 		bool		doPageWrites;
 
 		/*
@@ -2020,6 +2033,15 @@ zheap_tuple_updated:
 			xlrec.flags |= XLZ_DELETE_IS_PARTITION_MOVE;
 		if (hasSubXactLock)
 			xlrec.flags |= XLZ_DELETE_CONTAINS_SUBXACT;
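+		/*
+		 * Tell the decoder whether the full old tuple or only the old key
+		 * follows, and keep the data registered with the buffer so that it
+		 * stays available to logical decoding even if a full-page image is
+		 * taken.
+		 */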
+		if (old_key_tuple != NULL)
+		{
+			bufflags |= REGBUF_KEEP_DATA;
+
+			if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
+				xlrec.flags |= XLZ_DELETE_CONTAINS_OLD_TUPLE;
+			else
+				xlrec.flags |= XLZ_DELETE_CONTAINS_OLD_KEY;
+		}
 
 		/*
 		 * If full_page_writes is enabled, and the buffer image is not
@@ -2065,7 +2087,27 @@ prepare_xlog:
 							totalundotuplen - SizeofZHeapTupleHeader);
 		}
 
-		XLogRegisterBuffer(0, buffer, REGBUF_STANDARD);
+		XLogRegisterBuffer(0, buffer, REGBUF_STANDARD | bufflags);
+
+		/*
+		 * Log replica identity of the deleted tuple if there is one
+		 */
+		if (old_key_tuple != NULL)
+		{
+			xl_zheap_header xlzhdr;
+
+			xlzhdr.t_infomask2 = old_key_tuple->t_data->t_infomask2;
+			xlzhdr.t_infomask = old_key_tuple->t_data->t_infomask;
+			xlzhdr.t_hoff = old_key_tuple->t_data->t_hoff;
+
+			XLogRegisterBufData(0, (char *) &xlzhdr, SizeOfZHeapHeader);
+			XLogRegisterBufData(0,
+								(char *) old_key_tuple->t_data +
+								SizeofZHeapTupleHeader,
+								old_key_tuple->t_len -
+								SizeofZHeapTupleHeader);
+		}
+
 		if (trans_slot_id > ZHEAP_PAGE_TRANS_SLOTS)
 			(void) RegisterTPDBuffer(page, 1);
 		RegisterUndoLogBuffers(2);
@@ -2124,6 +2166,9 @@ prepare_xlog:
 	if (have_tuple_lock)
 		UnlockTupleTuplock(relation, &(zheaptup.t_self), LockTupleExclusive);
 
+	if (old_key_tuple != NULL && old_key_copied)
+		zheap_freetuple(old_key_tuple);
+
 	pgstat_count_heap_delete(relation);
 
 	return HeapTupleMayBeUpdated;
@@ -2162,6 +2207,7 @@ zheap_update(Relation relation, ItemPointer otid, ZHeapTuple newtup,
 	ItemId		lp;
 	ZHeapTupleData oldtup;
 	ZHeapTuple	zheaptup;
+	ZHeapTuple	old_key_tuple = NULL;	/* replica identity of the tuple */
 	UndoRecPtr	urecptr, prev_urecptr, new_prev_urecptr;
 	UndoRecPtr	new_urecptr = InvalidUndoRecPtr;
 	UnpackedUndoRecord	undorecord, new_undorecord;
@@ -2199,6 +2245,7 @@ zheap_update(Relation relation, ItemPointer otid, ZHeapTuple newtup,
 	bool		lock_reacquired;
 	bool		need_toast;
 	bool		hasSubXactLock = false;
+	bool		old_key_copied = false;
 	xl_undolog_meta	undometa;
 	uint8		vm_status;
 	uint8		vm_status_new = 0;
@@ -3650,6 +3697,15 @@ reacquire_buffer:
 	 */
 	XLogEnsureRecordSpace(8, 0);
 
+	/*
+	 * Compute replica identity tuple before entering the critical section so
+	 * we don't PANIC upon a memory allocation failure.
+	 */
+	old_key_tuple = ZExtractReplicaIdentity(relation, &oldtup, true,
+											&old_key_copied);
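+	/*
+	 * If the identity tuple returned above is the old tuple itself rather
+	 * than a freshly formed copy, copy it now so that it remains valid once
+	 * the page is modified below.
+	 */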
+	if (old_key_tuple != NULL && !old_key_copied)
+		old_key_tuple = zheap_copytuple(old_key_tuple);
+
 	START_CRIT_SECTION();
 
 	if ((vm_status & VISIBILITYMAP_ALL_VISIBLE) ||
@@ -3806,15 +3862,18 @@ reacquire_buffer:
 
 		log_zheap_update(relation, undorecord, new_undorecord,
 						 urecptr, new_urecptr, buffer, newbuf,
-						 &oldtup, zheaptup, tup_trans_slot_id,
-						 trans_slot_id, new_trans_slot_id,
-						 use_inplace_update, all_visible_cleared,
-						 new_all_visible_cleared, &undometa);
+						 &oldtup, zheaptup, old_key_tuple,
+						 tup_trans_slot_id, trans_slot_id,
+						 new_trans_slot_id, use_inplace_update,
+						 all_visible_cleared, new_all_visible_cleared,
+						 &undometa);
 	}
 
 	END_CRIT_SECTION();
 
 	/* be tidy */
+	if (old_key_tuple != NULL)
+		zheap_freetuple(old_key_tuple);
 	pfree(undorecord.uur_tuple.data);
 	if (undorecord.uur_payload.len > 0)
 		pfree(undorecord.uur_payload.data);
@@ -3897,10 +3956,10 @@ log_zheap_update(Relation reln, UnpackedUndoRecord undorecord,
 				 UnpackedUndoRecord newundorecord, UndoRecPtr urecptr,
 				 UndoRecPtr newurecptr, Buffer oldbuf, Buffer newbuf,
 				 ZHeapTuple oldtup, ZHeapTuple newtup,
-				 int old_tup_trans_slot_id, int trans_slot_id,
-				 int new_trans_slot_id, bool inplace_update,
-				 bool all_visible_cleared, bool new_all_visible_cleared,
-				 xl_undolog_meta *undometa)
+				 ZHeapTuple old_key_tuple, int old_tup_trans_slot_id,
+				 int trans_slot_id, int new_trans_slot_id,
+				 bool inplace_update, bool all_visible_cleared,
+				 bool new_all_visible_cleared, xl_undolog_meta *undometa)
 {
 	xl_undo_header	xlundohdr,
 					xlnewundohdr;
@@ -3915,6 +3974,7 @@ log_zheap_update(Relation reln, UnpackedUndoRecord undorecord,
 	XLogRecPtr	recptr;
 	XLogRecPtr	RedoRecPtr;
 	bool		doPageWrites;
+	bool		need_tuple_data = RelationIsLogicallyLogged(reln);
 	char	*oldp = NULL;
 	char	*newp = NULL;
 	int		oldlen, newlen;
@@ -3958,7 +4018,8 @@ log_zheap_update(Relation reln, UnpackedUndoRecord undorecord,
 	 * See log_heap_update to know under what circumstances we can use
 	 * prefix-suffix compression.
 	 */
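+	/*
+	 * Prefix/suffix compression is not used when the new tuple is needed for
+	 * logical decoding, as the decoder must be able to reconstruct the whole
+	 * new tuple from the WAL record alone.
+	 */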
-	if (oldbuf == newbuf && !XLogCheckBufferNeedsBackup(newbuf))
+	if (oldbuf == newbuf && !need_tuple_data &&
+		!XLogCheckBufferNeedsBackup(newbuf))
 	{
 		Assert(oldp != NULL && newp != NULL);
 
@@ -4010,6 +4071,17 @@ log_zheap_update(Relation reln, UnpackedUndoRecord undorecord,
 		xlrec.flags |= XLZ_UPDATE_SUFFIX_FROM_OLD;
 	if (undorecord.uur_info & UREC_INFO_PAYLOAD_CONTAINS_SUBXACT)
 		xlrec.flags |= XLZ_UPDATE_CONTAINS_SUBXACT;
+	if (need_tuple_data)
+	{
+		xlrec.flags |= XLZ_UPDATE_CONTAINS_NEW_TUPLE;
+		if (old_key_tuple)
+		{
+			if (reln->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
+				xlrec.flags |= XLZ_UPDATE_CONTAINS_OLD_TUPLE;
+			else
+				xlrec.flags |= XLZ_UPDATE_CONTAINS_OLD_KEY;
+		}
+	}
 
 	if (!inplace_update)
 	{
@@ -4079,6 +4151,9 @@ prepare_xlog:
 						 totalundotuplen - SizeofZHeapTupleHeader);
 	}
 
+	/* For logical decoding, keep the buffer data even with a full-page image. */
+	if (need_tuple_data)
+		bufflags |= REGBUF_KEEP_DATA;
+
 	XLogRegisterBuffer(0, newbuf, bufflags);
 	if (oldbuf != newbuf)
 	{
@@ -4165,6 +4240,23 @@ prepare_xlog:
 	/* filtering by origin on a row level is much more efficient */
 	XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
 
+	/* We need to log a tuple identity */
+	if (need_tuple_data && old_key_tuple &&
+		!(xlrec.flags & XLZ_HAS_UPDATE_UNDOTUPLE))
+	{
+		xl_zheap_header xlzhdr;
+
+		xlzhdr.t_infomask2 = old_key_tuple->t_data->t_infomask2;
+		xlzhdr.t_infomask = old_key_tuple->t_data->t_infomask;
+		xlzhdr.t_hoff = old_key_tuple->t_data->t_hoff;
+
+		XLogRegisterData((char *) &xlzhdr, SizeOfZHeapHeader);
+		XLogRegisterData((char *) old_key_tuple->t_data +
+						SizeofZHeapTupleHeader,
+						old_key_tuple->t_len -
+						SizeofZHeapTupleHeader);
+	}
+
 	recptr = XLogInsertExtended(RM_ZHEAP_ID, info, RedoRecPtr, doPageWrites);
 	if (recptr == InvalidXLogRecPtr)
 	{
@@ -5941,6 +6033,102 @@ prepare_xlog:
 }
 
 /*
+ * Build a zheap tuple representing the configured REPLICA IDENTITY to
+ * represent the old tuple in an UPDATE or DELETE.
+ *
+ * Returns NULL if there's no need to log an identity or if there's no suitable
+ * key in the Relation relation.
+ */
+static ZHeapTuple
+ZExtractReplicaIdentity(Relation relation, ZHeapTuple tp, bool key_changed,
+						bool *copy)
+{
+	TupleDesc	desc = RelationGetDescr(relation);
+	Oid			replidindex;
+	Relation	idx_rel;
+	char		replident = relation->rd_rel->relreplident;
+	ZHeapTuple	key_tuple = NULL;
+	bool		nulls[MaxHeapAttributeNumber];
+	Datum		values[MaxHeapAttributeNumber];
+	int			natt;
+
+	*copy = false;
+
+	if (!RelationIsLogicallyLogged(relation))
+		return NULL;
+
+	if (replident == REPLICA_IDENTITY_NOTHING)
+		return NULL;
+
+	if (replident == REPLICA_IDENTITY_FULL)
+	{
+		/*
+		 * When logging the entire old tuple, it very well could contain
+		 * toasted columns.  Flattening them is not supported here, so error
+		 * out if any are present.
+		 */
+		if (ZHeapTupleHasExternal(tp))
+		{
+			elog(ERROR, "toast tables are not supported with replica identity");
+		}
+		return tp;
+	}
+
+	/* if the key hasn't changed and we're only logging the key, we're done */
+	if (!key_changed)
+		return NULL;
+
+	/* find the replica identity index */
+	replidindex = RelationGetReplicaIndex(relation);
+	if (!OidIsValid(replidindex))
+	{
+		elog(DEBUG4, "could not find configured replica identity for table \"%s\"",
+			 RelationGetRelationName(relation));
+		return NULL;
+	}
+
+	idx_rel = RelationIdGetRelation(replidindex);
+
+	Assert(CheckRelationLockedByMe(idx_rel, AccessShareLock, true));
+
+	/* deform tuple, so we have fast access to columns */
+	zheap_deform_tuple(tp, desc, values, nulls);
+
+	/* set all columns to NULL, regardless of whether they actually are */
+	memset(nulls, 1, sizeof(nulls));
+
+	/*
+	 * Now set all columns contained in the index to NOT NULL, they cannot
+	 * currently be NULL.
+	 */
+	for (natt = 0; natt < IndexRelationGetNumberOfKeyAttributes(idx_rel); natt++)
+	{
+		int			attno = idx_rel->rd_index->indkey.values[natt];
+
+		if (attno < 0)
+			elog(ERROR, "system column in index");
+		nulls[attno - 1] = false;
+	}
+
+	key_tuple = zheap_form_tuple(desc, values, nulls);
+	*copy = true;
+	RelationClose(idx_rel);
+
+	/*
+	 * If the tuple, which by now contains only indexed columns, still has
+	 * toasted columns, we cannot log it, so error out below.  This is
+	 * somewhat unlikely since there are limits on the size of indexed
+	 * columns.
+	 */
+	if (ZHeapTupleHasExternal(key_tuple))
+	{
+		elog(ERROR, "toast tables are not supported with replica identity");
+	}
+
+	return key_tuple;
+}
+
+/*
  * compute_new_xid_infomask - Given the old values of tuple header's infomask,
  * compute the new values for tuple header which includes lock mode, new
  * infomask and transaction slot.
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index bafbbed..fc01f2b 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -34,6 +34,10 @@
 #include "access/xlogutils.h"
 #include "access/xlogreader.h"
 #include "access/xlogrecord.h"
+#include "access/zheap.h"
+#include "access/zheapam_xlog.h"
+#include "access/zheaputils.h"
+#include "access/zhtup.h"
 
 #include "catalog/pg_control.h"
 
@@ -45,6 +49,8 @@
 #include "replication/snapbuild.h"
 
 #include "storage/standby.h"
+#include "utils/rel.h"
+#include "utils/relfilenodemap.h"
 
 typedef struct XLogRecordBuffer
 {
@@ -57,6 +63,7 @@ typedef struct XLogRecordBuffer
 static void DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 static void DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 static void DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+static void DecodeZHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 static void DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 static void DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 static void DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
@@ -74,6 +81,13 @@ static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 			xl_xact_parsed_abort *parsed, TransactionId xid);
 
+/* record handlers for zheap */
+static void DecodeZInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+static void DecodeZDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+static void DecodeZUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+static void DecodeZMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+static ZHeapTuple DecodeXLogZTuple(char *data, Size len);
+
 /* common function to decode tuples */
 static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup);
 
@@ -161,7 +175,7 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor
 			break;
 		case RM_ZHEAP_ID:
-			/* Logical decoding is not yet implemented for zheap. */
-			Assert(0);
+			DecodeZHeapOp(ctx, &buf);
 			break;
 		case RM_ZHEAP2_ID:
 			/* Logical decoding is not yet implemented for zheap. */
@@ -510,6 +524,73 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	}
 }
 
+/*
+ * Handle rmgr ZHEAP_ID records for DecodeRecordIntoReorderBuffer().
+ */
+static void
+DecodeZHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+	uint8		info = XLogRecGetInfo(buf->record) & XLOG_ZHEAP_OPMASK;
+	TransactionId xid = XLogRecGetXid(buf->record);
+	SnapBuild  *builder = ctx->snapshot_builder;
+	bool		started_tx = false;
+
+	ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr);
+
+	/*
+	 * If we don't have snapshot or we are just fast-forwarding, there is no
+	 * point in decoding data changes.
+	 */
+	if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT ||
+		ctx->fast_forward)
+		return;
+
+	/* This function might be called inside or outside of a transaction. */
+	if (!IsTransactionState())
+	{
+		StartTransactionCommand();
+		started_tx = true;
+	}
+	switch (info)
+	{
+		case XLOG_ZHEAP_INSERT:
+			if (SnapBuildProcessChange(builder, xid, buf->origptr))
+				DecodeZInsert(ctx, buf);
+			break;
+
+		case XLOG_ZHEAP_DELETE:
+			if (SnapBuildProcessChange(builder, xid, buf->origptr))
+				DecodeZDelete(ctx, buf);
+			break;
+		case XLOG_ZHEAP_UPDATE:
+			if (SnapBuildProcessChange(builder, xid, buf->origptr))
+				DecodeZUpdate(ctx, buf);
+			break;
+		case XLOG_ZHEAP_MULTI_INSERT:
+			if (SnapBuildProcessChange(builder, xid, buf->origptr))
+				DecodeZMultiInsert(ctx, buf);
+			break;
+		case XLOG_ZHEAP_LOCK:
+			/* we don't care about row level locks for now */
+			break;
+		/*
+		 * Everything else here is just low level physical stuff we're not
+		 * interested in.
+		 */
+		case XLOG_ZHEAP_FREEZE_XACT_SLOT:
+		case XLOG_ZHEAP_INVALID_XACT_SLOT:
+			break;
+
+		default:
+			elog(ERROR, "unexpected RM_ZHEAP_ID record type: %u", info);
+			break;
+	}
+
+	/* Commit the transaction if we started one in this function. */
+	if (started_tx)
+		CommitTransactionCommand();
+}
+
 static inline bool
 FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id)
 {
@@ -1068,3 +1149,465 @@ DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tuple)
 	header->t_infomask2 = xlhdr.t_infomask2;
 	header->t_hoff = xlhdr.t_hoff;
 }
+
+/*
+ * Parse XLOG_ZHEAP_INSERT (not XLOG_ZHEAP_MULTI_INSERT!) records into
+ * tuplebufs.
+ *
+ * Here we retrieve the zheap tuple, convert it to heap tuple format, and
+ * store it in the change stream so the reorder buffer can understand it.
+ */
+static void
+DecodeZInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+	Size		datalen;
+	char	   *tupledata;
+	XLogReaderState *r = buf->record;
+	xl_zheap_insert *xlrec;
+	ReorderBufferChange *change;
+	RelFileNode target_node;
+	Relation	relation = NULL;
+	Oid			reloid;
+	ZHeapTuple	zhtup;
+	HeapTuple	htup;
+
+	xlrec = (xl_zheap_insert *) ((char *) XLogRecGetData(r) + SizeOfUndoHeader);
+
+	/*
+	 * Ignore insert records without new tuples (this does happen when
+	 * raw_zheap_insert marks the TOAST record as HEAP_INSERT_NO_LOGICAL).
+	 */
+	if (!(xlrec->flags & XLZ_INSERT_CONTAINS_NEW_TUPLE))
+		return;
+
+	/* only interested in our database */
+	XLogRecGetBlockTag(r, 0, &target_node, NULL, NULL);
+	if (target_node.dbNode != ctx->slot->data.database)
+		return;
+
+	/* output plugin doesn't look for this origin, no need to queue */
+	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
+		return;
+
+	change = ReorderBufferGetChange(ctx->reorder);
+	if (!(xlrec->flags & XLZ_INSERT_IS_SPECULATIVE))
+		change->action = REORDER_BUFFER_CHANGE_INSERT;
+	else
+		change->action = REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT;
+	change->origin_id = XLogRecGetOrigin(r);
+
+	memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode));
+
+	tupledata = XLogRecGetBlockData(r, 0, &datalen);
+
+	/*
+	 * Get the zheap tuple from WAL, convert it to heap tuple format, and
+	 * store it in the change stream.
+	 */
+	zhtup = DecodeXLogZTuple(tupledata, datalen);
+	reloid = RelidByRelfilenode(change->data.tp.relnode.spcNode,
+								change->data.tp.relnode.relNode);
+	relation = RelationIdGetRelation(reloid);
+	htup = zheap_to_heap(zhtup, RelationGetDescr(relation));
+
+	change->data.tp.newtuple =
+		ReorderBufferGetTupleBuf(ctx->reorder,
+								 htup->t_len - SizeofHeapTupleHeader);
+	change->data.tp.newtuple->tuple.t_len = htup->t_len;
+	change->data.tp.newtuple->tuple.t_self = htup->t_self;
+	change->data.tp.newtuple->tuple.t_tableOid = htup->t_tableOid;
+	memcpy((char *) change->data.tp.newtuple->tuple.t_data,
+		   (char *) htup->t_data,
+		   htup->t_len);
+
+	/* be tidy */
+	pfree(zhtup);
+	pfree(htup);
+
+	if (relation != NULL)
+	{
+		RelationClose(relation);
+		relation = NULL;
+	}
+
+	change->data.tp.clear_toast_afterwards = true;
+
+	ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change);
+}
+
+/*
+ * Parse XLOG_ZHEAP_DELETE records from WAL into proper tuplebufs.
+ *
+ * Deletes can possibly contain the old primary key.
+ */
+static void
+DecodeZDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+	XLogReaderState *r = buf->record;
+	xl_zheap_delete *xlrec;
+	ReorderBufferChange *change;
+	RelFileNode target_node;
+
+	xlrec = (xl_zheap_delete *) ((char *) XLogRecGetData(r) + SizeOfUndoHeader);
+
+	/* only interested in our database */
+	XLogRecGetBlockTag(r, 0, &target_node, NULL, NULL);
+	if (target_node.dbNode != ctx->slot->data.database)
+		return;
+
+	/* output plugin doesn't look for this origin, no need to queue */
+	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
+		return;
+
+	change = ReorderBufferGetChange(ctx->reorder);
+	change->action = REORDER_BUFFER_CHANGE_DELETE;
+	change->origin_id = XLogRecGetOrigin(r);
+
+	memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode));
+
+	/* old primary key stored */
+	if (xlrec->flags & XLZ_DELETE_CONTAINS_OLD)
+	{
+		Relation	relation = NULL;
+		ZHeapTuple	zhtup;
+		HeapTuple	htup;
+		char	*tupledata;
+		Oid		reloid;
+		Size	datalen;
+
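+		/* The old key/tuple was logged as part of block 0's data. */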
+		tupledata = XLogRecGetBlockData(r, 0, &datalen);
+
+		/*
+		 * Get the zheap tuple from WAL, convert it to heap tuple format, and
+		 * store it in the change stream.
+		 */
+		zhtup = DecodeXLogZTuple(tupledata, datalen);
+		reloid = RelidByRelfilenode(change->data.tp.relnode.spcNode,
+									change->data.tp.relnode.relNode);
+		relation = RelationIdGetRelation(reloid);
+		htup = zheap_to_heap(zhtup, RelationGetDescr(relation));
+
+		change->data.tp.oldtuple =
+			ReorderBufferGetTupleBuf(ctx->reorder,
+				htup->t_len - SizeofHeapTupleHeader);
+		change->data.tp.oldtuple->tuple.t_len = htup->t_len;
+		change->data.tp.oldtuple->tuple.t_self = htup->t_self;
+		change->data.tp.oldtuple->tuple.t_tableOid = htup->t_tableOid;
+		memcpy((char *) change->data.tp.oldtuple->tuple.t_data,
+			   (char *) htup->t_data,
+			   htup->t_len);
+
+		/* be tidy */
+		pfree(zhtup);
+		pfree(htup);
+
+		if (relation != NULL)
+		{
+			RelationClose(relation);
+			relation = NULL;
+		}
+	}
+
+	change->data.tp.clear_toast_afterwards = true;
+
+	ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change);
+}
+
+/*
+ * Parse XLOG_ZHEAP_UPDATE records from WAL into proper tuplebufs.
+ *
+ * Updates can possibly contain a new tuple and the old primary key.
+ */
+static void
+DecodeZUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+	XLogReaderState *r = buf->record;
+	xl_zheap_update *xlrec;
+	ReorderBufferChange *change;
+	char	   *data;
+	RelFileNode target_node;
+
+	data = XLogRecGetData(r);
+	xlrec = (xl_zheap_update *) (data + SizeOfUndoHeader);
+
+	/* only interested in our database */
+	XLogRecGetBlockTag(r, 0, &target_node, NULL, NULL);
+	if (target_node.dbNode != ctx->slot->data.database)
+		return;
+
+	/* output plugin doesn't look for this origin, no need to queue */
+	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
+		return;
+
+	change = ReorderBufferGetChange(ctx->reorder);
+	change->action = REORDER_BUFFER_CHANGE_UPDATE;
+	change->origin_id = XLogRecGetOrigin(r);
+	memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode));
+
+	if (xlrec->flags & XLZ_UPDATE_CONTAINS_NEW_TUPLE)
+	{
+		Relation	relation = NULL;
+		ZHeapTuple	zhtup;
+		HeapTuple	htup;
+		char	   *tupledata;
+		Oid			reloid;
+		Size		datalen;
+
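+		/* The new tuple was logged as block 0's data, as for plain inserts. */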
+		tupledata = XLogRecGetBlockData(r, 0, &datalen);
+		zhtup = DecodeXLogZTuple(tupledata, datalen);
+
+		reloid = RelidByRelfilenode(change->data.tp.relnode.spcNode,
+									change->data.tp.relnode.relNode);
+		relation = RelationIdGetRelation(reloid);
+		htup = zheap_to_heap(zhtup, RelationGetDescr(relation));
+
+		change->data.tp.newtuple =
+			ReorderBufferGetTupleBuf(ctx->reorder,
+				htup->t_len - SizeofHeapTupleHeader);
+		change->data.tp.newtuple->tuple.t_len = htup->t_len;
+		change->data.tp.newtuple->tuple.t_self = htup->t_self;
+		change->data.tp.newtuple->tuple.t_tableOid = htup->t_tableOid;
+		memcpy((char *) change->data.tp.newtuple->tuple.t_data,
+			   (char *) htup->t_data,
+			   htup->t_len);
+
+		/* be tidy */
+		pfree(zhtup);
+		pfree(htup);
+
+		if (relation != NULL)
+		{
+			RelationClose(relation);
+			relation = NULL;
+		}
+	}
+
+	if (xlrec->flags & XLZ_UPDATE_CONTAINS_OLD)
+	{
+		Relation	relation = NULL;
+		ZHeapTuple	zhtup;
+		HeapTuple	htup;
+		char	   *tupledata;
+		Oid			reloid;
+		Size		datalen;
+		int32		offset = 0;
+
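+		/*
+		 * The old key/tuple is stored in the main record data, after the
+		 * undo record header and the update record itself, plus the optional
+		 * TPD slot number and, for non-inplace updates, the new tuple's undo
+		 * record header and its optional TPD slot number.
+		 */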
+		offset = SizeOfZHeapUpdate + SizeOfUndoHeader;
+
+		if (xlrec->flags & XLZ_UPDATE_OLD_CONTAINS_TPD_SLOT)
+			offset += sizeof(int32);
+		if (xlrec->flags & XLZ_NON_INPLACE_UPDATE)
+		{
+			offset += SizeOfUndoHeader;
+
+			if (xlrec->flags & XLZ_UPDATE_NEW_CONTAINS_TPD_SLOT)
+				offset += sizeof(int32);
+		}
+
+		tupledata = data + offset;
+		datalen = XLogRecGetDataLen(r) - offset;
+		zhtup = DecodeXLogZTuple(tupledata, datalen);
+
+		reloid = RelidByRelfilenode(change->data.tp.relnode.spcNode,
+									change->data.tp.relnode.relNode);
+		relation = RelationIdGetRelation(reloid);
+		htup = zheap_to_heap(zhtup, RelationGetDescr(relation));
+
+		change->data.tp.oldtuple = ReorderBufferGetTupleBuf(ctx->reorder,
+								   htup->t_len - SizeofHeapTupleHeader);
+		change->data.tp.oldtuple->tuple.t_len = htup->t_len;
+		change->data.tp.oldtuple->tuple.t_self = htup->t_self;
+		change->data.tp.oldtuple->tuple.t_tableOid = htup->t_tableOid;
+		memcpy((char *) change->data.tp.oldtuple->tuple.t_data,
+			   (char *) htup->t_data,
+			   htup->t_len);
+
+		/* be tidy */
+		pfree(zhtup);
+		pfree(htup);
+
+		if (relation != NULL)
+		{
+			RelationClose(relation);
+			relation = NULL;
+		}
+	}
+
+	change->data.tp.clear_toast_afterwards = true;
+
+	ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change);
+}
+
+/*
+ * Decode XLOG_ZHEAP_MULTI_INSERT records into multiple tuplebufs.
+ *
+ * Currently MULTI_INSERT will always contain the full tuples.
+ */
+static void
+DecodeZMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+	XLogReaderState *r = buf->record;
+	xl_zheap_multi_insert *xlrec;
+	Relation	relation = NULL;
+	int			i;
+	char	   *data;
+	char	   *tupledata;
+	Size		tuplelen;
+	RelFileNode rnode;
+
+	xlrec = (xl_zheap_multi_insert *) ((char *) XLogRecGetData(r) + SizeOfUndoHeader);
+
+	/* only interested in our database */
+	XLogRecGetBlockTag(r, 0, &rnode, NULL, NULL);
+	if (rnode.dbNode != ctx->slot->data.database)
+		return;
+
+	/* output plugin doesn't look for this origin, no need to queue */
+	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
+		return;
+
+	tupledata = XLogRecGetBlockData(r, 0, &tuplelen);
+
+	data = tupledata;
+	for (i = 0; i < xlrec->ntuples; i++)
+	{
+		ReorderBufferChange *change;
+		xl_multi_insert_ztuple *xlhdr;
+		int			datalen;
+		Oid			reloid;
+		ZHeapTuple	zhtup;
+		HeapTuple	htup;
+
+		change = ReorderBufferGetChange(ctx->reorder);
+		change->action = REORDER_BUFFER_CHANGE_INSERT;
+		change->origin_id = XLogRecGetOrigin(r);
+
+		memcpy(&change->data.tp.relnode, &rnode, sizeof(RelFileNode));
+
+		/*
+		 * CONTAINS_NEW_TUPLE will always be set currently as multi_insert
+		 * isn't used for catalogs, but better be future proof.
+		 *
+		 * We decode the tuple in pretty much the same way as
+		 * DecodeXLogZTuple, but since the layout is slightly different, we
+		 * can't use it here.
+		 */
+		if (xlrec->flags & XLZ_INSERT_CONTAINS_NEW_TUPLE)
+		{
+			ZHeapTupleHeader header;
+
+			xlhdr = (xl_multi_insert_ztuple *) SHORTALIGN(data);
+			data = ((char *) xlhdr) + SizeOfMultiInsertTuple;
+			datalen = xlhdr->datalen;
+
+			zhtup = palloc(datalen + SizeofZHeapTupleHeader + ZHEAPTUPLESIZE);
+			header = zhtup->t_data = (ZHeapTupleHeader)((char *) zhtup + ZHEAPTUPLESIZE);
+
+			/* the length of this tuple, not of the whole block data */
+			zhtup->t_len = datalen + SizeofZHeapTupleHeader;
+			/* not a disk based tuple */
+			ItemPointerSetInvalid(&zhtup->t_self);
+
+			/* we can only figure this out after reassembling the transactions */
+			zhtup->t_tableOid = InvalidOid;
+
+			memset(header, 0, SizeofZHeapTupleHeader);
+
+			memcpy(((char *) zhtup->t_data) + SizeofZHeapTupleHeader,
+				   (char *) data,
+				   datalen);
+
+			header->t_infomask = xlhdr->t_infomask;
+			header->t_infomask2 = xlhdr->t_infomask2;
+			header->t_hoff = xlhdr->t_hoff;
+
+			if (!RelationIsValid(relation))
+			{
+				reloid = RelidByRelfilenode(change->data.tp.relnode.spcNode,
+											change->data.tp.relnode.relNode);
+				relation = RelationIdGetRelation(reloid);
+			}
+
+			htup = zheap_to_heap(zhtup, RelationGetDescr(relation));
+
+			change->data.tp.newtuple =
+				ReorderBufferGetTupleBuf(ctx->reorder,
+										htup->t_len - SizeofHeapTupleHeader);
+			change->data.tp.newtuple->tuple.t_len = htup->t_len;
+			change->data.tp.newtuple->tuple.t_self = htup->t_self;
+			change->data.tp.newtuple->tuple.t_tableOid = htup->t_tableOid;
+			memcpy((char *) change->data.tp.newtuple->tuple.t_data,
+				   (char *) htup->t_data,
+					htup->t_len);
+
+			data += datalen;
+
+			/* be tidy */
+			pfree(zhtup);
+			pfree(htup);
+		}
+
+		/*
+		 * Reset toast reassembly state only after the last row in the last
+		 * xl_multi_insert_ztuple record emitted by one zheap_multi_insert()
+		 * call.
+		 */
+		if (xlrec->flags & XLZ_INSERT_LAST_IN_MULTI &&
+			(i + 1) == xlrec->ntuples)
+			change->data.tp.clear_toast_afterwards = true;
+		else
+			change->data.tp.clear_toast_afterwards = false;
+
+		ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r),
+								 buf->origptr, change);
+	}
+	Assert(data == tupledata + tuplelen);
+
+	if (RelationIsValid(relation))
+	{
+		RelationClose(relation);
+		relation = NULL;
+	}
+}
+
+/*
+ * Read a ZHeapTuple as WAL logged by zheap_insert, zheap_update and
+ * zheap_delete (but not by zheap_multi_insert) into an in-memory tuple.
+ *
+ * The size 'len' and the pointer 'data' in the record need to be
+ * computed outside as they are record specific.
+ *
+ * The caller is responsible for freeing the memory of the tuple allocated
+ * by this function.
+ */
+static ZHeapTuple
+DecodeXLogZTuple(char *data, Size len)
+{
+	ZHeapTuple	zhtup;
+	xl_zheap_header xlhdr;
+	int			datalen = len - SizeOfZHeapHeader;
+	int			tuplelen = datalen + SizeofZHeapTupleHeader;
+	ZHeapTupleHeader header;
+
+	Assert(datalen >= 0);
+
+	zhtup = palloc(tuplelen + ZHEAPTUPLESIZE);
+	header = zhtup->t_data = (ZHeapTupleHeader)((char *) zhtup + ZHEAPTUPLESIZE);
+
+	zhtup->t_len = tuplelen;
+	/* not a disk based tuple */
+	ItemPointerSetInvalid(&zhtup->t_self);
+
+	/* we can only figure this out after reassembling the transactions */
+	zhtup->t_tableOid = InvalidOid;
+
+	/* data is not stored aligned, copy to aligned storage */
+	memcpy((char *) &xlhdr, data, SizeOfZHeapHeader);
+
+	memset(header, 0, SizeofZHeapTupleHeader);
+
+	memcpy(((char *) zhtup->t_data) + SizeofZHeapTupleHeader,
+		   data + SizeOfZHeapHeader,
+		   datalen);
+
+	header->t_infomask = xlhdr.t_infomask;
+	header->t_infomask2 = xlhdr.t_infomask2;
+	header->t_hoff = xlhdr.t_hoff;
+
+	return zhtup;
+}
diff --git a/src/include/access/zheapam_xlog.h b/src/include/access/zheapam_xlog.h
index 6d031dc..99a360d 100644
--- a/src/include/access/zheapam_xlog.h
+++ b/src/include/access/zheapam_xlog.h
@@ -124,6 +124,12 @@ typedef struct xl_zheap_insert
 #define XLZ_DELETE_CONTAINS_TPD_SLOT			(1<<2)
 #define XLZ_DELETE_CONTAINS_SUBXACT				(1<<3)
 #define XLZ_DELETE_IS_PARTITION_MOVE			(1<<4)
+#define XLZ_DELETE_CONTAINS_OLD_TUPLE			(1<<5)
+#define XLZ_DELETE_CONTAINS_OLD_KEY				(1<<6)
+
+/* convenience macro for checking whether any form of old tuple was logged */
+#define XLZ_DELETE_CONTAINS_OLD						\
+	(XLZ_DELETE_CONTAINS_OLD_TUPLE | XLZ_DELETE_CONTAINS_OLD_KEY)
 
 /* This is what we need to know about delete */
 typedef struct xl_zheap_delete
@@ -155,6 +161,13 @@ typedef struct xl_zheap_delete
 #define	XLZ_UPDATE_OLD_CONTAINS_TPD_SLOT		(1<<6)
 #define	XLZ_UPDATE_NEW_CONTAINS_TPD_SLOT		(1<<7)
 #define XLZ_UPDATE_CONTAINS_SUBXACT				(1<<8)
+#define XLZ_UPDATE_CONTAINS_OLD_TUPLE			(1<<9)
+#define XLZ_UPDATE_CONTAINS_OLD_KEY				(1<<10)
+#define XLZ_UPDATE_CONTAINS_NEW_TUPLE			(1<<11)
+
+/* convenience macro for checking whether any form of old tuple was logged */
+#define XLZ_UPDATE_CONTAINS_OLD						\
+	(XLZ_UPDATE_CONTAINS_OLD_TUPLE | XLZ_UPDATE_CONTAINS_OLD_KEY)
 
 /*
  * This is what we need to know about update|inplace_update
