On Fri, Sep 10, 2021 at 9:54 PM Jaime Casanova
<jcasa...@systemguards.com.ec> wrote:

> > I will do that early next week.
> >
>
> Great! I'm marking the patch as "waiting on author".
> Thanks for keep working on this.
>

I haved rebased the patch.

-- 
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com
From 167be9aa3266c9d022820a128afe012a0b0cebf4 Mon Sep 17 00:00:00 2001
From: Dilip Kumar <dilipkumar@localhost.localdomain>
Date: Sat, 11 Sep 2021 14:32:08 +0530
Subject: [PATCH v3] Toast compression method options

Allow user to provide compression method option for the toastable
attribute while setting the compression method.
---
 doc/src/sgml/ref/alter_table.sgml             |   8 +-
 doc/src/sgml/ref/create_table.sgml            |   7 +-
 src/backend/access/brin/brin_tuple.c          |   9 +-
 src/backend/access/common/indextuple.c        |  11 +-
 src/backend/access/common/reloptions.c        |  64 +++++++
 src/backend/access/common/toast_compression.c | 176 ++++++++++++++++++-
 src/backend/access/common/toast_internals.c   |   6 +-
 src/backend/access/table/toast_helper.c       |   7 +-
 src/backend/bootstrap/bootparse.y             |   1 +
 src/backend/catalog/heap.c                    |  15 +-
 src/backend/catalog/index.c                   |  44 ++++-
 src/backend/catalog/toasting.c                |   1 +
 src/backend/commands/cluster.c                |   1 +
 src/backend/commands/foreigncmds.c            |  44 -----
 src/backend/commands/tablecmds.c              | 241 +++++++++++++++++++++-----
 src/backend/nodes/copyfuncs.c                 |  16 +-
 src/backend/nodes/equalfuncs.c                |  14 +-
 src/backend/nodes/outfuncs.c                  |  14 +-
 src/backend/parser/gram.y                     |  28 ++-
 src/backend/parser/parse_utilcmd.c            |   3 +-
 src/include/access/toast_compression.h        |   8 +-
 src/include/access/toast_helper.h             |   2 +
 src/include/access/toast_internals.h          |   2 +-
 src/include/catalog/heap.h                    |   2 +
 src/include/catalog/pg_attribute.h            |   3 +
 src/include/commands/defrem.h                 |   7 +-
 src/include/nodes/nodes.h                     |   1 +
 src/include/nodes/parsenodes.h                |  15 +-
 src/test/regress/expected/compression.out     |  33 ++++
 src/test/regress/expected/compression_1.out   |  36 ++++
 src/test/regress/expected/misc_sanity.out     |   3 +-
 src/test/regress/sql/compression.sql          |  15 ++
 32 files changed, 705 insertions(+), 132 deletions(-)

diff --git a/doc/src/sgml/ref/alter_table.sgml b/doc/src/sgml/ref/alter_table.sgml
index 8129157..afee4e7 100644
--- a/doc/src/sgml/ref/alter_table.sgml
+++ b/doc/src/sgml/ref/alter_table.sgml
@@ -54,7 +54,7 @@ ALTER TABLE [ IF EXISTS ] <replaceable class="parameter">name</replaceable>
     ALTER [ COLUMN ] <replaceable class="parameter">column_name</replaceable> SET ( <replaceable class="parameter">attribute_option</replaceable> = <replaceable class="parameter">value</replaceable> [, ... ] )
     ALTER [ COLUMN ] <replaceable class="parameter">column_name</replaceable> RESET ( <replaceable class="parameter">attribute_option</replaceable> [, ... ] )
     ALTER [ COLUMN ] <replaceable class="parameter">column_name</replaceable> SET STORAGE { PLAIN | EXTERNAL | EXTENDED | MAIN }
-    ALTER [ COLUMN ] <replaceable class="parameter">column_name</replaceable> SET COMPRESSION <replaceable class="parameter">compression_method</replaceable>
+    ALTER [ COLUMN ] <replaceable class="parameter">column_name</replaceable> SET COMPRESSION <replaceable class="parameter">compression_method</replaceable> [ WITH (<replaceable class="parameter">compression_options</replaceable>) ]
     ADD <replaceable class="parameter">table_constraint</replaceable> [ NOT VALID ]
     ADD <replaceable class="parameter">table_constraint_using_index</replaceable>
     ALTER CONSTRAINT <replaceable class="parameter">constraint_name</replaceable> [ DEFERRABLE | NOT DEFERRABLE ] [ INITIALLY DEFERRED | INITIALLY IMMEDIATE ]
