On 28.01.2013 15:39, Amit Kapila wrote:
> Rebased the patch as per HEAD.

I don't like the way heap_delta_encode has intimate knowledge of how the lz compression works. It feels like a violent punch through the abstraction layers.

Ideally, you would just pass the old and new tuples to pglz as char *, and the pglz code would find the common parts. But I guess that's too slow; that's what I originally suggested, and you rejected that approach on performance grounds. Even so, we don't need to completely blow up the abstraction: pglz can still do the encoding - the caller just needs to pass it the attribute boundaries to consider for matches, so that it doesn't need to scan the whole old tuple byte by byte.
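
Just to illustrate, the division of labour would look roughly like this (a simplified sketch; the deforming loop and error handling are elided):

bool
heap_delta_encode(TupleDesc tupleDesc, HeapTuple oldtup, HeapTuple newtup,
                  char *encdata)
{
    int32      *offsets;
    int         noffsets = 0;
    PGLZ_Strategy strategy;

    /* Heap code knows the tuple layout: collect the attribute boundaries. */
    offsets = palloc(HeapTupleHeaderGetNatts(oldtup->t_data) * sizeof(int32));
    /* ... heap_deform_tuple-style loop fills offsets[] and noffsets ... */

    strategy = *PGLZ_strategy_always;
    strategy.min_comp_rate = wal_update_compression_ratio;

    /*
     * pglz knows the encoding: delta-encode the new tuple against the old
     * one, probing only the given history offsets for matches.
     */
    return pglz_compress_with_history((char *) newtup->t_data, newtup->t_len,
                                      (char *) oldtup->t_data, oldtup->t_len,
                                      offsets, noffsets,
                                      (PGLZ_Header *) encdata, &strategy);
}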

I came up with the attached patch. I wrote it to demonstrate the API; I'm not 100% sure the result after decoding is correct.

- Heikki
diff --git a/src/backend/access/common/heaptuple.c b/src/backend/access/common/heaptuple.c
index e39b977..bbdee4f 100644
--- a/src/backend/access/common/heaptuple.c
+++ b/src/backend/access/common/heaptuple.c
@@ -60,7 +60,11 @@
 #include "access/sysattr.h"
 #include "access/tuptoaster.h"
 #include "executor/tuptable.h"
+#include "utils/datum.h"
+#include "utils/pg_lzcompress.h"
 
+/* GUC variable for EWT compression ratio */
+int			wal_update_compression_ratio = 25;
 
 /* Does att's datatype allow packing into the 1-byte-header varlena format? */
 #define ATT_IS_PACKABLE(att) \
@@ -617,6 +621,119 @@ heap_copytuple_with_tuple(HeapTuple src, HeapTuple dest)
 	memcpy((char *) dest->t_data, (char *) src->t_data, src->t_len);
 }
 
+
+/* ----------------
+ * heap_delta_encode
+ *
+ *		Calculate the delta between two tuples, using pglz. The result is
+ * stored in *encdata. *encdata must point to a PGLZ_Header buffer of at
+ * least PGLZ_MAX_OUTPUT(newtup->t_len) bytes.
+ * ----------------
+ */
+bool
+heap_delta_encode(TupleDesc tupleDesc, HeapTuple oldtup, HeapTuple newtup,
+				  char *encdata)
+{
+	HeapTupleHeader tup = oldtup->t_data;
+	Form_pg_attribute *att = tupleDesc->attrs;
+	bool		hasnulls = HeapTupleHasNulls(oldtup);
+	bits8	   *bp = oldtup->t_data->t_bits;		/* ptr to null bitmap in tuple */
+	bool		slow = false;	/* can we use/set attcacheoff? */
+	char	   *tp;				/* ptr to tuple data */
+	long		off;			/* offset in tuple data */
+	int			natts;
+	int32	   *offsets;
+	int			noffsets;
+	int			attnum;
+	PGLZ_Strategy strategy;
+
+	/*
+	 * Collect the offsets of the attribute boundaries in the old tuple.
+	 * These are passed to pglz as the candidate match positions in the
+	 * history, so that it can encode unchanged columns as references to
+	 * the old tuple version without scanning it byte by byte.
+	 */
+	natts = HeapTupleHeaderGetNatts(oldtup->t_data);
+
+	offsets = palloc(natts * sizeof(int32));
+
+	noffsets = 0;
+
+	/* copied from heap_deform_tuple */
+	tp = (char *) tup + tup->t_hoff;
+	off = 0;
+	for (attnum = 0; attnum < natts; attnum++)
+	{
+		Form_pg_attribute thisatt = att[attnum];
+
+		if (hasnulls && att_isnull(attnum, bp))
+		{
+			slow = true;		/* can't use attcacheoff anymore */
+			continue;
+		}
+
+		if (!slow && thisatt->attcacheoff >= 0)
+			off = thisatt->attcacheoff;
+		else if (thisatt->attlen == -1)
+		{
+			/*
+			 * We can only cache the offset for a varlena attribute if the
+			 * offset is already suitably aligned, so that there would be no
+			 * pad bytes in any case: then the offset will be valid for either
+			 * an aligned or unaligned value.
+			 */
+			if (!slow &&
+				off == att_align_nominal(off, thisatt->attalign))
+				thisatt->attcacheoff = off;
+			else
+			{
+				off = att_align_pointer(off, thisatt->attalign, -1,
+										tp + off);
+				slow = true;
+			}
+		}
+		else
+		{
+			/* not varlena, so safe to use att_align_nominal */
+			off = att_align_nominal(off, thisatt->attalign);
+
+			if (!slow)
+				thisatt->attcacheoff = off;
+		}
+
+		off = att_addlength_pointer(off, thisatt->attlen, tp + off);
+
+		if (thisatt->attlen <= 0)
+			slow = true;		/* can't use attcacheoff anymore */
+
+		offsets[noffsets++] = off;
+	}
+
+	strategy = *PGLZ_strategy_always;
+	strategy.min_comp_rate = wal_update_compression_ratio;
+
+	return pglz_compress_with_history((char *) newtup->t_data, newtup->t_len,
+									  (char *) oldtup->t_data, oldtup->t_len,
+									  offsets, noffsets, (PGLZ_Header *) encdata,
+									  &strategy);
+}
+
+/* ----------------
+ * heap_delta_decode
+ *
+ *		Decode a tuple from the delta-encoded WAL tuple and the old tuple version.
+ * ----------------
+ */
+void
+heap_delta_decode(char *encdata, HeapTuple oldtup, HeapTuple newtup)
+{
+	pglz_decompress_with_history((char *) encdata,
+								 newtup->t_data,
+								 &newtup->t_len,
+								 (char *) oldtup->t_data,
+								 oldtup->t_len);
+}
+
 /*
  * heap_form_tuple
  *		construct a tuple from the given values[] and isnull[] arrays,
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 57d47e8..789bbe2 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -70,6 +70,7 @@
 #include "utils/snapmgr.h"
 #include "utils/syscache.h"
 #include "utils/tqual.h"
+#include "utils/pg_lzcompress.h"
 
 
 /* GUC variable */
@@ -5765,6 +5766,16 @@ log_heap_update(Relation reln, Buffer oldbuf,
 	XLogRecPtr	recptr;
 	XLogRecData rdata[4];
 	Page		page = BufferGetPage(newbuf);
+	char	   *newtupdata;
+	int			newtuplen;
+	bool		compressed = false;
+
+	/* Structure which holds EWT */
+	struct
+	{
+		PGLZ_Header pglzheader;
+		char		buf[MaxHeapTupleSize];
+	}			buf;
 
 	/* Caller should not call me on a non-WAL-logged relation */
 	Assert(RelationNeedsWAL(reln));
@@ -5774,15 +5785,46 @@ log_heap_update(Relation reln, Buffer oldbuf,
 	else
 		info = XLOG_HEAP_UPDATE;
 
+ 	newtupdata = ((char *) newtup->t_data) + offsetof(HeapTupleHeaderData, t_bits);
+	newtuplen = newtup->t_len - offsetof(HeapTupleHeaderData, t_bits);
+
+	/*
+	 * EWT can be generated for all new tuple versions created by an UPDATE
+	 * operation. Currently we do it only when both the old and new tuple
+	 * versions are on the same page, so that during recovery, if the page
+	 * containing the old tuple is corrupt, that corruption does not cascade
+	 * to other pages. Under the general assumption that most updates create
+	 * the new tuple version on the same page, this should not significantly
+	 * reduce the WAL savings or hurt performance.
+	 *
+	 * We should not generate EWT when we need to back up the whole block in
+	 * WAL anyway, as in that case there is no saving from a reduced WAL size.
+	 */
+	if ((oldbuf == newbuf) && !XLogCheckBufferNeedsBackup(newbuf))
+	{
+		/* Delta-encode the new tuple using the old tuple */
+		if (heap_delta_encode(reln->rd_att, oldtup, newtup, (char *) &buf.pglzheader))
+		{
+			compressed = true;
+			newtupdata = (char *) &buf.pglzheader;
+			newtuplen = VARSIZE(&buf.pglzheader);
+		}
+	}
+
+	xlrec.flags = 0;
 	xlrec.target.node = reln->rd_node;
 	xlrec.target.tid = oldtup->t_self;
 	xlrec.old_xmax = HeapTupleHeaderGetRawXmax(oldtup->t_data);
 	xlrec.old_infobits_set = compute_infobits(oldtup->t_data->t_infomask,
 											  oldtup->t_data->t_infomask2);
 	xlrec.new_xmax = HeapTupleHeaderGetRawXmax(newtup->t_data);
-	xlrec.all_visible_cleared = all_visible_cleared;
+	if (all_visible_cleared)
+		xlrec.flags |= XL_HEAP_UPDATE_ALL_VISIBLE_CLEARED;
 	xlrec.newtid = newtup->t_self;
-	xlrec.new_all_visible_cleared = new_all_visible_cleared;
+	if (new_all_visible_cleared)
+		xlrec.flags |= XL_HEAP_UPDATE_NEW_ALL_VISIBLE_CLEARED;
+	if (compressed)
+		xlrec.flags |= XL_HEAP_UPDATE_DELTA_ENCODED;
 
 	rdata[0].data = (char *) &xlrec;
 	rdata[0].len = SizeOfHeapUpdate;
@@ -5809,9 +5851,12 @@ log_heap_update(Relation reln, Buffer oldbuf,
 	rdata[2].buffer_std = true;
 	rdata[2].next = &(rdata[3]);
 
-	/* PG73FORMAT: write bitmap [+ padding] [+ oid] + data */
-	rdata[3].data = (char *) newtup->t_data + offsetof(HeapTupleHeaderData, t_bits);
-	rdata[3].len = newtup->t_len - offsetof(HeapTupleHeaderData, t_bits);
+	/*
+	 * PG73FORMAT: write bitmap [+ padding] [+ oid] + data follows,
+	 * OR PG93FORMAT (if encoded): LZ header + encoded data follows
+	 */
+	rdata[3].data = newtupdata;
+	rdata[3].len = newtuplen;
 	rdata[3].buffer = newbuf;
 	rdata[3].buffer_std = true;
 	rdata[3].next = NULL;
@@ -6614,7 +6659,10 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool hot_update)
 	Page		page;
 	OffsetNumber offnum;
 	ItemId		lp = NULL;
+	HeapTupleData newtup;
+	HeapTupleData oldtup;
 	HeapTupleHeader htup;
+	HeapTupleHeader oldtupdata = NULL;
 	struct
 	{
 		HeapTupleHeaderData hdr;
@@ -6629,7 +6677,7 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool hot_update)
 	 * The visibility map may need to be fixed even if the heap page is
 	 * already up-to-date.
 	 */
-	if (xlrec->all_visible_cleared)
+	if (xlrec->flags & XL_HEAP_UPDATE_ALL_VISIBLE_CLEARED)
 	{
 		Relation	reln = CreateFakeRelcacheEntry(xlrec->target.node);
 		BlockNumber block = ItemPointerGetBlockNumber(&xlrec->target.tid);
@@ -6689,7 +6737,7 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool hot_update)
 	if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsNormal(lp))
 		elog(PANIC, "heap_update_redo: invalid lp");
 
-	htup = (HeapTupleHeader) PageGetItem(page, lp);
+	oldtupdata = htup = (HeapTupleHeader) PageGetItem(page, lp);
 
 	htup->t_infomask &= ~(HEAP_XMAX_BITS | HEAP_MOVED);
 	htup->t_infomask2 &= ~HEAP_KEYS_UPDATED;
@@ -6707,7 +6755,7 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool hot_update)
 	/* Mark the page as a candidate for pruning */
 	PageSetPrunable(page, record->xl_xid);
 
-	if (xlrec->all_visible_cleared)
+	if (xlrec->flags & XL_HEAP_UPDATE_ALL_VISIBLE_CLEARED)
 		PageClearAllVisible(page);
 
 	/*
@@ -6732,7 +6780,7 @@ newt:;
 	 * The visibility map may need to be fixed even if the heap page is
 	 * already up-to-date.
 	 */
-	if (xlrec->new_all_visible_cleared)
+	if (xlrec->flags & XL_HEAP_UPDATE_NEW_ALL_VISIBLE_CLEARED)
 	{
 		Relation	reln = CreateFakeRelcacheEntry(xlrec->target.node);
 		BlockNumber block = ItemPointerGetBlockNumber(&xlrec->newtid);
@@ -6795,10 +6843,32 @@ newsame:;
 		   SizeOfHeapHeader);
 	htup = &tbuf.hdr;
 	MemSet((char *) htup, 0, sizeof(HeapTupleHeaderData));
-	/* PG73FORMAT: get bitmap [+ padding] [+ oid] + data */
-	memcpy((char *) htup + offsetof(HeapTupleHeaderData, t_bits),
-		   (char *) xlrec + hsize,
-		   newlen);
+
+	/*
+	 * If the record is an EWT, decode it.
+	 */
+	if (xlrec->flags & XL_HEAP_UPDATE_DELTA_ENCODED)
+	{
+		/*
+		 * PG93FORMAT: header + control byte + history reference (2-3 bytes)
+		 * + new data (1-byte length + variable data) + ...
+		 */
+		PGLZ_Header *encoded_data = (PGLZ_Header *) (((char *) xlrec) + hsize);
+
+		oldtup.t_data = oldtupdata;
+		newtup.t_data = htup;
+
+		heap_delta_decode((char *) encoded_data, &oldtup, &newtup);
+		newlen = newtup.t_len;
+	}
+	else
+	{
+		/* PG73FORMAT: get bitmap [+ padding] [+ oid] + data */
+		memcpy((char *) htup + offsetof(HeapTupleHeaderData, t_bits),
+			   (char *) xlrec + hsize,
+			   newlen);
+	}
+
 	newlen += offsetof(HeapTupleHeaderData, t_bits);
 	htup->t_infomask2 = xlhdr.t_infomask2;
 	htup->t_infomask = xlhdr.t_infomask;
@@ -6814,7 +6884,7 @@ newsame:;
 	if (offnum == InvalidOffsetNumber)
 		elog(PANIC, "heap_update_redo: failed to add tuple");
 
-	if (xlrec->new_all_visible_cleared)
+	if (xlrec->flags & XL_HEAP_UPDATE_NEW_ALL_VISIBLE_CLEARED)
 		PageClearAllVisible(page);
 
 	freespace = PageGetHeapFreeSpace(page);		/* needed to update FSM below */
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index cf2f6e7..9cd6271 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -1204,6 +1204,28 @@ begin:;
 }
 
 /*
+ * Determine whether the referenced buffer has to be backed up. Since we don't
+ * yet have the insert lock, fullPageWrites and forcePageWrites could change
+ * later, but that will not cause any problem because this function is used
+ * only to decide whether an EWT should be built for a WAL update record.
+ */
+bool
+XLogCheckBufferNeedsBackup(Buffer buffer)
+{
+	bool		doPageWrites;
+	Page		page;
+
+	page = BufferGetPage(buffer);
+
+	doPageWrites = XLogCtl->Insert.fullPageWrites || XLogCtl->Insert.forcePageWrites;
+
+	if (doPageWrites && PageGetLSN(page) <= RedoRecPtr)
+		return true;			/* buffer requires backup */
+
+	return false;				/* buffer does not need to be backed up */
+}
+
+/*
  * Determine whether the buffer referenced by an XLogRecData item has to
  * be backed up, and if so fill a BkpBlock struct for it.  In any case
  * save the buffer's LSN at *lsn.
diff --git a/src/backend/utils/adt/pg_lzcompress.c b/src/backend/utils/adt/pg_lzcompress.c
index 66c64c1..c6ba6af 100644
--- a/src/backend/utils/adt/pg_lzcompress.c
+++ b/src/backend/utils/adt/pg_lzcompress.c
@@ -373,6 +373,7 @@ do { \
  */
 static inline int
 pglz_find_match(PGLZ_HistEntry **hstart, const char *input, const char *end,
+				const char *historyend,
 				int *lenp, int *offp, int good_match, int good_drop)
 {
 	PGLZ_HistEntry *hent;
@@ -393,7 +394,7 @@ pglz_find_match(PGLZ_HistEntry **hstart, const char *input, const char *end,
 		/*
 		 * Stop if the offset does not fit into our tag anymore.
 		 */
-		thisoff = ip - hp;
+		thisoff = (historyend ? historyend : ip) - hp;
 		if (thisoff >= 0x0fff)
 			break;
 
@@ -408,12 +409,12 @@ pglz_find_match(PGLZ_HistEntry **hstart, const char *input, const char *end,
 		thislen = 0;
 		if (len >= 16)
 		{
-			if (memcmp(ip, hp, len) == 0)
+			if ((historyend == NULL || historyend - hp > len) && memcmp(ip, hp, len) == 0)
 			{
 				thislen = len;
 				ip += len;
 				hp += len;
-				while (ip < end && *ip == *hp && thislen < PGLZ_MAX_MATCH)
+				while (ip < end && *ip == *hp && thislen < PGLZ_MAX_MATCH && (historyend == NULL || hp < historyend))
 				{
 					thislen++;
 					ip++;
@@ -423,7 +424,7 @@ pglz_find_match(PGLZ_HistEntry **hstart, const char *input, const char *end,
 		}
 		else
 		{
-			while (ip < end && *ip == *hp && thislen < PGLZ_MAX_MATCH)
+			while (ip < end && *ip == *hp && thislen < PGLZ_MAX_MATCH && (historyend == NULL || hp < historyend))
 			{
 				thislen++;
 				ip++;
@@ -588,7 +589,7 @@ pglz_compress(const char *source, int32 slen, PGLZ_Header *dest,
 		/*
 		 * Try to find a match in the history
 		 */
-		if (pglz_find_match(hist_start, dp, dend, &match_len,
+		if (pglz_find_match(hist_start, dp, dend, NULL, &match_len,
 							&match_off, good_match, good_drop))
 		{
 			/*
@@ -637,6 +638,176 @@ pglz_compress(const char *source, int32 slen, PGLZ_Header *dest,
 	return true;
 }
 
+/*
+ * Like pglz_compress, but performs delta encoding rather than compression.
+ * The back references are offsets from the end of history data, rather
+ * than current output position. 'hoffsets' is an array of offsets in the
+ * history to consider. We could scan the whole history string for possible
+ * matches, but if the caller has some information on which offsets are
+ * likely to be interesting (attribute boundaries, when encoding tuples, for
+ * example), this is a lot faster.
+ */
+bool
+pglz_compress_with_history(const char *source, int32 slen, const char *history,
+						   int32 hlen,
+						   int32 *hoffsets,
+						   int32 nhoffsets,
+						   PGLZ_Header *dest, const PGLZ_Strategy *strategy)
+{
+	unsigned char *bp = ((unsigned char *) dest) + sizeof(PGLZ_Header);
+	unsigned char *bstart = bp;
+	int			hist_next = 0;
+	bool		hist_recycle = false;
+	const char *dp = source;
+	const char *dend = source + slen;
+	unsigned char ctrl_dummy = 0;
+	unsigned char *ctrlp = &ctrl_dummy;
+	unsigned char ctrlb = 0;
+	unsigned char ctrl = 0;
+	bool		found_match = false;
+	int32		match_len;
+	int32		match_off;
+	int32		good_match;
+	int32		good_drop;
+	int32		result_size;
+	int32		result_max;
+	int			i;
+	int32		need_rate;
+	const char *historyend = history + hlen;
+
+	/*
+	 * Our fallback strategy is the default.
+	 */
+	if (strategy == NULL)
+		strategy = PGLZ_strategy_default;
+
+	/*
+	 * If the strategy forbids compression (at all or if source chunk size out
+	 * of range), fail.
+	 */
+	if (strategy->match_size_good <= 0 ||
+		slen < strategy->min_input_size ||
+		slen > strategy->max_input_size)
+		return false;
+
+	/*
+	 * Save the original source size in the header.
+	 */
+	dest->rawsize = slen;
+
+	/*
+	 * Limit the match parameters to the supported range.
+	 */
+	good_match = strategy->match_size_good;
+	if (good_match > PGLZ_MAX_MATCH)
+		good_match = PGLZ_MAX_MATCH;
+	else if (good_match < 17)
+		good_match = 17;
+
+	good_drop = strategy->match_size_drop;
+	if (good_drop < 0)
+		good_drop = 0;
+	else if (good_drop > 100)
+		good_drop = 100;
+
+	need_rate = strategy->min_comp_rate;
+	if (need_rate < 0)
+		need_rate = 0;
+	else if (need_rate > 99)
+		need_rate = 99;
+
+	/*
+	 * Compute the maximum result size allowed by the strategy, namely the
+	 * input size minus the minimum wanted compression rate.  This had better
+	 * be <= slen, else we might overrun the provided output buffer.
+	 */
+	if (slen > (INT_MAX / 100))
+	{
+		/* Approximate to avoid overflow */
+		result_max = (slen / 100) * (100 - need_rate);
+	}
+	else
+		result_max = (slen * (100 - need_rate)) / 100;
+
+	/*
+	 * Initialize the history lists to empty.  We do not need to zero the
+	 * hist_entries[] array; its entries are initialized as they are used.
+	 */
+	memset(hist_start, 0, sizeof(hist_start));
+
+	/* Populate the history hash from the history string */
+	for (i = 0; i < nhoffsets; i++)
+	{
+		const char *hp = history + hoffsets[i];
+
+		/* Add this offset to history */
+		pglz_hist_add(hist_start, hist_entries,
+					  hist_next, hist_recycle,
+					  hp, historyend);
+	}
+
+	/*
+	 * Compress the source directly into the output buffer.
+	 */
+	dp = source;
+	while (dp < dend)
+	{
+		/*
+		 * If we already exceeded the maximum result size, fail.
+		 *
+		 * We check once per loop; since the loop body could emit as many as 4
+		 * bytes (a control byte and 3-byte tag), PGLZ_MAX_OUTPUT() had better
+		 * allow 4 slop bytes.
+		 */
+		if (bp - bstart >= result_max)
+			return false;
+
+		/*
+		 * Try to find a match in the history
+		 */
+		if (pglz_find_match(hist_start, dp, dend, historyend, &match_len,
+							&match_off, good_match, good_drop))
+		{
+			/*
+			 * Create the tag and add history entries for all matched
+			 * characters.
+			 */
+			pglz_out_tag(ctrlp, ctrlb, ctrl, bp, match_len, match_off);
+			dp += match_len;
+			found_match = true;
+		}
+		else
+		{
+			/*
+			 * No match found. Copy one literal byte.
+			 */
+			pglz_out_literal(ctrlp, ctrlb, ctrl, bp, *dp);
+			dp++;				/* Do not do this ++ in the line above! */
+			/* The macro would do it four times - Jan.	*/
+		}
+	}
+
+	if (!found_match)
+		return false;
+
+	/*
+	 * Write out the last control byte and check that we haven't overrun the
+	 * output size allowed by the strategy.
+	 */
+	*ctrlp = ctrlb;
+	result_size = bp - bstart;
+
+#ifdef DELTA_DEBUG
+	elog(LOG, "old %d new %d compressed %d", hlen, slen, result_size);
+#endif
+
+	/*
+	 * Success - need only fill in the actual length of the compressed datum.
+	 */
+	SET_VARSIZE_COMPRESSED(dest, result_size + sizeof(PGLZ_Header));
+
+	return true;
+}
 
 /* ----------
  * pglz_decompress -
@@ -647,15 +818,39 @@ pglz_compress(const char *source, int32 slen, PGLZ_Header *dest,
 void
 pglz_decompress(const PGLZ_Header *source, char *dest)
 {
+	pglz_decompress_with_history((char *) source, dest, NULL, NULL, 0);
+}
+
+/* ----------
+ * pglz_decompress_with_history -
+ *
+ *		Decompresses source into dest, using the history
+ *		buffer if one is provided.
+ * ----------
+ */
+void
+pglz_decompress_with_history(const char *source, char *dest, uint32 *destlen,
+							 const char *history, int hlen)
+{
+	PGLZ_Header src;
 	const unsigned char *sp;
 	const unsigned char *srcend;
 	unsigned char *dp;
 	unsigned char *destend;
+	const char *historyend = history + hlen;
+
+	/* Copy the header locally to avoid unaligned access to the PGLZ_Header */
+	memcpy((char *) &src, source, sizeof(PGLZ_Header));
 
 	sp = ((const unsigned char *) source) + sizeof(PGLZ_Header);
-	srcend = ((const unsigned char *) source) + VARSIZE(source);
+	srcend = ((const unsigned char *) source) + VARSIZE(&src);
 	dp = (unsigned char *) dest;
-	destend = dp + source->rawsize;
+	destend = dp + src.rawsize;
+
+	if (destlen)
+	{
+		*destlen = src.rawsize;
+	}
 
 	while (sp < srcend && dp < destend)
 	{
@@ -699,26 +894,38 @@ pglz_decompress(const PGLZ_Header *source, char *dest)
 					break;
 				}
 
-				/*
-				 * Now we copy the bytes specified by the tag from OUTPUT to
-				 * OUTPUT. It is dangerous and platform dependent to use
-				 * memcpy() here, because the copied areas could overlap
-				 * extremely!
-				 */
-				while (len--)
+				if (history)
+				{
+					/*
+					 * Now we copy the bytes specified by the tag from history
+					 * to OUTPUT.
+					 */
+					memcpy(dp, historyend - off, len);
+					dp += len;
+				}
+				else
 				{
-					*dp = dp[-off];
-					dp++;
+					/*
+					 * Now we copy the bytes specified by the tag from OUTPUT
+					 * to OUTPUT. It is dangerous and platform dependent to
+					 * use memcpy() here, because the copied areas could
+					 * overlap extremely!
+					 */
+					while (len--)
+					{
+						*dp = dp[-off];
+						dp++;
+					}
 				}
 			}
 			else
 			{
 				/*
-				 * An unset control bit means LITERAL BYTE. So we just copy
-				 * one from INPUT to OUTPUT.
+				 * An unset control bit means LITERAL BYTE. So we just
+				 * copy one from INPUT to OUTPUT.
 				 */
-				if (dp >= destend)		/* check for buffer overrun */
-					break;		/* do not clobber memory */
+				if (dp >= destend)	/* check for buffer overrun */
+					break;	/* do not clobber memory */
 
 				*dp++ = *sp++;
 			}
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 6128694..9a37b2d 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -123,6 +123,7 @@ extern int	CommitSiblings;
 extern char *default_tablespace;
 extern char *temp_tablespaces;
 extern bool synchronize_seqscans;
+extern int	wal_update_compression_ratio;
 extern int	ssl_renegotiation_limit;
 extern char *SSLCipherSuites;
 
@@ -2382,6 +2383,17 @@ static struct config_int ConfigureNamesInt[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		/* Not for general use */
+		{"wal_update_compression_ratio", PGC_USERSET, DEVELOPER_OPTIONS,
+			gettext_noop("Sets the compression ratio of the delta record for a WAL update."),
+			NULL,
+		},
+		&wal_update_compression_ratio,
+		25, 1, 99,
+		NULL, NULL, NULL
+	},
+
 	/* End-of-list marker */
 	{
 		{NULL, 0, 0, NULL, NULL}, NULL, 0, 0, 0, NULL, NULL, NULL
diff --git a/src/include/access/heapam_xlog.h b/src/include/access/heapam_xlog.h
index 270924a..1825292 100644
--- a/src/include/access/heapam_xlog.h
+++ b/src/include/access/heapam_xlog.h
@@ -147,13 +147,22 @@ typedef struct xl_heap_update
 	TransactionId old_xmax;		/* xmax of the old tuple */
 	TransactionId new_xmax;		/* xmax of the new tuple */
 	ItemPointerData newtid;		/* new inserted tuple id */
-	uint8		old_infobits_set;	/* infomask bits to set on old tuple */
-	bool		all_visible_cleared;	/* PD_ALL_VISIBLE was cleared */
-	bool		new_all_visible_cleared;		/* same for the page of newtid */
+	uint8		old_infobits_set;		/* infomask bits to set on old tuple */
+	int			flags;			/* flag bits, see below */
 	/* NEW TUPLE xl_heap_header AND TUPLE DATA FOLLOWS AT END OF STRUCT */
 } xl_heap_update;
 
-#define SizeOfHeapUpdate	(offsetof(xl_heap_update, new_all_visible_cleared) + sizeof(bool))
+#define XL_HEAP_UPDATE_ALL_VISIBLE_CLEARED		0x01	/* Indicates that the
+														 * old page's all-visible
+														 * bit is cleared */
+#define XL_HEAP_UPDATE_NEW_ALL_VISIBLE_CLEARED	0x02	/* Indicates that the
+														 * new page's all-visible
+														 * bit is cleared */
+#define XL_HEAP_UPDATE_DELTA_ENCODED			0x04	/* Indicates that the
+														 * new tuple data is
+														 * delta encoded */
+
+#define SizeOfHeapUpdate	(offsetof(xl_heap_update, flags) + sizeof(int))
 
 /*
  * This is what we need to know about vacuum page cleanup/redirect
diff --git a/src/include/access/htup_details.h b/src/include/access/htup_details.h
index cd01ecd..042c8b9 100644
--- a/src/include/access/htup_details.h
+++ b/src/include/access/htup_details.h
@@ -687,6 +687,11 @@ extern HeapTuple heap_modify_tuple(HeapTuple tuple,
 extern void heap_deform_tuple(HeapTuple tuple, TupleDesc tupleDesc,
 				  Datum *values, bool *isnull);
 
+extern bool heap_delta_encode(TupleDesc tupleDesc, HeapTuple oldtup,
+				HeapTuple newtup, char *encdata);
+extern void heap_delta_decode (char *encdata, HeapTuple oldtup,
+				HeapTuple newtup);
+
 /* these three are deprecated versions of the three above: */
 extern HeapTuple heap_formtuple(TupleDesc tupleDescriptor,
 			   Datum *values, char *nulls);
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 72e3242..15f5d5d 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -261,6 +261,7 @@ typedef struct CheckpointStatsData
 extern CheckpointStatsData CheckpointStats;
 
 extern XLogRecPtr XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata);
+extern bool XLogCheckBufferNeedsBackup(Buffer buffer);
 extern void XLogFlush(XLogRecPtr RecPtr);
 extern bool XLogBackgroundFlush(void);
 extern bool XLogNeedsFlush(XLogRecPtr RecPtr);
diff --git a/src/include/utils/pg_lzcompress.h b/src/include/utils/pg_lzcompress.h
index 4af24a3..7a32803 100644
--- a/src/include/utils/pg_lzcompress.h
+++ b/src/include/utils/pg_lzcompress.h
@@ -107,6 +107,8 @@ extern const PGLZ_Strategy *const PGLZ_strategy_always;
  */
 extern bool pglz_compress(const char *source, int32 slen, PGLZ_Header *dest,
 			  const PGLZ_Strategy *strategy);
+extern bool pglz_compress_with_history(const char *source, int32 slen, const char *history, int32 hlen, int32 *hoffsets, int32 noffsets, PGLZ_Header *dest, const PGLZ_Strategy *strategy);
 extern void pglz_decompress(const PGLZ_Header *source, char *dest);
+extern void pglz_decompress_with_history(const char *source, char *dest, uint32 *destlen, const char *history, int hlen);
 
 #endif   /* _PG_LZCOMPRESS_H_ */
diff --git a/src/test/regress/expected/update.out b/src/test/regress/expected/update.out
index 71b856f..af46df2 100644
--- a/src/test/regress/expected/update.out
+++ b/src/test/regress/expected/update.out
@@ -97,3 +97,73 @@ SELECT a, b, char_length(c) FROM update_test;
 (2 rows)
 
 DROP TABLE update_test;
+--
+-- Test to update contiguous and non-contiguous columns
+--
+DROP TABLE IF EXISTS update_test;
+NOTICE:  table "update_test" does not exist, skipping
+CREATE TABLE update_test (
+		bser bigserial,
+		bln boolean,
+		ename VARCHAR(25),
+		perf_f float(8),
+		grade CHAR,
+		dept CHAR(5) NOT NULL,
+		dob DATE,
+		idnum INT,
+		addr VARCHAR(30) NOT NULL,
+		destn CHAR(6),
+		Gend CHAR,
+		samba BIGINT,
+		hgt float,
+		ctime TIME
+);
+INSERT INTO update_test VALUES (
+		nextval('update_test_bser_seq'::regclass),
+		TRUE,
+		'Test',
+		7.169,
+		'B',
+		'CSD',
+		'2000-01-01',
+		520,
+		'road2,
+		streeeeet2,
+		city2',
+		'dcy2',
+		'M',
+		12000,
+		50.4,
+		'00:00:00.0'
+);
+SELECT * from update_test;
+ bser | bln | ename | perf_f | grade | dept  |    dob     | idnum |            addr             | destn  | gend | samba | hgt  |  ctime   
+------+-----+-------+--------+-------+-------+------------+-------+-----------------------------+--------+------+-------+------+----------
+    1 | t   | Test  |  7.169 | B     | CSD   | 01-01-2000 |   520 | road2,                     +| dcy2   | M    | 12000 | 50.4 | 00:00:00
+      |     |       |        |       |       |            |       |                 streeeeet2,+|        |      |       |      | 
+      |     |       |        |       |       |            |       |                 city2       |        |      |       |      | 
+(1 row)
+
+-- update first column
+UPDATE update_test SET bser = bser - 1 + 1;
+-- update middle column
+UPDATE update_test SET perf_f = 8.9;
+-- update last column
+UPDATE update_test SET ctime = '00:00:00.1';
+-- update 3 contiguous columns
+UPDATE update_test SET destn = 'dcy2', samba = 0 WHERE Gend = 'M' and dept = 'CSD';
+-- update two non-contiguous columns
+UPDATE update_test SET destn = 'moved', samba = 0;
+UPDATE update_test SET bln = FALSE, hgt = 10.1;
+-- update causing some column alignment difference
+UPDATE update_test SET ename = 'Tes';
+UPDATE update_test SET dept = 'Test';
+SELECT * from update_test;
+ bser | bln | ename | perf_f | grade | dept  |    dob     | idnum |            addr             | destn  | gend | samba | hgt  |   ctime    
+------+-----+-------+--------+-------+-------+------------+-------+-----------------------------+--------+------+-------+------+------------
+    1 | f   | Tes   |    8.9 | B     | Test  | 01-01-2000 |   520 | road2,                     +| moved  | M    |     0 | 10.1 | 00:00:00.1
+      |     |       |        |       |       |            |       |                 streeeeet2,+|        |      |       |      | 
+      |     |       |        |       |       |            |       |                 city2       |        |      |       |      | 
+(1 row)
+
+DROP TABLE update_test;
diff --git a/src/test/regress/sql/update.sql b/src/test/regress/sql/update.sql
index a8a028f..1806992 100644
--- a/src/test/regress/sql/update.sql
+++ b/src/test/regress/sql/update.sql
@@ -59,3 +59,70 @@ UPDATE update_test SET c = repeat('x', 10000) WHERE c = 'car';
 SELECT a, b, char_length(c) FROM update_test;
 
 DROP TABLE update_test;
+
+
+--
+-- Test to update contiguous and non-contiguous columns
+--
+
+DROP TABLE IF EXISTS update_test;
+CREATE TABLE update_test (
+		bser bigserial,
+		bln boolean,
+		ename VARCHAR(25),
+		perf_f float(8),
+		grade CHAR,
+		dept CHAR(5) NOT NULL,
+		dob DATE,
+		idnum INT,
+		addr VARCHAR(30) NOT NULL,
+		destn CHAR(6),
+		Gend CHAR,
+		samba BIGINT,
+		hgt float,
+		ctime TIME
+);
+
+INSERT INTO update_test VALUES (
+		nextval('update_test_bser_seq'::regclass),
+		TRUE,
+		'Test',
+		7.169,
+		'B',
+		'CSD',
+		'2000-01-01',
+		520,
+		'road2,
+		streeeeet2,
+		city2',
+		'dcy2',
+		'M',
+		12000,
+		50.4,
+		'00:00:00.0'
+);
+
+SELECT * from update_test;
+
+-- update first column
+UPDATE update_test SET bser = bser - 1 + 1;
+
+-- update middle column
+UPDATE update_test SET perf_f = 8.9;
+
+-- update last column
+UPDATE update_test SET ctime = '00:00:00.1';
+
+-- update 3 contiguous columns
+UPDATE update_test SET destn = 'dcy2', samba = 0 WHERE Gend = 'M' and dept = 'CSD';
+
+-- update two non-contiguous columns
+UPDATE update_test SET destn = 'moved', samba = 0;
+UPDATE update_test SET bln = FALSE, hgt = 10.1;
+
+-- update causing some column alignment difference
+UPDATE update_test SET ename = 'Tes';
+UPDATE update_test SET dept = 'Test';
+
+SELECT * from update_test;
+DROP TABLE update_test;