@@ -387,7 +387,7 @@ WITH ( MODULUS <replaceable class="parameter">numeric_literal</replaceable>, REM
 
    <varlistentry>
     <term>
-     <literal>SET COMPRESSION <replaceable class="parameter">compression_method</replaceable></literal>
+     <literal>SET COMPRESSION <replaceable class="parameter">compression_method</replaceable> [ WITH (<replaceable class="parameter">compression_options</replaceable>) ]</literal>
     </term>
     <listitem>
      <para>
@@ -410,7 +410,9 @@ WITH ( MODULUS <replaceable class="parameter">numeric_literal</replaceable>, REM
       addition, <replaceable class="parameter">compression_method</replaceable>
       can be <literal>default</literal>, which selects the default behavior of
       consulting the <xref linkend="guc-default-toast-compression"/> setting
-      at the time of data insertion to determine the method to use.
+      at the time of data insertion to determine the method to use.  The
+      compression options for the compression method can be specified using
+      <literal>WITH</literal>
      </para>
     </listitem>
    </varlistentry>
diff --git a/doc/src/sgml/ref/create_table.sgml b/doc/src/sgml/ref/create_table.sgml
index 473a0a4..a47a5a1 100644
--- a/doc/src/sgml/ref/create_table.sgml
+++ b/doc/src/sgml/ref/create_table.sgml
@@ -22,7 +22,7 @@ PostgreSQL documentation
  <refsynopsisdiv>
 <synopsis>
 CREATE [ [ GLOBAL | LOCAL ] { TEMPORARY | TEMP } | UNLOGGED ] TABLE [ IF NOT EXISTS ] <replaceable class="parameter">table_name</replaceable> ( [
-  { <replaceable class="parameter">column_name</replaceable> <replaceable class="parameter">data_type</replaceable> [ COMPRESSION <replaceable>compression_method</replaceable> ] [ COLLATE <replaceable>collation</replaceable> ] [ <replaceable class="parameter">column_constraint</replaceable> [ ... ] ]
+  { <replaceable class="parameter">column_name</replaceable> <replaceable class="parameter">data_type</replaceable> [ COMPRESSION <replaceable>compression_method</replaceable> [ WITH (<replaceable class="parameter">compression_options</replaceable>) ] ] [ COLLATE <replaceable>collation</replaceable> ] [ <replaceable class="parameter">column_constraint</replaceable> [ ... ] ]
     | <replaceable>table_constraint</replaceable>
     | LIKE <replaceable>source_table</replaceable> [ <replaceable>like_option</replaceable> ... ] }
     [, ... ]
@@ -289,7 +289,7 @@ WITH ( MODULUS <replaceable class="parameter">numeric_literal</replaceable>, REM
    </varlistentry>
 
    <varlistentry>
-    <term><literal>COMPRESSION <replaceable class="parameter">compression_method</replaceable></literal></term>
+    <term><literal>COMPRESSION <replaceable class="parameter">compression_method</replaceable> [ WITH (<replaceable class="parameter">compression_options</replaceable>) ]</literal></term>
     <listitem>
      <para>
       The <literal>COMPRESSION</literal> clause sets the compression method
@@ -308,7 +308,8 @@ WITH ( MODULUS <replaceable class="parameter">numeric_literal</replaceable>, REM
       can be <literal>default</literal> to explicitly specify the default
       behavior, which is to consult the
       <xref linkend="guc-default-toast-compression"/> setting at the time of
-      data insertion to determine the method to use.
+      data insertion to determine the method to use.  The compression options
+      for the compression method can be specified using <literal>WITH</literal>
      </para>
     </listitem>
    </varlistentry>
diff --git a/src/backend/access/brin/brin_tuple.c b/src/backend/access/brin/brin_tuple.c
index 09e563b..b48a725 100644
--- a/src/backend/access/brin/brin_tuple.c
+++ b/src/backend/access/brin/brin_tuple.c
@@ -38,6 +38,7 @@
 #include "access/toast_internals.h"
 #include "access/tupdesc.h"
 #include "access/tupmacs.h"
+#include "commands/defrem.h"
 #include "utils/datum.h"
 #include "utils/memutils.h"
 
@@ -223,6 +224,7 @@ brin_form_tuple(BrinDesc *brdesc, BlockNumber blkno, BrinMemTuple *tuple,
 			{
 				Datum		cvalue;
 				char		compression;
+				List	   *acoption = NULL;
 				Form_pg_attribute att = TupleDescAttr(brdesc->bd_tupdesc,
 													  keyno);
 
@@ -233,11 +235,16 @@ brin_form_tuple(BrinDesc *brdesc, BlockNumber blkno, BrinMemTuple *tuple,
 				 * default method.
 				 */
 				if (att->atttypid == atttype->type_id)
+				{
 					compression = att->attcompression;
+
+					/* Get the compression method option. */
+					acoption = GetAttributeCompressionOptions(att);
+				}
 				else
 					compression = InvalidCompressionMethod;
 
-				cvalue = toast_compress_datum(value, compression);
+				cvalue = toast_compress_datum(value, compression, acoption);
 
 				if (DatumGetPointer(cvalue) != NULL)
 				{
diff --git a/src/backend/access/common/indextuple.c b/src/backend/access/common/indextuple.c
index 8df882d..d6560b3 100644
--- a/src/backend/access/common/indextuple.c
+++ b/src/backend/access/common/indextuple.c
@@ -21,6 +21,7 @@
 #include "access/htup_details.h"
 #include "access/itup.h"
 #include "access/toast_internals.h"
+#include "commands/defrem.h"
 
 /*
  * This enables de-toasting of index entries.  Needed until VACUUM is
@@ -104,9 +105,17 @@ index_form_tuple(TupleDesc tupleDescriptor,
 			 att->attstorage == TYPSTORAGE_MAIN))
 		{
 			Datum		cvalue;
+			List	   *acoption = NULL;
+
+			/*
+			 * If the attribute has a valid compression method then get the
+			 * compression options.
+			 */
+			if (CompressionMethodIsValid(att->attcompression))
+				acoption = GetAttributeCompressionOptions(att);
 
 			cvalue = toast_compress_datum(untoasted_values[i],
-										  att->attcompression);
+										  att->attcompression, acoption);
 
 			if (DatumGetPointer(cvalue) != NULL)
 			{
diff --git a/src/backend/access/common/reloptions.c b/src/backend/access/common/reloptions.c
index b5602f5..d381618 100644
--- a/src/backend/access/common/reloptions.c
+++ b/src/backend/access/common/reloptions.c
@@ -2129,3 +2129,67 @@ AlterTableGetRelOptionsLockLevel(List *defList)
 
 	return lockmode;
 }
+
+/*
+ * Convert a DefElem list to the text array format that is used in
+ * pg_foreign_data_wrapper, pg_foreign_server, pg_user_mapping, and
+ * pg_foreign_table.
+ *
+ * Returns the array in the form of a Datum, or PointerGetDatum(NULL)
+ * if the list is empty.
+ *
+ * Note: The array is usually stored to database without further
+ * processing, hence any validation should be done before this
+ * conversion.
+ */
+Datum
+optionListToArray(List *options)
+{
+	ArrayBuildState *astate = NULL;
+	ListCell   *cell;
+
+	foreach(cell, options)
+	{
+		DefElem    *def = lfirst(cell);
+		const char *value;
+		Size		len;
+		text	   *t;
+
+		value = defGetString(def);
+		len = VARHDRSZ + strlen(def->defname) + 1 + strlen(value);
+		t = palloc(len + 1);
+		SET_VARSIZE(t, len);
+		sprintf(VARDATA(t), "%s=%s", def->defname, value);
+
+		astate = accumArrayResult(astate, PointerGetDatum(t),
+								  false, TEXTOID,
+								  CurrentMemoryContext);
+	}
+
+	if (astate)
+		return makeArrayResult(astate, CurrentMemoryContext);
+
+	return PointerGetDatum(NULL);
+}
+
+/*
+ * Return human readable list of reloptions
+ */
+char *
+formatRelOptions(List *options)
+{
+	StringInfoData buf;
+	ListCell   *cell;
+
+	initStringInfo(&buf);
+
+	foreach(cell, options)
+	{
+		DefElem    *def = (DefElem *) lfirst(cell);
+
+		appendStringInfo(&buf, "%s%s=%s", buf.len > 0 ? ", " : "",
+						 def->defname, defGetString(def));
+	}
+
+	return buf.data;
+}
diff --git a/src/backend/access/common/toast_compression.c b/src/backend/access/common/toast_compression.c
index 8456183..53c3039 100644
--- a/src/backend/access/common/toast_compression.c
+++ b/src/backend/access/common/toast_compression.c
@@ -19,6 +19,7 @@
 
 #include "access/detoast.h"
 #include "access/toast_compression.h"
+#include "commands/defrem.h"
 #include "common/pg_lzcompress.h"
 #include "fmgr.h"
 #include "utils/builtins.h"
@@ -33,26 +34,113 @@ int			default_toast_compression = TOAST_PGLZ_COMPRESSION;
 			 errdetail("This functionality requires the server to be built with lz4 support."), \
 			 errhint("You need to rebuild PostgreSQL using %s.", "--with-lz4")))
 
+#define PGLZ_OPTIONS_COUNT 6
+
+static char *PGLZ_options[PGLZ_OPTIONS_COUNT] = {
+	"min_input_size",
+	"max_input_size",
+	"min_comp_rate",
+	"first_success_by",
+	"match_size_good",
+	"match_size_drop"
+};
+
+/*
+ * Convert value from reloptions to int32, and report if it is not correct.
+ * Also checks parameter names
+ */
+static int32
+parse_option(char *name, char *value)
+{
+	int			i;
+
+	for (i = 0; i < PGLZ_OPTIONS_COUNT; i++)
+	{
+		if (strcmp(PGLZ_options[i], name) == 0)
+			return pg_atoi(value, 4, 0);
+	}
+
+	ereport(ERROR,
+			(errcode(ERRCODE_UNDEFINED_PARAMETER),
+			 errmsg("unknown compression option for pglz: \"%s\"", name)));
+}
+
+/*
+ * Check PGLZ options if specified.
+ */
+void
+pglz_check_options(List *options)
+{
+	ListCell   *lc;
+
+	foreach(lc, options)
+	{
+		DefElem    *def = (DefElem *) lfirst(lc);
+
+		parse_option(def->defname, defGetString(def));
+	}
+}
+
+/*
+ * Configure PGLZ_Strategy struct for compression function
+ */
+static inline PGLZ_Strategy *
+pglz_init_options(List *options)
+{
+	ListCell   *lc;
+	PGLZ_Strategy *strategy = palloc(sizeof(PGLZ_Strategy));
+
+	/* initialize with default strategy values */
+	memcpy(strategy, PGLZ_strategy_default, sizeof(PGLZ_Strategy));
+	foreach(lc, options)
+	{
+		DefElem    *def = (DefElem *) lfirst(lc);
+		int32		val = parse_option(def->defname, defGetString(def));
+
+		/* fill the strategy */
+		if (strcmp(def->defname, "min_input_size") == 0)
+			strategy->min_input_size = val;
+		else if (strcmp(def->defname, "max_input_size") == 0)
+			strategy->max_input_size = val;
+		else if (strcmp(def->defname, "min_comp_rate") == 0)
+			strategy->min_comp_rate = val;
+		else if (strcmp(def->defname, "first_success_by") == 0)
+			strategy->first_success_by = val;
+		else if (strcmp(def->defname, "match_size_good") == 0)
+			strategy->match_size_good = val;
+		else if (strcmp(def->defname, "match_size_drop") == 0)
+			strategy->match_size_drop = val;
+	}
+	return strategy;
+}
+
 /*
  * Compress a varlena using PGLZ.
  *
  * Returns the compressed varlena, or NULL if compression fails.
  */
 struct varlena *
-pglz_compress_datum(const struct varlena *value)
+pglz_compress_datum(const struct varlena *value, List *options)
 {
 	int32		valsize,
 				len;
-	struct varlena *tmp = NULL;
+	struct varlena	*tmp = NULL;
+	PGLZ_Strategy	*strategy;
 
 	valsize = VARSIZE_ANY_EXHDR(DatumGetPointer(value));
 
 	/*
+	 * XXX we should create attribute level cache for the parsed options
+	 * instead of parsing it for every compression.
+	 */
+	strategy = pglz_init_options(options);
+
+	/*
 	 * No point in wasting a palloc cycle if value size is outside the allowed
 	 * range for compression.
 	 */
-	if (valsize < PGLZ_strategy_default->min_input_size ||
-		valsize > PGLZ_strategy_default->max_input_size)
+	if (valsize < strategy->min_input_size ||
+		valsize > strategy->max_input_size)
 		return NULL;
 
 	/*
@@ -65,7 +153,9 @@ pglz_compress_datum(const struct varlena *value)
 	len = pglz_compress(VARDATA_ANY(value),
 						valsize,
 						(char *) tmp + VARHDRSZ_COMPRESSED,
-						NULL);
+						strategy);
+	pfree(strategy);
+
 	if (len < 0)
 	{
 		pfree(tmp);
@@ -133,12 +223,77 @@ pglz_decompress_datum_slice(const struct varlena *value,
 }
 
 /*
+ * Check options if specified. All validation is located here so
+ * we don't need to do it again in cminitstate function.
+ */
+void
+lz4_check_options(List *options)
+{
+#ifndef USE_LZ4
+	NO_LZ4_SUPPORT();
+	return NULL;				/* keep compiler quiet */
+#else
+	ListCell	*lc;
+
+	foreach(lc, options)
+	{
+		DefElem    *def = (DefElem *) lfirst(lc);
+
+		if (strcmp(def->defname, "acceleration") == 0)
+		{
+			int32 acceleration =
+				pg_atoi(defGetString(def), sizeof(acceleration), 0);
+
+			if (acceleration < INT32_MIN || acceleration > INT32_MAX)
+				ereport(ERROR,
+					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+					 errmsg("unexpected value for lz4 compression acceleration: \"%s\"",
+								defGetString(def)),
+					 errhint("expected value between INT32_MIN and INT32_MAX")
+					));
+		}
+		else
+			ereport(ERROR,
+					(errcode(ERRCODE_UNDEFINED_PARAMETER),
+					 errmsg("unknown compression option for lz4: \"%s\"", def->defname)));
+	}
+#endif
+}
+
+static int32
+lz4_init_options(List *options)
+{
+#ifndef USE_LZ4
+	NO_LZ4_SUPPORT();
+	return NULL;				/* keep compiler quiet */
+#else
+	int32	acceleration;
+
+	acceleration = 1;
+	if (list_length(options) > 0)
+	{
+		ListCell	*lc;
+
+		foreach(lc, options)
+		{
+			DefElem    *def = (DefElem *) lfirst(lc);
+
+			if (strcmp(def->defname, "acceleration") == 0)
+				acceleration = pg_atoi(defGetString(def), sizeof(int32), 0);
+		}
+	}
+
+	return acceleration;
+#endif
+}
+
+/*
  * Compress a varlena using LZ4.
  *
  * Returns the compressed varlena, or NULL if compression fails.
  */
 struct varlena *
-lz4_compress_datum(const struct varlena *value)
+lz4_compress_datum(const struct varlena *value, List *options)
 {
 #ifndef USE_LZ4
 	NO_LZ4_SUPPORT();
@@ -147,8 +302,11 @@ lz4_compress_datum(const struct varlena *value)
 	int32		valsize;
 	int32		len;
 	int32		max_size;
+	int32		acceleration;
 	struct varlena *tmp = NULL;
 
+	acceleration = lz4_init_options(options);
+
 	valsize = VARSIZE_ANY_EXHDR(value);
 
 	/*
@@ -158,9 +316,9 @@ lz4_compress_datum(const struct varlena *value)
 	max_size = LZ4_compressBound(valsize);
 	tmp = (struct varlena *) palloc(max_size + VARHDRSZ_COMPRESSED);
 
-	len = LZ4_compress_default(VARDATA_ANY(value),
-							   (char *) tmp + VARHDRSZ_COMPRESSED,
-							   valsize, max_size);
+	len = LZ4_compress_fast(VARDATA_ANY(value),
+							(char *) tmp + VARHDRSZ_COMPRESSED,
+							valsize, max_size, acceleration);
 	if (len <= 0)
 		elog(ERROR, "lz4 compression failed");
 
diff --git a/src/backend/access/common/toast_internals.c b/src/backend/access/common/toast_internals.c
index c7b9ade..2c1e4ea 100644
--- a/src/backend/access/common/toast_internals.c
+++ b/src/backend/access/common/toast_internals.c
@@ -44,7 +44,7 @@ static bool toastid_valueid_exists(Oid toastrelid, Oid valueid);
  * ----------
  */
 Datum
-toast_compress_datum(Datum value, char cmethod)
+toast_compress_datum(Datum value, char cmethod, List *cmoptions)
 {
 	struct varlena *tmp = NULL;
 	int32		valsize;
@@ -65,11 +65,11 @@ toast_compress_datum(Datum value, char cmethod)
 	switch (cmethod)
 	{
 		case TOAST_PGLZ_COMPRESSION:
-			tmp = pglz_compress_datum((const struct varlena *) value);
+			tmp = pglz_compress_datum((const struct varlena *) value, cmoptions);
 			cmid = TOAST_PGLZ_COMPRESSION_ID;
 			break;
 		case TOAST_LZ4_COMPRESSION:
-			tmp = lz4_compress_datum((const struct varlena *) value);
+			tmp = lz4_compress_datum((const struct varlena *) value, cmoptions);
 			cmid = TOAST_LZ4_COMPRESSION_ID;
 			break;
 		default:
diff --git a/src/backend/access/table/toast_helper.c b/src/backend/access/table/toast_helper.c
index 53f78f9..302b2e5 100644
--- a/src/backend/access/table/toast_helper.c
+++ b/src/backend/access/table/toast_helper.c
@@ -15,10 +15,13 @@
 #include "postgres.h"
 
 #include "access/detoast.h"
+#include "access/reloptions.h"
 #include "access/table.h"
 #include "access/toast_helper.h"
 #include "access/toast_internals.h"
 #include "catalog/pg_type_d.h"
+#include "commands/defrem.h"
+#include "utils/syscache.h"
 
 
 /*
@@ -55,6 +58,7 @@ toast_tuple_init(ToastTupleContext *ttc)
 		ttc->ttc_attr[i].tai_colflags = 0;
 		ttc->ttc_attr[i].tai_oldexternal = NULL;
 		ttc->ttc_attr[i].tai_compression = att->attcompression;
+		ttc->ttc_attr[i].tai_cmoptions = GetAttributeCompressionOptions(att);
 
 		if (ttc->ttc_oldvalues != NULL)
 		{
@@ -230,7 +234,8 @@ toast_tuple_try_compression(ToastTupleContext *ttc, int attribute)
 	Datum		new_value;
 	ToastAttrInfo *attr = &ttc->ttc_attr[attribute];
 
-	new_value = toast_compress_datum(*value, attr->tai_compression);
+	new_value = toast_compress_datum(*value, attr->tai_compression,
+									 attr->tai_cmoptions);
 
 	if (DatumGetPointer(new_value) != NULL)
 	{
diff --git a/src/backend/bootstrap/bootparse.y b/src/backend/bootstrap/bootparse.y
index 5fcd004..a43e0e1 100644
--- a/src/backend/bootstrap/bootparse.y
+++ b/src/backend/bootstrap/bootparse.y
@@ -239,6 +239,7 @@ Boot_CreateStmt:
 													  true,
 													  false,
 													  InvalidOid,
+													  NULL,
 													  NULL);
 						elog(DEBUG4, "relation created with OID %u", id);
 					}
diff --git a/src/backend/catalog/heap.c b/src/backend/catalog/heap.c
index 83746d3..86d5cc9 100644
--- a/src/backend/catalog/heap.c
+++ b/src/backend/catalog/heap.c
@@ -740,6 +740,7 @@ InsertPgAttributeTuples(Relation pg_attribute_rel,
 						TupleDesc tupdesc,
 						Oid new_rel_oid,
 						Datum *attoptions,
+						Datum *attcmoptions,
 						CatalogIndexState indstate)
 {
 	TupleTableSlot **slot;
@@ -795,6 +796,11 @@ InsertPgAttributeTuples(Relation pg_attribute_rel,
 		else
 			slot[slotCount]->tts_isnull[Anum_pg_attribute_attoptions - 1] = true;
 
+		if (attcmoptions && attcmoptions[natts] != (Datum) 0)
+			slot[slotCount]->tts_values[Anum_pg_attribute_attcmoptions - 1] = attcmoptions[natts];
+		else
+			slot[slotCount]->tts_isnull[Anum_pg_attribute_attcmoptions - 1] = true;
+
 		/* start out with empty permissions and empty options */
 		slot[slotCount]->tts_isnull[Anum_pg_attribute_attacl - 1] = true;
 		slot[slotCount]->tts_isnull[Anum_pg_attribute_attfdwoptions - 1] = true;
@@ -842,6 +848,7 @@ InsertPgAttributeTuples(Relation pg_attribute_rel,
 static void
 AddNewAttributeTuples(Oid new_rel_oid,
 					  TupleDesc tupdesc,
+					  Datum *acoption,
 					  char relkind)
 {
 	Relation	rel;
@@ -860,7 +867,8 @@ AddNewAttributeTuples(Oid new_rel_oid,
 	/* set stats detail level to a sane default */
 	for (int i = 0; i < natts; i++)
 		tupdesc->attrs[i].attstattarget = -1;
-	InsertPgAttributeTuples(rel, tupdesc, new_rel_oid, NULL, indstate);
+	InsertPgAttributeTuples(rel, tupdesc, new_rel_oid,
+							NULL, acoption, indstate);
 
 	/* add dependencies on their datatypes and collations */
 	for (int i = 0; i < natts; i++)
@@ -892,7 +900,7 @@ AddNewAttributeTuples(Oid new_rel_oid,
 
 		td = CreateTupleDesc(lengthof(SysAtt), (FormData_pg_attribute **) &SysAtt);
 
-		InsertPgAttributeTuples(rel, td, new_rel_oid, NULL, indstate);
+		InsertPgAttributeTuples(rel, td, new_rel_oid, NULL, NULL, indstate);
 		FreeTupleDesc(td);
 	}
 
@@ -1160,6 +1168,7 @@ heap_create_with_catalog(const char *relname,
 						 bool allow_system_table_mods,
 						 bool is_internal,
 						 Oid relrewrite,
+						 Datum *acoptions,
 						 ObjectAddress *typaddress)
 {
 	Relation	pg_class_desc;
@@ -1418,7 +1427,7 @@ heap_create_with_catalog(const char *relname,
 	/*
 	 * now add tuples to pg_attribute for the attributes in our new relation.
 	 */
-	AddNewAttributeTuples(relid, new_rel_desc->rd_att, relkind);
+	AddNewAttributeTuples(relid, new_rel_desc->rd_att, acoptions, relkind);
 
 	/*
 	 * Make a dependency link to force the relation to be deleted if its
diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c
index 26bfa74..0f86f32 100644
--- a/src/backend/catalog/index.c
+++ b/src/backend/catalog/index.c
@@ -106,10 +106,12 @@ static TupleDesc ConstructTupleDescriptor(Relation heapRelation,
 										  List *indexColNames,
 										  Oid accessMethodObjectId,
 										  Oid *collationObjectId,
-										  Oid *classObjectId);
+										  Oid *classObjectId,
+										  Datum *acoptions);
 static void InitializeAttributeOids(Relation indexRelation,
 									int numatts, Oid indexoid);
-static void AppendAttributeTuples(Relation indexRelation, Datum *attopts);
+static void AppendAttributeTuples(Relation indexRelation, Datum *attopts,
+								  Datum *attcmopts);
 static void UpdateIndexRelation(Oid indexoid, Oid heapoid,
 								Oid parentIndexId,
 								IndexInfo *indexInfo,
@@ -271,7 +273,8 @@ ConstructTupleDescriptor(Relation heapRelation,
 						 List *indexColNames,
 						 Oid accessMethodObjectId,
 						 Oid *collationObjectId,
-						 Oid *classObjectId)
+						 Oid *classObjectId,
+						 Datum *acoptions)
 {
 	int			numatts = indexInfo->ii_NumIndexAttrs;
 	int			numkeyatts = indexInfo->ii_NumIndexKeyAttrs;
@@ -332,6 +335,9 @@ ConstructTupleDescriptor(Relation heapRelation,
 		{
 			/* Simple index column */
 			const FormData_pg_attribute *from;
+			HeapTuple	attr_tuple;
+			Datum		attcmoptions;
+			bool		isNull;
 
 			Assert(atnum > 0);	/* should've been caught above */
 
@@ -348,6 +354,24 @@ ConstructTupleDescriptor(Relation heapRelation,
 			to->attalign = from->attalign;
 			to->attstorage = from->attstorage;
 			to->attcompression = from->attcompression;
+
+			attr_tuple = SearchSysCache2(ATTNUM,
+										 ObjectIdGetDatum(from->attrelid),
+										 Int16GetDatum(from->attnum));
+			if (!HeapTupleIsValid(attr_tuple))
+				elog(ERROR, "cache lookup failed for attribute %d of relation %u",
+					 from->attnum, from->attrelid);
+
+			/* Get attribute's compression option. */
+			attcmoptions = SysCacheGetAttr(ATTNUM, attr_tuple,
+										   Anum_pg_attribute_attcmoptions,
+										   &isNull);
+			if (isNull)
+				acoptions[i] = PointerGetDatum(NULL);
+			else
+				acoptions[i] =
+					PointerGetDatum(DatumGetArrayTypePCopy(attcmoptions));
+			ReleaseSysCache(attr_tuple);
 		}
 		else
 		{
@@ -499,7 +523,7 @@ InitializeAttributeOids(Relation indexRelation,
  * ----------------------------------------------------------------
  */
 static void
-AppendAttributeTuples(Relation indexRelation, Datum *attopts)
+AppendAttributeTuples(Relation indexRelation, Datum *attopts, Datum *attcmopts)
 {
 	Relation	pg_attribute;
 	CatalogIndexState indstate;
@@ -517,7 +541,8 @@ AppendAttributeTuples(Relation indexRelation, Datum *attopts)
 	 */
 	indexTupDesc = RelationGetDescr(indexRelation);
 
-	InsertPgAttributeTuples(pg_attribute, indexTupDesc, InvalidOid, attopts, indstate);
+	InsertPgAttributeTuples(pg_attribute, indexTupDesc, InvalidOid,
+							attopts, attcmopts, indstate);
 
 	CatalogCloseIndexes(indstate);
 
@@ -730,6 +755,7 @@ index_create(Relation heapRelation,
 	bool		concurrent = (flags & INDEX_CREATE_CONCURRENT) != 0;
 	bool		partitioned = (flags & INDEX_CREATE_PARTITIONED) != 0;
 	char		relkind;
+	Datum	   *acoptions;
 	TransactionId relfrozenxid;
 	MultiXactId relminmxid;
 
@@ -885,6 +911,8 @@ index_create(Relation heapRelation,
 						indexRelationName, RelationGetRelationName(heapRelation))));
 	}
 
+	acoptions = (Datum *) palloc0(sizeof(Datum) * indexInfo->ii_NumIndexAttrs);
+
 	/*
 	 * construct tuple descriptor for index tuples
 	 */
@@ -893,7 +921,8 @@ index_create(Relation heapRelation,
 											indexColNames,
 											accessMethodObjectId,
 											collationObjectId,
-											classObjectId);
+											classObjectId,
+											acoptions);
 
 	/*
 	 * Allocate an OID for the index, unless we were told what to use.
@@ -984,7 +1013,8 @@ index_create(Relation heapRelation,
 	/*
 	 * append ATTRIBUTE tuples for the index
 	 */
-	AppendAttributeTuples(indexRelation, indexInfo->ii_OpclassOptions);
+	AppendAttributeTuples(indexRelation, indexInfo->ii_OpclassOptions,
+						  acoptions);
 
 	/* ----------------
 	 *	  update pg_index
diff --git a/src/backend/catalog/toasting.c b/src/backend/catalog/toasting.c
index 0db90c2..3996321 100644
--- a/src/backend/catalog/toasting.c
+++ b/src/backend/catalog/toasting.c
@@ -266,6 +266,7 @@ create_toast_table(Relation rel, Oid toastOid, Oid toastIndexOid,
 										   true,
 										   true,
 										   OIDOldToast,
+										   NULL,
 										   NULL);
 	Assert(toast_relid != InvalidOid);
 
diff --git a/src/backend/commands/cluster.c b/src/backend/commands/cluster.c
index 9d22f64..1c0d324 100644
--- a/src/backend/commands/cluster.c
+++ b/src/backend/commands/cluster.c
@@ -701,6 +701,7 @@ make_new_heap(Oid OIDOldHeap, Oid NewTableSpace, Oid NewAccessMethod,
 										  true,
 										  true,
 										  OIDOldHeap,
+										  NULL,
 										  NULL);
 	Assert(OIDNewHeap != InvalidOid);
 
diff --git a/src/backend/commands/foreigncmds.c b/src/backend/commands/foreigncmds.c
index 146fa57..00ee8c8 100644
--- a/src/backend/commands/foreigncmds.c
+++ b/src/backend/commands/foreigncmds.c
@@ -49,50 +49,6 @@ typedef struct
 /* Internal functions */
 static void import_error_callback(void *arg);
 
-
-/*
- * Convert a DefElem list to the text array format that is used in
- * pg_foreign_data_wrapper, pg_foreign_server, pg_user_mapping, and
- * pg_foreign_table.
- *
- * Returns the array in the form of a Datum, or PointerGetDatum(NULL)
- * if the list is empty.
- *
- * Note: The array is usually stored to database without further
- * processing, hence any validation should be done before this
- * conversion.
- */
-static Datum
-optionListToArray(List *options)
-{
-	ArrayBuildState *astate = NULL;
-	ListCell   *cell;
-
-	foreach(cell, options)
-	{
-		DefElem    *def = lfirst(cell);
-		const char *value;
-		Size		len;
-		text	   *t;
-
-		value = defGetString(def);
-		len = VARHDRSZ + strlen(def->defname) + 1 + strlen(value);
-		t = palloc(len + 1);
-		SET_VARSIZE(t, len);
-		sprintf(VARDATA(t), "%s=%s", def->defname, value);
-
-		astate = accumArrayResult(astate, PointerGetDatum(t),
-								  false, TEXTOID,
-								  CurrentMemoryContext);
-	}
-
-	if (astate)
-		return makeArrayResult(astate, CurrentMemoryContext);
-
-	return PointerGetDatum(NULL);
-}
-
-
 /*
  * Transform a list of DefElem into text array format.  This is substantially
  * the same thing as optionListToArray(), except we recognize SET/ADD/DROP
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index dbee6ae..55a06ad 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -563,7 +563,9 @@ static void ATExecGenericOptions(Relation rel, List *options);
 static void ATExecSetRowSecurity(Relation rel, bool rls);
 static void ATExecForceNoForceRowSecurity(Relation rel, bool force_rls);
 static ObjectAddress ATExecSetCompression(AlteredTableInfo *tab, Relation rel,
-										  const char *column, Node *newValue, LOCKMODE lockmode);
+										  const char *column,
+										  ColumnCompression *compression,
+										  LOCKMODE lockmode);
 
 static void index_copy_data(Relation rel, RelFileNode newrnode);
 static const char *storage_name(char c);
@@ -601,7 +603,9 @@ static void refuseDupeIndexAttach(Relation parentIdx, Relation partIdx,
 								  Relation partitionTbl);
 static List *GetParentedForeignKeyRefs(Relation partition);
 static void ATDetachCheckNoForeignKeyRefs(Relation partition);
-static char GetAttributeCompression(Oid atttypid, char *compression);
+static char GetAttributeCompression(Oid atttypid,
+									ColumnCompression *compression,
+									Datum *acoptions);
 
 
 /* ----------------------------------------------------------------
@@ -647,6 +651,7 @@ DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId,
 	LOCKMODE	parentLockmode;
 	const char *accessMethod = NULL;
 	Oid			accessMethodId = InvalidOid;
+	Datum	   *acoptions;
 
 	/*
 	 * Truncate relname to appropriate length (probably a waste of time, as
@@ -851,6 +856,8 @@ DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId,
 	cookedDefaults = NIL;
 	attnum = 0;
 
+	acoptions = (Datum *) palloc0(sizeof(Datum) * descriptor->natts);
+
 	foreach(listptr, stmt->tableElts)
 	{
 		ColumnDef  *colDef = lfirst(listptr);
@@ -899,7 +906,8 @@ DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId,
 
 		if (colDef->compression)
 			attr->attcompression = GetAttributeCompression(attr->atttypid,
-														   colDef->compression);
+														   colDef->compression,
+														   &acoptions[attnum - 1]);
 	}
 
 	/*
@@ -951,8 +959,11 @@ DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId,
 										  allowSystemTableMods,
 										  false,
 										  InvalidOid,
+										  acoptions,
 										  typaddress);
 
+	pfree(acoptions);
+
 	/*
 	 * We must bump the command counter to make the newly-created relation
 	 * tuple visible for opening.
@@ -2540,17 +2551,14 @@ MergeAttributes(List *schema, List *supers, char relpersistence,
 				/* Copy/check compression parameter */
 				if (CompressionMethodIsValid(attribute->attcompression))
 				{
-					const char *compression =
-					GetCompressionMethodName(attribute->attcompression);
+					ColumnCompression *compression = MakeColumnCompression(attribute);
 
 					if (def->compression == NULL)
-						def->compression = pstrdup(compression);
-					else if (strcmp(def->compression, compression) != 0)
-						ereport(ERROR,
-								(errcode(ERRCODE_DATATYPE_MISMATCH),
-								 errmsg("column \"%s\" has a compression method conflict",
-										attributeName),
-								 errdetail("%s versus %s", def->compression, compression)));
+						def->compression = compression;
+					else
+						CheckCompressionMismatch(def->compression,
+												 compression,
+												 attributeName);
 				}
 
 				def->inhcount++;
@@ -2588,8 +2596,7 @@ MergeAttributes(List *schema, List *supers, char relpersistence,
 				def->constraints = NIL;
 				def->location = -1;
 				if (CompressionMethodIsValid(attribute->attcompression))
-					def->compression =
-						pstrdup(GetCompressionMethodName(attribute->attcompression));
+					def->compression = MakeColumnCompression(attribute);
 				else
 					def->compression = NULL;
 				inhSchema = lappend(inhSchema, def);
@@ -2843,15 +2850,10 @@ MergeAttributes(List *schema, List *supers, char relpersistence,
 				/* Copy compression parameter */
 				if (def->compression == NULL)
 					def->compression = newdef->compression;
-				else if (newdef->compression != NULL)
-				{
-					if (strcmp(def->compression, newdef->compression) != 0)
-						ereport(ERROR,
-								(errcode(ERRCODE_DATATYPE_MISMATCH),
-								 errmsg("column \"%s\" has a compression method conflict",
-										attributeName),
-								 errdetail("%s versus %s", def->compression, newdef->compression)));
-				}
+				else if (newdef->compression)
+					CheckCompressionMismatch(def->compression,
+											 newdef->compression,
+											 attributeName);
 
 				/* Mark the column as locally defined */
 				def->is_local = true;
@@ -4884,7 +4886,8 @@ ATExecCmd(List **wqueue, AlteredTableInfo *tab,
 			address = ATExecSetStorage(rel, cmd->name, cmd->def, lockmode);
 			break;
 		case AT_SetCompression:
-			address = ATExecSetCompression(tab, rel, cmd->name, cmd->def,
+			address = ATExecSetCompression(tab, rel, cmd->name,
+										   (ColumnCompression *) cmd->def,
 										   lockmode);
 			break;
 		case AT_DropColumn:		/* DROP COLUMN */
@@ -6567,6 +6570,7 @@ ATExecAddColumn(List **wqueue, AlteredTableInfo *tab, Relation rel,
 	AclResult	aclresult;
 	ObjectAddress address;
 	TupleDesc	tupdesc;
+	Datum		acoptions = PointerGetDatum(NULL);
 	FormData_pg_attribute *aattr[] = {&attribute};
 
 	/* At top level, permission check was done in ATPrepCmd, else do it */
@@ -6713,7 +6717,8 @@ ATExecAddColumn(List **wqueue, AlteredTableInfo *tab, Relation rel,
 	attribute.attalign = tform->typalign;
 	attribute.attstorage = tform->typstorage;
 	attribute.attcompression = GetAttributeCompression(typeOid,
-													   colDef->compression);
+													   colDef->compression,
+													   &acoptions);
 	attribute.attnotnull = colDef->is_not_null;
 	attribute.atthasdef = false;
 	attribute.atthasmissing = false;
@@ -6730,7 +6735,7 @@ ATExecAddColumn(List **wqueue, AlteredTableInfo *tab, Relation rel,
 
 	tupdesc = CreateTupleDesc(lengthof(aattr), (FormData_pg_attribute **) &aattr);
 
-	InsertPgAttributeTuples(attrdesc, tupdesc, myrelid, NULL, NULL);
+	InsertPgAttributeTuples(attrdesc, tupdesc, myrelid, NULL, &acoptions, NULL);
 
 	table_close(attrdesc, RowExclusiveLock);
 
@@ -8108,10 +8113,16 @@ SetIndexStorageProperties(Relation rel, Relation attrelation,
 						  AttrNumber attnum,
 						  bool setstorage, char newstorage,
 						  bool setcompression, char newcompression,
-						  LOCKMODE lockmode)
+						  Datum acoptions, LOCKMODE lockmode)
 {
 	ListCell   *lc;
 
+	/*
+	 * Compression option can only be valid if we are updating the compression
+	 * method.
+	 */
+	Assert(DatumGetPointer(acoptions) == NULL || OidIsValid(newcompression));
+
 	foreach(lc, RelationGetIndexList(rel))
 	{
 		Oid			indexoid = lfirst_oid(lc);
@@ -8148,7 +8159,29 @@ SetIndexStorageProperties(Relation rel, Relation attrelation,
 			if (setcompression)
 				attrtuple->attcompression = newcompression;
 
-			CatalogTupleUpdate(attrelation, &tuple->t_self, tuple);
+			if (DatumGetPointer(acoptions) != NULL)
+			{
+				Datum	values[Natts_pg_attribute];
+				bool	nulls[Natts_pg_attribute];
+				bool	replace[Natts_pg_attribute];
+				HeapTuple	newtuple;
+
+				/* Initialize buffers for new tuple values. */
+				memset(values, 0, sizeof(values));
+				memset(nulls, false, sizeof(nulls));
+				memset(replace, false, sizeof(replace));
+
+				values[Anum_pg_attribute_attcmoptions - 1] = acoptions;
+				replace[Anum_pg_attribute_attcmoptions - 1] = true;
+
+				newtuple = heap_modify_tuple(tuple, RelationGetDescr(attrelation),
+											 values, nulls, replace);
+				CatalogTupleUpdate(attrelation, &newtuple->t_self, newtuple);
+
+				heap_freetuple(newtuple);
+			}
+			else
+				CatalogTupleUpdate(attrelation, &tuple->t_self, tuple);
 
 			InvokeObjectPostAlterHook(RelationRelationId,
 									  RelationGetRelid(rel),
@@ -8241,7 +8274,7 @@ ATExecSetStorage(Relation rel, const char *colName, Node *newValue, LOCKMODE loc
 	 */
 	SetIndexStorageProperties(rel, attrelation, attnum,
 							  true, newstorage,
-							  false, 0,
+							  false, 0, PointerGetDatum(NULL),
 							  lockmode);
 
 	table_close(attrelation, RowExclusiveLock);
@@ -15715,19 +15748,21 @@ static ObjectAddress
 ATExecSetCompression(AlteredTableInfo *tab,
 					 Relation rel,
 					 const char *column,
-					 Node *newValue,
+					 ColumnCompression *compression,
 					 LOCKMODE lockmode)
 {
 	Relation	attrel;
 	HeapTuple	tuple;
 	Form_pg_attribute atttableform;
 	AttrNumber	attnum;
-	char	   *compression;
 	char		cmethod;
+	HeapTuple	newtuple = NULL;
+	Datum		values[Natts_pg_attribute];
+	bool		nulls[Natts_pg_attribute];
+	bool		replace[Natts_pg_attribute];
+	Datum		acoptions = (Datum) 0;
 	ObjectAddress address;
 
-	Assert(IsA(newValue, String));
-	compression = strVal(newValue);
 
 	attrel = table_open(AttributeRelationId, RowExclusiveLock);
 
@@ -15751,11 +15786,32 @@ ATExecSetCompression(AlteredTableInfo *tab,
 	 * Check that column type is compressible, then get the attribute
 	 * compression method code
 	 */
-	cmethod = GetAttributeCompression(atttableform->atttypid, compression);
+	cmethod = GetAttributeCompression(atttableform->atttypid, compression,
+									  &acoptions);
 
-	/* update pg_attribute entry */
+	/* Update the compression method in pg_attribute entry */
 	atttableform->attcompression = cmethod;
-	CatalogTupleUpdate(attrel, &tuple->t_self, tuple);
+
+	/*
+	 * If the compression options are given then update it in the pg_attribute
+	 * entry.
+	 */
+	if (DatumGetPointer(acoptions) != NULL)
+	{
+		/* Initialize buffers for new tuple values */
+		memset(values, 0, sizeof(values));
+		memset(nulls, false, sizeof(nulls));
+		memset(replace, false, sizeof(replace));
+
+		values[Anum_pg_attribute_attcmoptions - 1] = acoptions;
+		replace[Anum_pg_attribute_attcmoptions - 1] = true;
+		newtuple = heap_modify_tuple(tuple, RelationGetDescr(attrel),
+									 values, nulls, replace);
+	}
+
+	/* Perform actual update */
+	CatalogTupleUpdate(attrel, &tuple->t_self, newtuple != NULL ? newtuple :
+					   tuple);
 
 	InvokeObjectPostAlterHook(RelationRelationId,
 							  RelationGetRelid(rel),
@@ -15767,10 +15823,13 @@ ATExecSetCompression(AlteredTableInfo *tab,
 	 */
 	SetIndexStorageProperties(rel, attrel, attnum,
 							  false, 0,
-							  true, cmethod,
+							  true, cmethod, acoptions,
 							  lockmode);
 
+	/* clean up */
 	heap_freetuple(tuple);
+	if (newtuple != NULL)
+		heap_freetuple(newtuple);
 
 	table_close(attrel, RowExclusiveLock);
 
@@ -18725,14 +18784,46 @@ ATDetachCheckNoForeignKeyRefs(Relation partition)
 }
 
 /*
+ * Fetch atttribute's compression options
+ */
+List *
+GetAttributeCompressionOptions(Form_pg_attribute att)
+{
+	HeapTuple	attr_tuple;
+	Datum		attcmoptions;
+	List	   *acoptions;
+	bool		isNull;
+
+	attr_tuple = SearchSysCache2(ATTNUM,
+								 ObjectIdGetDatum(att->attrelid),
+								 Int16GetDatum(att->attnum));
+	if (!HeapTupleIsValid(attr_tuple))
+		elog(ERROR, "cache lookup failed for attribute %d of relation %u",
+			 att->attnum, att->attrelid);
+
+	attcmoptions = SysCacheGetAttr(ATTNUM, attr_tuple,
+								   Anum_pg_attribute_attcmoptions,
+								   &isNull);
+	if (isNull)
+		acoptions = NIL;
+	else
+		acoptions = untransformRelOptions(attcmoptions);
+
+	ReleaseSysCache(attr_tuple);
+
+	return acoptions;
+}
+
+/*
  * resolve column compression specification to compression method.
  */
 static char
-GetAttributeCompression(Oid atttypid, char *compression)
+GetAttributeCompression(Oid atttypid, ColumnCompression *compression,
+						Datum *acoptions)
 {
 	char		cmethod;
 
-	if (compression == NULL || strcmp(compression, "default") == 0)
+	if (compression == NULL || strcmp(compression->cmname, "default") == 0)
 		return InvalidCompressionMethod;
 
 	/*
@@ -18753,11 +18844,79 @@ GetAttributeCompression(Oid atttypid, char *compression)
 				 errmsg("column data type %s does not support compression",
 						format_type_be(atttypid))));
 
-	cmethod = CompressionNameToMethod(compression);
+	cmethod = CompressionNameToMethod(compression->cmname);
 	if (!CompressionMethodIsValid(cmethod))
 		ereport(ERROR,
 				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-				 errmsg("invalid compression method \"%s\"", compression)));
+				 errmsg("invalid compression method \"%s\"", compression->cmname)));
+
+	/*
+	 * If compression options are given then validate them and convert them to
+	 * the text array format.
+	 */
+	if (compression->options)
+	{
+		switch (cmethod)
+		{
+			case TOAST_PGLZ_COMPRESSION:
+				pglz_check_options(compression->options);
+				break;
+			case TOAST_LZ4_COMPRESSION:
+				lz4_check_options(compression->options);
+				break;
+			default:
+				elog(ERROR, "unknown compression method: %c", cmethod);
+		}
+
+		*acoptions = optionListToArray(compression->options);
+	}
+	else
+		*acoptions = PointerGetDatum(NULL);
 
 	return cmethod;
 }
+
+/*
+ * Make ColumnCompression node for pg_attribute entry.
+ */
+ColumnCompression *
+MakeColumnCompression(Form_pg_attribute att)
+{
+	ColumnCompression *node;
+
+	/* Return NULL if attribute's compression method is invalid. */
+	if (!OidIsValid(att->attcompression))
+		return NULL;
+
+	node = makeNode(ColumnCompression);
+
+	/* Copy the compression method name and compression options. */
+	node->cmname = pstrdup(GetCompressionMethodName(att->attcompression));
+	node->options = GetAttributeCompressionOptions(att);
+
+	return node;
+}
+
+/*
+ * Compare compression method and the compression options for two columns.
+ */
+void
+CheckCompressionMismatch(ColumnCompression *c1, ColumnCompression *c2,
+						 const char *attributeName)
+{
+	if (strcmp(c1->cmname, c2->cmname) != 0)
+		ereport(ERROR,
+				(errcode(ERRCODE_DATATYPE_MISMATCH),
+				 errmsg("column \"%s\" has a compression method conflict",
+						attributeName),
+				 errdetail("%s versus %s", c1->cmname, c2->cmname)));
+
+	if (!equal(c1->options, c2->options))
+		ereport(ERROR,
+				(errcode(ERRCODE_DATATYPE_MISMATCH),
+				 errmsg("column \"%s\" has a compression options conflict",
+						attributeName),
+				 errdetail("(%s) versus (%s)",
+						   formatRelOptions(c1->options),
+						   formatRelOptions(c2->options))));
+}
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index 83ec2a3..ef7cea4 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -3027,7 +3027,7 @@ _copyColumnDef(const ColumnDef *from)
 
 	COPY_STRING_FIELD(colname);
 	COPY_NODE_FIELD(typeName);
-	COPY_STRING_FIELD(compression);
+	COPY_NODE_FIELD(compression);
 	COPY_SCALAR_FIELD(inhcount);
 	COPY_SCALAR_FIELD(is_local);
 	COPY_SCALAR_FIELD(is_not_null);
@@ -3047,6 +3047,17 @@ _copyColumnDef(const ColumnDef *from)
 	return newnode;
 }
 
+static ColumnCompression *
+_copyColumnCompression(const ColumnCompression *from)
+{
+	ColumnCompression *newnode = makeNode(ColumnCompression);
+
+	COPY_STRING_FIELD(cmname);
+	COPY_NODE_FIELD(options);
+
+	return newnode;
+}
+
 static Constraint *
 _copyConstraint(const Constraint *from)
 {
@@ -5803,6 +5814,9 @@ copyObjectImpl(const void *from)
 		case T_ColumnDef:
 			retval = _copyColumnDef(from);
 			break;
+		case T_ColumnCompression:
+			retval = _copyColumnCompression(from);
+			break;
 		case T_Constraint:
 			retval = _copyConstraint(from);
 			break;
diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c
index 4bad709..47ab93c 100644
--- a/src/backend/nodes/equalfuncs.c
+++ b/src/backend/nodes/equalfuncs.c
@@ -2651,7 +2651,7 @@ _equalColumnDef(const ColumnDef *a, const ColumnDef *b)
 {
 	COMPARE_STRING_FIELD(colname);
 	COMPARE_NODE_FIELD(typeName);
-	COMPARE_STRING_FIELD(compression);
+	COMPARE_NODE_FIELD(compression);
 	COMPARE_SCALAR_FIELD(inhcount);
 	COMPARE_SCALAR_FIELD(is_local);
 	COMPARE_SCALAR_FIELD(is_not_null);
@@ -2672,6 +2672,15 @@ _equalColumnDef(const ColumnDef *a, const ColumnDef *b)
 }
 
 static bool
+_equalColumnCompression(const ColumnCompression *a, const ColumnCompression *b)
+{
+	COMPARE_STRING_FIELD(cmname);
+	COMPARE_NODE_FIELD(options);
+
+	return true;
+}
+
+static bool
 _equalConstraint(const Constraint *a, const Constraint *b)
 {
 	COMPARE_SCALAR_FIELD(contype);
@@ -3806,6 +3815,9 @@ equal(const void *a, const void *b)
 		case T_ColumnDef:
 			retval = _equalColumnDef(a, b);
 			break;
+		case T_ColumnCompression:
+			retval = _equalColumnCompression(a, b);
+			break;
 		case T_Constraint:
 			retval = _equalConstraint(a, b);
 			break;
diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c
index 36e6186..f9a902c 100644
--- a/src/backend/nodes/outfuncs.c
+++ b/src/backend/nodes/outfuncs.c
@@ -2948,7 +2948,7 @@ _outColumnDef(StringInfo str, const ColumnDef *node)
 
 	WRITE_STRING_FIELD(colname);
 	WRITE_NODE_FIELD(typeName);
-	WRITE_STRING_FIELD(compression);
+	WRITE_NODE_FIELD(compression);
 	WRITE_INT_FIELD(inhcount);
 	WRITE_BOOL_FIELD(is_local);
 	WRITE_BOOL_FIELD(is_not_null);
@@ -2967,6 +2967,15 @@ _outColumnDef(StringInfo str, const ColumnDef *node)
 }
 
 static void
+_outColumnCompression(StringInfo str, const ColumnCompression *node)
+{
+	WRITE_NODE_TYPE("COLUMNCOMPRESSION");
+
+	WRITE_STRING_FIELD(cmname);
+	WRITE_NODE_FIELD(options);
+}
+
+static void
 _outTypeName(StringInfo str, const TypeName *node)
 {
 	WRITE_NODE_TYPE("TYPENAME");
@@ -4374,6 +4383,9 @@ outNode(StringInfo str, const void *obj)
 			case T_ColumnDef:
 				_outColumnDef(str, obj);
 				break;
+			case T_ColumnCompression:
+				_outColumnCompression(str, obj);
+				break;
 			case T_TypeName:
 				_outTypeName(str, obj);
 				break;
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index e3068a3..31da8a7 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -560,7 +560,8 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 
 %type <node>	TableConstraint TableLikeClause
 %type <ival>	TableLikeOptionList TableLikeOption
-%type <str>		column_compression opt_column_compression
+%type <list>	opt_column_compression_options
+%type <node>	column_compression opt_column_compression
 %type <list>	ColQualList
 %type <node>	ColConstraint ColConstraintElem ConstraintAttr
 %type <ival>	key_actions key_delete key_match key_update key_action
@@ -2307,7 +2308,7 @@ alter_table_cmd:
 					AlterTableCmd *n = makeNode(AlterTableCmd);
 					n->subtype = AT_SetCompression;
 					n->name = $3;
-					n->def = (Node *) makeString($5);
+					n->def = $5;
 					$$ = (Node *)n;
 				}
 			/* ALTER TABLE <name> ALTER [COLUMN] <colname> ADD GENERATED ... AS IDENTITY ... */
@@ -3474,7 +3475,7 @@ columnDef:	ColId Typename opt_column_compression create_generic_options ColQualL
 					ColumnDef *n = makeNode(ColumnDef);
 					n->colname = $1;
 					n->typeName = $2;
-					n->compression = $3;
+					n->compression = (ColumnCompression *) $3;
 					n->inhcount = 0;
 					n->is_local = true;
 					n->is_not_null = false;
@@ -3529,9 +3530,26 @@ columnOptions:	ColId ColQualList
 				}
 		;
 
+opt_column_compression_options:
+			WITH '(' generic_option_list ')' { $$ = $3; }
+			| /*EMPTY*/	{ $$ = NULL; }
+		;
+
 column_compression:
-			COMPRESSION ColId						{ $$ = $2; }
-			| COMPRESSION DEFAULT					{ $$ = pstrdup("default"); }
+			COMPRESSION ColId opt_column_compression_options
+			{
+				ColumnCompression *n = makeNode(ColumnCompression);
+				n->cmname = $2;
+				n->options = (List *) $3;
+				$$ = (Node *) n;
+			}
+			| COMPRESSION DEFAULT
+			{
+				ColumnCompression *n = makeNode(ColumnCompression);
+				n->cmname = pstrdup("default");
+				n->options = NULL;
+				$$ = (Node *) n;
+			}
 		;
 
 opt_column_compression:
diff --git a/src/backend/parser/parse_utilcmd.c b/src/backend/parser/parse_utilcmd.c
index 1d3ee53..d6a2cf2 100644
--- a/src/backend/parser/parse_utilcmd.c
+++ b/src/backend/parser/parse_utilcmd.c
@@ -1087,8 +1087,7 @@ transformTableLikeClause(CreateStmtContext *cxt, TableLikeClause *table_like_cla
 		/* Likewise, copy compression if requested */
 		if ((table_like_clause->options & CREATE_TABLE_LIKE_COMPRESSION) != 0
 			&& CompressionMethodIsValid(attribute->attcompression))
-			def->compression =
-				pstrdup(GetCompressionMethodName(attribute->attcompression));
+			def->compression = MakeColumnCompression(attribute);
 		else
 			def->compression = NULL;
 
diff --git a/src/include/access/toast_compression.h b/src/include/access/toast_compression.h
index c992ece..40adb30 100644
--- a/src/include/access/toast_compression.h
+++ b/src/include/access/toast_compression.h
@@ -13,6 +13,8 @@
 #ifndef TOAST_COMPRESSION_H
 #define TOAST_COMPRESSION_H
 
+#include "nodes/pg_list.h"
+
 /*
  * GUC support.
  *
@@ -54,13 +56,15 @@ typedef enum ToastCompressionId
 
 
 /* pglz compression/decompression routines */
-extern struct varlena *pglz_compress_datum(const struct varlena *value);
+extern void pglz_check_options(List *options);
+extern struct varlena *pglz_compress_datum(const struct varlena *value, List *options);
 extern struct varlena *pglz_decompress_datum(const struct varlena *value);
 extern struct varlena *pglz_decompress_datum_slice(const struct varlena *value,
 												   int32 slicelength);
 
 /* lz4 compression/decompression routines */
-extern struct varlena *lz4_compress_datum(const struct varlena *value);
+extern void lz4_check_options(List *options);
+extern struct varlena *lz4_compress_datum(const struct varlena *value, List *options);
 extern struct varlena *lz4_decompress_datum(const struct varlena *value);
 extern struct varlena *lz4_decompress_datum_slice(const struct varlena *value,
 												  int32 slicelength);
diff --git a/src/include/access/toast_helper.h b/src/include/access/toast_helper.h
index 05104ce..312e54b 100644
--- a/src/include/access/toast_helper.h
+++ b/src/include/access/toast_helper.h
@@ -15,6 +15,7 @@
 #define TOAST_HELPER_H
 
 #include "utils/rel.h"
+#include "nodes/pg_list.h"
 
 /*
  * Information about one column of a tuple being toasted.
@@ -33,6 +34,7 @@ typedef struct
 	int32		tai_size;
 	uint8		tai_colflags;
 	char		tai_compression;
+	List	   *tai_cmoptions;
 } ToastAttrInfo;
 
 /*
diff --git a/src/include/access/toast_internals.h b/src/include/access/toast_internals.h
index 1c28b07..301e892 100644
--- a/src/include/access/toast_internals.h
+++ b/src/include/access/toast_internals.h
@@ -45,7 +45,7 @@ typedef struct toast_compress_header
 			(len) | ((uint32) (cm_method) << VARLENA_EXTSIZE_BITS); \
 	} while (0)
 
-extern Datum toast_compress_datum(Datum value, char cmethod);
+extern Datum toast_compress_datum(Datum value, char cmethod, List *cmoptions);
 extern Oid	toast_get_valid_index(Oid toastoid, LOCKMODE lock);
 
 extern void toast_delete_datum(Relation rel, Datum value, bool is_speculative);
diff --git a/src/include/catalog/heap.h b/src/include/catalog/heap.h
index 6ce480b..5ecb6f3 100644
--- a/src/include/catalog/heap.h
+++ b/src/include/catalog/heap.h
@@ -81,6 +81,7 @@ extern Oid	heap_create_with_catalog(const char *relname,
 									 bool allow_system_table_mods,
 									 bool is_internal,
 									 Oid relrewrite,
+									 Datum *acoptions,
 									 ObjectAddress *typaddress);
 
 extern void heap_drop_with_catalog(Oid relid);
@@ -97,6 +98,7 @@ extern void InsertPgAttributeTuples(Relation pg_attribute_rel,
 									TupleDesc tupdesc,
 									Oid new_rel_oid,
 									Datum *attoptions,
+									Datum *attcmoptions,
 									CatalogIndexState indstate);
 
 extern void InsertPgClassTuple(Relation pg_class_desc,
diff --git a/src/include/catalog/pg_attribute.h b/src/include/catalog/pg_attribute.h
index 5c1ec93..6b16f77 100644
--- a/src/include/catalog/pg_attribute.h
+++ b/src/include/catalog/pg_attribute.h
@@ -182,6 +182,9 @@ CATALOG(pg_attribute,1249,AttributeRelationId) BKI_BOOTSTRAP BKI_ROWTYPE_OID(75,
 	/* Column-level FDW options */
 	text		attfdwoptions[1] BKI_DEFAULT(_null_);
 
+	/* current compression options */
+	text		attcmoptions[1] BKI_DEFAULT(_null_);
+
 	/*
 	 * Missing value for added columns. This is a one element array which lets
 	 * us store a value of the attribute type here.
diff --git a/src/include/commands/defrem.h b/src/include/commands/defrem.h
index f84d099..f5d3239 100644
--- a/src/include/commands/defrem.h
+++ b/src/include/commands/defrem.h
@@ -134,7 +134,12 @@ extern Datum transformGenericOptions(Oid catalogId,
 									 Datum oldOptions,
 									 List *options,
 									 Oid fdwvalidator);
-
+extern Datum optionListToArray(List *options);
+extern char *formatRelOptions(List *options);
+extern ColumnCompression *MakeColumnCompression(Form_pg_attribute att);
+extern List *GetAttributeCompressionOptions(Form_pg_attribute att);
+extern void CheckCompressionMismatch(ColumnCompression *c1, ColumnCompression *c2,
+									 const char *attributeName);
 /* commands/amcmds.c */
 extern ObjectAddress CreateAccessMethod(CreateAmStmt *stmt);
 extern Oid	get_index_am_oid(const char *amname, bool missing_ok);
diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h
index a692eb7..6d1508d 100644
--- a/src/include/nodes/nodes.h
+++ b/src/include/nodes/nodes.h
@@ -455,6 +455,7 @@ typedef enum NodeTag
 	T_RangeTableFuncCol,
 	T_TypeName,
 	T_ColumnDef,
+	T_ColumnCompression,
 	T_IndexElem,
 	T_StatsElem,
 	T_Constraint,
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index 45e4f2a..d61225a 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -647,6 +647,19 @@ typedef struct RangeTableSample
 } RangeTableSample;
 
 /*
+ * ColumnCompression - compression parameters for some attribute
+ *
+ * This represents compression information defined using clause:
+ * .. COMPRESSION <compression method> <compression method options>
+ */
+typedef struct ColumnCompression
+{
+	NodeTag		type;
+	char	   *cmname;
+	List	   *options;
+} ColumnCompression;
+
+/*
  * ColumnDef - column definition (used in various creates)
  *
  * If the column has a default value, we may have the value expression
@@ -669,7 +682,7 @@ typedef struct ColumnDef
 	NodeTag		type;
 	char	   *colname;		/* name of column */
 	TypeName   *typeName;		/* type of column */
-	char	   *compression;	/* compression method for column */
+	ColumnCompression *compression;	/* column compression */
 	int			inhcount;		/* number of times column is inherited */
 	bool		is_local;		/* column has local (non-inherited) def'n */
 	bool		is_not_null;	/* NOT NULL constraint specified? */
diff --git a/src/test/regress/expected/compression.out b/src/test/regress/expected/compression.out
index 4c997e2..9d4337b 100644
--- a/src/test/regress/expected/compression.out
+++ b/src/test/regress/expected/compression.out
@@ -319,6 +319,39 @@ CREATE TABLE cmdata2 (f1 TEXT COMPRESSION pglz, f2 TEXT COMPRESSION lz4);
 CREATE UNIQUE INDEX idx1 ON cmdata2 ((f1 || f2));
 INSERT INTO cmdata2 VALUES((SELECT array_agg(md5(g::TEXT))::TEXT FROM
 generate_series(1, 50) g), VERSION());
+-- compression options
+CREATE TABLE cmdata3(f1 TEXT COMPRESSION pglz WITH (min_input_size '100'));
+CREATE INDEX idx2 ON cmdata3(f1);
+INSERT INTO cmdata3 VALUES(repeat('1234567890',1000));
+SELECT attcmoptions FROM pg_attribute WHERE attrelid = 'cmdata3'::regclass AND attname='f1';
+     attcmoptions     
+----------------------
+ {min_input_size=100}
+(1 row)
+
+SELECT attcmoptions FROM pg_attribute WHERE attrelid = 'idx2'::regclass AND attname='f1';
+     attcmoptions     
+----------------------
+ {min_input_size=100}
+(1 row)
+
+ALTER TABLE cmdata3 ALTER COLUMN f1 SET COMPRESSION lz4 WITH (acceleration '50');
+INSERT INTO cmdata3 VALUES(repeat('1234567890',1004));
+SELECT attcmoptions FROM pg_attribute WHERE attrelid = 'cmdata3'::regclass AND attname='f1';
+   attcmoptions    
+-------------------
+ {acceleration=50}
+(1 row)
+
+SELECT attcmoptions FROM pg_attribute WHERE attrelid = 'idx2'::regclass AND attname='f1';
+   attcmoptions    
+-------------------
+ {acceleration=50}
+(1 row)
+
+--error
+ALTER TABLE cmdata3 ALTER COLUMN f1 SET COMPRESSION pglz WITH (invalid_option '50');
+ERROR:  unknown compression option for pglz: "invalid_option"
 -- check data is ok
 SELECT length(f1) FROM cmdata;
  length 
diff --git a/src/test/regress/expected/compression_1.out b/src/test/regress/expected/compression_1.out
index 1ce2962..1111899 100644
--- a/src/test/regress/expected/compression_1.out
+++ b/src/test/regress/expected/compression_1.out
@@ -322,6 +322,42 @@ generate_series(1, 50) g), VERSION());
 ERROR:  relation "cmdata2" does not exist
 LINE 1: INSERT INTO cmdata2 VALUES((SELECT array_agg(md5(g::TEXT))::...
                     ^
+-- compression options
+CREATE TABLE cmdata3(f1 TEXT COMPRESSION pglz WITH (min_input_size '100'));
+CREATE INDEX idx2 ON cmdata3(f1);
+INSERT INTO cmdata3 VALUES(repeat('1234567890',1000));
+SELECT attcmoptions FROM pg_attribute WHERE attrelid = 'cmdata3'::regclass AND attname='f1';
+     attcmoptions     
+----------------------
+ {min_input_size=100}
+(1 row)
+
+SELECT attcmoptions FROM pg_attribute WHERE attrelid = 'idx2'::regclass AND attname='f1';
+     attcmoptions     
+----------------------
+ {min_input_size=100}
+(1 row)
+
+ALTER TABLE cmdata3 ALTER COLUMN f1 SET COMPRESSION lz4 WITH (acceleration '50');
+ERROR:  unsupported LZ4 compression method
+DETAIL:  This functionality requires the server to be built with lz4 support.
+HINT:  You need to rebuild PostgreSQL using --with-lz4.
+INSERT INTO cmdata3 VALUES(repeat('1234567890',1004));
+SELECT attcmoptions FROM pg_attribute WHERE attrelid = 'cmdata3'::regclass AND attname='f1';
+     attcmoptions     
+----------------------
+ {min_input_size=100}
+(1 row)
+
+SELECT attcmoptions FROM pg_attribute WHERE attrelid = 'idx2'::regclass AND attname='f1';
+     attcmoptions     
+----------------------
+ {min_input_size=100}
+(1 row)
+
+--error
+ALTER TABLE cmdata3 ALTER COLUMN f1 SET COMPRESSION pglz WITH (invalid_option '50');
+ERROR:  unknown compression option for pglz: "invalid_option"
 -- check data is ok
 SELECT length(f1) FROM cmdata;
  length 
diff --git a/src/test/regress/expected/misc_sanity.out b/src/test/regress/expected/misc_sanity.out
index a57fd14..b6a6fe2 100644
--- a/src/test/regress/expected/misc_sanity.out
+++ b/src/test/regress/expected/misc_sanity.out
@@ -50,6 +50,7 @@ ORDER BY 1, 2;
          relname         |    attname    |   atttypid   
 -------------------------+---------------+--------------
  pg_attribute            | attacl        | aclitem[]
+ pg_attribute            | attcmoptions  | text[]
  pg_attribute            | attfdwoptions | text[]
  pg_attribute            | attmissingval | anyarray
  pg_attribute            | attoptions    | text[]
@@ -60,7 +61,7 @@ ORDER BY 1, 2;
  pg_index                | indpred       | pg_node_tree
  pg_largeobject          | data          | bytea
  pg_largeobject_metadata | lomacl        | aclitem[]
-(11 rows)
+(12 rows)
 
 -- system catalogs without primary keys
 --
diff --git a/src/test/regress/sql/compression.sql b/src/test/regress/sql/compression.sql
index 86332dc..c681159 100644
--- a/src/test/regress/sql/compression.sql
+++ b/src/test/regress/sql/compression.sql
@@ -138,6 +138,21 @@ CREATE UNIQUE INDEX idx1 ON cmdata2 ((f1 || f2));
 INSERT INTO cmdata2 VALUES((SELECT array_agg(md5(g::TEXT))::TEXT FROM
 generate_series(1, 50) g), VERSION());
 
+-- compression options
+CREATE TABLE cmdata3(f1 TEXT COMPRESSION pglz WITH (min_input_size '100'));
+CREATE INDEX idx2 ON cmdata3(f1);
+INSERT INTO cmdata3 VALUES(repeat('1234567890',1000));
+SELECT attcmoptions FROM pg_attribute WHERE attrelid = 'cmdata3'::regclass AND attname='f1';
+SELECT attcmoptions FROM pg_attribute WHERE attrelid = 'idx2'::regclass AND attname='f1';
+
+ALTER TABLE cmdata3 ALTER COLUMN f1 SET COMPRESSION lz4 WITH (acceleration '50');
+INSERT INTO cmdata3 VALUES(repeat('1234567890',1004));
+SELECT attcmoptions FROM pg_attribute WHERE attrelid = 'cmdata3'::regclass AND attname='f1';
+SELECT attcmoptions FROM pg_attribute WHERE attrelid = 'idx2'::regclass AND attname='f1';
+
+--error
+ALTER TABLE cmdata3 ALTER COLUMN f1 SET COMPRESSION pglz WITH (invalid_option '50');
+
 -- check data is ok
 SELECT length(f1) FROM cmdata;
 SELECT length(f1) FROM cmdata1;
-- 
1.8.3.1

Reply via email to