On 28.03.2018 20:26, Konstantin Knizhnik wrote:


On 17.03.2018 17:18, Peter Eisentraut wrote:
On 3/11/18 13:28, Tom Lane wrote:
My proposal is the attached patch that sets the default in libpq to off
and adjusts the documentation a bit so it doesn't sound like we have
missed the news altogether.
Seems reasonable as far as it goes, but do we need to make corresponding
server-side changes?
We could add a setting to disable SSL compression on the server, as some
web servers have been doing, but the niche of version combinations where
that would be applicable seems pretty low.



One of our customers was managed to improve speed about 10 times by using SSL compression for the system where client and servers are located in different geographical regions and query results are very large because of JSON columns. Them actually do not need encryption, just compression. I expect that it is not the only case where compression of libpq protocol can be useful. Please notice that logical replication is also using libpq protocol.

If SSL compression is deprecated, should we provide own compression?
I have implemented some prototype implementation of it (patch is attached). I have added compression=on/off parameter to connection string and -Z option to psql and pgbench utilities.
Below are some results:

Compression ratio (raw->compressed):


        libz (level=1)
        libzstd (level=1)
pgbench -i -s 10
        16997209->2536330
        16997209->268077
pgbench -t 100000 -S
        6289036->1523862
6600338<-900293
        6288933->1777400
6600338<-1000318


There is no mistyping: libzstd compress COPY data about 10 times better than libz, with wonderful compression ratio 63.

Influence on execution time is minimal (I have tested local configuration when client and server are at the same host):


        no compression
        libz (level=1)
        libzstd (level=1)
pgbench -i -s 10
        1.552
        1.572
        1.611
pgbench -t 100000 -S
        4.482
        4.926
        4.877





--
Konstantin Knizhnik
Postgres Professional:http://www.postgrespro.com
The Russian Postgres Company

Sorry, I have attached wrong patch.
To use zstd compression, Postgres should be configured with --with-zstd. Otherwise compression will use zlib unless it is disabled by --without-zlib option.


--
Konstantin Knizhnik
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company

diff --git a/configure b/configure
index 6f659a5..f2ef62d 100755
--- a/configure
+++ b/configure
@@ -698,6 +698,7 @@ ELF_SYS
 EGREP
 GREP
 with_zlib
+with_zstd
 with_system_tzdata
 with_libxslt
 with_libxml
@@ -862,6 +863,7 @@ with_libxml
 with_libxslt
 with_system_tzdata
 with_zlib
+with_zstd
 with_gnu_ld
 enable_largefile
 enable_float4_byval
@@ -8016,6 +8018,86 @@ fi
 
 
 #
+# ZStd
+#
+
+
+
+# Check whether --with-zstd was given.
+if test "${with_zstd+set}" = set; then :
+  withval=$with_zstd;
+  case $withval in
+    yes)
+      ;;
+    no)
+      :
+      ;;
+    *)
+      as_fn_error $? "no argument expected for --with-zstd option" "$LINENO" 5
+      ;;
+  esac
+
+else
+  with_zstd=no
+
+fi
+
+
+
+
+if test "$with_zstd" = yes ; then
+  { $as_echo "$as_me:${as_lineno-$LINENO}: checking for ZSTD_compress in -lzstd" >&5
+$as_echo_n "checking for ZSTD_compress in -lzstd... " >&6; }
+if ${ac_cv_lib_zstd_ZSTD_compress+:} false; then :
+  $as_echo_n "(cached) " >&6
+else
+  ac_check_lib_save_LIBS=$LIBS
+LIBS="-lzstd  $LIBS"
+cat confdefs.h - <<_ACEOF >conftest.$ac_ext
+/* end confdefs.h.  */
+
+/* Override any GCC internal prototype to avoid an error.
+   Use char because int might match the return type of a GCC
+   builtin and then its argument prototype would still apply.  */
+#ifdef __cplusplus
+extern "C"
+#endif
+char ZSTD_compress ();
+int
+main ()
+{
+return ZSTD_compress ();
+  ;
+  return 0;
+}
+_ACEOF
+if ac_fn_c_try_link "$LINENO"; then :
+  ac_cv_lib_zstd_ZSTD_compress=yes
+else
+  ac_cv_lib_zstd_ZSTD_compress=no
+fi
+rm -f core conftest.err conftest.$ac_objext \
+    conftest$ac_exeext conftest.$ac_ext
+LIBS=$ac_check_lib_save_LIBS
+fi
+{ $as_echo "$as_me:${as_lineno-$LINENO}: result: $ac_cv_lib_zstd_ZSTD_compress" >&5
+$as_echo "$ac_cv_lib_zstd_ZSTD_compress" >&6; }
+if test "x$ac_cv_lib_zstd_ZSTD_compress" = xyes; then :
+  cat >>confdefs.h <<_ACEOF
+#define HAVE_LIBZSTD 1
+_ACEOF
+
+  LIBS="-lzstd $LIBS"
+
+else
+  as_fn_error $? "library 'zstd' is required for ZSTD support" "$LINENO" 5
+fi
+
+fi
+
+
+
+#
 # Elf
 #
 
diff --git a/src/Makefile.global.in b/src/Makefile.global.in
index 3bbdf17..08b8ab2 100644
--- a/src/Makefile.global.in
+++ b/src/Makefile.global.in
@@ -195,6 +195,7 @@ with_llvm	= @with_llvm@
 with_system_tzdata = @with_system_tzdata@
 with_uuid	= @with_uuid@
 with_zlib	= @with_zlib@
+with_zstd   = @with_zstd@
 enable_rpath	= @enable_rpath@
 enable_nls	= @enable_nls@
 enable_debug	= @enable_debug@
diff --git a/src/backend/Makefile b/src/backend/Makefile
index 6450c5a..1522c70 100644
--- a/src/backend/Makefile
+++ b/src/backend/Makefile
@@ -50,6 +50,14 @@ ifeq ($(with_systemd),yes)
 LIBS += -lsystemd
 endif
 
+ifeq ($(with_zstd),yes)
+LIBS += -lzstd
+endif
+
+ifeq ($(with_zlib),yes)
+LIBS += -lz
+endif
+
 ##########################################################################
 
 all: submake-libpgport submake-schemapg postgres $(POSTGRES_IMP)
diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c
index a4f6d4d..3a6f54b 100644
--- a/src/backend/libpq/pqcomm.c
+++ b/src/backend/libpq/pqcomm.c
@@ -95,6 +95,7 @@
 #include "storage/ipc.h"
 #include "utils/guc.h"
 #include "utils/memutils.h"
+#include  "common/zpq_stream.h"
 
 /*
  * Cope with the various platform-specific ways to spell TCP keepalive socket
@@ -143,6 +144,9 @@ static char PqRecvBuffer[PQ_RECV_BUFFER_SIZE];
 static int	PqRecvPointer;		/* Next index to read a byte from PqRecvBuffer */
 static int	PqRecvLength;		/* End of data available in PqRecvBuffer */
 
+static ZpqStream* PqStream;
+
+
 /*
  * Message status
  */
@@ -185,6 +189,31 @@ PQcommMethods *PqCommMethods = &PqCommSocketMethods;
 
 WaitEventSet *FeBeWaitSet;
 
+/* --------------------------------
+ *		pq_configure - configure connection using port settings
+ *
+ * Right now only conpression is toggled in the configure.
+ * Function returns 0 in case of success, non-null in case of error
+ * --------------------------------
+ */
+int
+pq_configure(Port* port)
+{
+	if (port->use_compression)
+	{
+		char compression = 'z'; /* Request compression message */
+		int rc;
+		/* Switch on compression at client side */
+		socket_set_nonblocking(false);
+		while ((rc = secure_write(MyProcPort, &compression, 1)) < 0 && errno == EINTR);
+		if (rc != 1)
+			return -1;
+
+		/* initialize compression */
+		PqStream = zpq_create((zpq_tx_func)secure_write, (zpq_rx_func)secure_read, MyProcPort);
+	}
+	return 0;
+}
 
 /* --------------------------------
  *		pq_init - initialize libpq at backend startup
@@ -225,6 +254,7 @@ pq_init(void)
 					  NULL, NULL);
 	AddWaitEventToSet(FeBeWaitSet, WL_LATCH_SET, -1, MyLatch, NULL);
 	AddWaitEventToSet(FeBeWaitSet, WL_POSTMASTER_DEATH, -1, NULL, NULL);
+
 }
 
 /* --------------------------------
@@ -282,6 +312,9 @@ socket_close(int code, Datum arg)
 		free(MyProcPort->gss);
 #endif							/* ENABLE_GSS || ENABLE_SSPI */
 
+		/* Release compression streams */
+		zpq_free(PqStream);
+
 		/*
 		 * Cleanly shut down SSL layer.  Nowhere else does a postmaster child
 		 * call this, so this is safe when interrupting BackendInitialize().
@@ -932,12 +965,14 @@ socket_set_nonblocking(bool nonblocking)
 /* --------------------------------
  *		pq_recvbuf - load some bytes into the input buffer
  *
- *		returns 0 if OK, EOF if trouble
+ *		returns number of read bytes, EOF if trouble
  * --------------------------------
  */
 static int
-pq_recvbuf(void)
+pq_recvbuf(bool nowait)
 {
+	int			   r;
+
 	if (PqRecvPointer > 0)
 	{
 		if (PqRecvLength > PqRecvPointer)
@@ -953,21 +988,34 @@ pq_recvbuf(void)
 	}
 
 	/* Ensure that we're in blocking mode */
-	socket_set_nonblocking(false);
+	socket_set_nonblocking(nowait);
 
 	/* Can fill buffer from PqRecvLength and upwards */
 	for (;;)
 	{
-		int			r;
-
-		r = secure_read(MyProcPort, PqRecvBuffer + PqRecvLength,
-						PQ_RECV_BUFFER_SIZE - PqRecvLength);
+		size_t processed = 0;
+		r = PqStream
+			? zpq_read(PqStream, PqRecvBuffer + PqRecvLength,
+					   PQ_RECV_BUFFER_SIZE - PqRecvLength, &processed)
+			: secure_read(MyProcPort, PqRecvBuffer + PqRecvLength,
+						  PQ_RECV_BUFFER_SIZE - PqRecvLength);
+		PqRecvLength += processed;
 
 		if (r < 0)
 		{
+			if (r == ZPQ_DECOMPRESS_ERROR)
+			{
+				ereport(COMMERROR,
+						(errcode_for_socket_access(),
+						 errmsg("Failed to decompress data: %s", zpq_error(PqStream))));
+				return EOF;
+			}
 			if (errno == EINTR)
 				continue;		/* Ok if interrupted */
 
+			if (nowait && (errno == EAGAIN || errno == EWOULDBLOCK))
+				return 0;
+
 			/*
 			 * Careful: an ereport() that tries to write to the client would
 			 * cause recursion to here, leading to stack overflow and core
@@ -988,7 +1036,7 @@ pq_recvbuf(void)
 		}
 		/* r contains number of bytes read, so just incr length */
 		PqRecvLength += r;
-		return 0;
+		return r;
 	}
 }
 
@@ -1003,7 +1051,7 @@ pq_getbyte(void)
 
 	while (PqRecvPointer >= PqRecvLength)
 	{
-		if (pq_recvbuf())		/* If nothing in buffer, then recv some */
+		if (pq_recvbuf(false) == EOF)		/* If nothing in buffer, then recv some */
 			return EOF;			/* Failed to recv data */
 	}
 	return (unsigned char) PqRecvBuffer[PqRecvPointer++];
@@ -1022,7 +1070,7 @@ pq_peekbyte(void)
 
 	while (PqRecvPointer >= PqRecvLength)
 	{
-		if (pq_recvbuf())		/* If nothing in buffer, then recv some */
+		if (pq_recvbuf(false) == EOF)		/* If nothing in buffer, then recv some */
 			return EOF;			/* Failed to recv data */
 	}
 	return (unsigned char) PqRecvBuffer[PqRecvPointer];
@@ -1043,44 +1091,11 @@ pq_getbyte_if_available(unsigned char *c)
 
 	Assert(PqCommReadingMsg);
 
-	if (PqRecvPointer < PqRecvLength)
+	if (PqRecvPointer < PqRecvLength || (r = pq_recvbuf(true)) > 0)
 	{
 		*c = PqRecvBuffer[PqRecvPointer++];
 		return 1;
 	}
-
-	/* Put the socket into non-blocking mode */
-	socket_set_nonblocking(true);
-
-	r = secure_read(MyProcPort, c, 1);
-	if (r < 0)
-	{
-		/*
-		 * Ok if no data available without blocking or interrupted (though
-		 * EINTR really shouldn't happen with a non-blocking socket). Report
-		 * other errors.
-		 */
-		if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)
-			r = 0;
-		else
-		{
-			/*
-			 * Careful: an ereport() that tries to write to the client would
-			 * cause recursion to here, leading to stack overflow and core
-			 * dump!  This message must go *only* to the postmaster log.
-			 */
-			ereport(COMMERROR,
-					(errcode_for_socket_access(),
-					 errmsg("could not receive data from client: %m")));
-			r = EOF;
-		}
-	}
-	else if (r == 0)
-	{
-		/* EOF detected */
-		r = EOF;
-	}
-
 	return r;
 }
 
@@ -1101,7 +1116,7 @@ pq_getbytes(char *s, size_t len)
 	{
 		while (PqRecvPointer >= PqRecvLength)
 		{
-			if (pq_recvbuf())	/* If nothing in buffer, then recv some */
+			if (pq_recvbuf(false) == EOF)	/* If nothing in buffer, then recv some */
 				return EOF;		/* Failed to recv data */
 		}
 		amount = PqRecvLength - PqRecvPointer;
@@ -1135,7 +1150,7 @@ pq_discardbytes(size_t len)
 	{
 		while (PqRecvPointer >= PqRecvLength)
 		{
-			if (pq_recvbuf())	/* If nothing in buffer, then recv some */
+			if (pq_recvbuf(false) == EOF)	/* If nothing in buffer, then recv some */
 				return EOF;		/* Failed to recv data */
 		}
 		amount = PqRecvLength - PqRecvPointer;
@@ -1176,7 +1191,7 @@ pq_getstring(StringInfo s)
 	{
 		while (PqRecvPointer >= PqRecvLength)
 		{
-			if (pq_recvbuf())	/* If nothing in buffer, then recv some */
+			if (pq_recvbuf(false) == EOF)	/* If nothing in buffer, then recv some */
 				return EOF;		/* Failed to recv data */
 		}
 
@@ -1426,13 +1441,18 @@ internal_flush(void)
 	char	   *bufptr = PqSendBuffer + PqSendStart;
 	char	   *bufend = PqSendBuffer + PqSendPointer;
 
-	while (bufptr < bufend)
+	while (bufptr < bufend || zpq_buffered(PqStream) != 0) /* has more data to flush or unsent data in internal compression buffer */
 	{
-		int			r;
-
-		r = secure_write(MyProcPort, bufptr, bufend - bufptr);
-
-		if (r <= 0)
+		int		r;
+		size_t  processed = 0;
+		size_t  available = bufend - bufptr;
+		r = PqStream
+			? zpq_write(PqStream, bufptr, available, &processed)
+			: secure_write(MyProcPort, bufptr, available);
+		bufptr += processed;
+		PqSendStart += processed;
+
+		if (r < 0 || (r == 0 && available))
 		{
 			if (errno == EINTR)
 				continue;		/* Ok if we were interrupted */
@@ -1480,7 +1500,6 @@ internal_flush(void)
 		bufptr += r;
 		PqSendStart += r;
 	}
-
 	PqSendStart = PqSendPointer = 0;
 	return 0;
 }
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index 660f318..295d756 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -2101,6 +2101,16 @@ retry1:
 				port->database_name = pstrdup(valptr);
 			else if (strcmp(nameptr, "user") == 0)
 				port->user_name = pstrdup(valptr);
+			else if (strcmp(nameptr, "compression") == 0)
+			{
+				if (!parse_bool(valptr, &port->use_compression))
+					ereport(FATAL,
+							(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+							 errmsg("invalid boolean value for parameter \"%s\": \"%s\"",
+									"compression",
+									valptr),
+							 errhint("Valid values are: \"false\", \"off\", 0, \"true\", \"on\", 1.")));
+			}
 			else if (strcmp(nameptr, "options") == 0)
 				port->cmdline_options = pstrdup(valptr);
 			else if (strcmp(nameptr, "replication") == 0)
@@ -4305,6 +4315,14 @@ BackendInitialize(Port *port)
 	if (status != STATUS_OK)
 		proc_exit(0);
 
+	if (pq_configure(port))
+	{
+		ereport(COMMERROR,
+				(errcode_for_socket_access(),
+				 errmsg("failed to send compression message: %m")));
+		proc_exit(0);
+	}
+
 	/*
 	 * Now that we have the user and database name, we can set the process
 	 * title for ps.  It's good to do this as early as possible in startup.
diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index a15aa06..769a06d 100644
--- a/src/bin/pgbench/pgbench.c
+++ b/src/bin/pgbench/pgbench.c
@@ -183,6 +183,7 @@ int			nclients = 1;		/* number of clients */
 int			nthreads = 1;		/* number of threads */
 bool		is_connect;			/* establish connection for each transaction */
 bool		is_latencies;		/* report per-command latencies */
+bool        libpq_compression;  /* use libpq compression */
 int			main_pid;			/* main process id used in log filename */
 
 char	   *pghost = "";
@@ -575,6 +576,7 @@ usage(void)
 		   "  -h, --host=HOSTNAME      database server host or socket directory\n"
 		   "  -p, --port=PORT          database server port number\n"
 		   "  -U, --username=USERNAME  connect as specified database user\n"
+		   "  -Z, --compression        use libpq compression\n"
 		   "  -V, --version            output version information, then exit\n"
 		   "  -?, --help               show this help, then exit\n"
 		   "\n"
@@ -1092,7 +1094,7 @@ doConnect(void)
 	 */
 	do
 	{
-#define PARAMS_ARRAY_SIZE	7
+#define PARAMS_ARRAY_SIZE	8
 
 		const char *keywords[PARAMS_ARRAY_SIZE];
 		const char *values[PARAMS_ARRAY_SIZE];
@@ -1109,8 +1111,10 @@ doConnect(void)
 		values[4] = dbName;
 		keywords[5] = "fallback_application_name";
 		values[5] = progname;
-		keywords[6] = NULL;
-		values[6] = NULL;
+		keywords[6] = "compression";
+		values[6] = libpq_compression ? "on" : "off";
+		keywords[7] = NULL;
+		values[7] = NULL;
 
 		new_pass = false;
 
@@ -4442,6 +4446,7 @@ main(int argc, char **argv)
 		{"builtin", required_argument, NULL, 'b'},
 		{"client", required_argument, NULL, 'c'},
 		{"connect", no_argument, NULL, 'C'},
+		{"compression", no_argument, NULL, 'Z'},
 		{"debug", no_argument, NULL, 'd'},
 		{"define", required_argument, NULL, 'D'},
 		{"file", required_argument, NULL, 'f'},
@@ -4543,12 +4548,15 @@ main(int argc, char **argv)
 	state = (CState *) pg_malloc(sizeof(CState));
 	memset(state, 0, sizeof(CState));
 
-	while ((c = getopt_long(argc, argv, "iI:h:nvp:dqb:SNc:j:Crs:t:T:U:lf:D:F:M:P:R:L:", long_options, &optindex)) != -1)
+	while ((c = getopt_long(argc, argv, "iI:h:nvp:dqb:SNc:j:Crs:t:T:U:lf:D:F:M:P:R:L:Z", long_options, &optindex)) != -1)
 	{
 		char	   *script;
 
 		switch (c)
 		{
+			case 'Z':
+				libpq_compression = true;
+				break;
 			case 'i':
 				is_init_mode = true;
 				break;
diff --git a/src/bin/psql/startup.c b/src/bin/psql/startup.c
index be57574..9271716 100644
--- a/src/bin/psql/startup.c
+++ b/src/bin/psql/startup.c
@@ -75,6 +75,7 @@ struct adhoc_opts
 	bool		no_psqlrc;
 	bool		single_txn;
 	bool		list_dbs;
+	bool        compression;
 	SimpleActionList actions;
 };
 
@@ -237,8 +238,10 @@ main(int argc, char *argv[])
 		values[5] = pset.progname;
 		keywords[6] = "client_encoding";
 		values[6] = (pset.notty || getenv("PGCLIENTENCODING")) ? NULL : "auto";
-		keywords[7] = NULL;
-		values[7] = NULL;
+		keywords[7] = "compression";
+		values[7] = options.compression ? "on" : "off";
+		keywords[8] = NULL;
+		values[8] = NULL;
 
 		new_pass = false;
 		pset.db = PQconnectdbParams(keywords, values, true);
@@ -436,6 +439,7 @@ parse_psql_options(int argc, char *argv[], struct adhoc_opts *options)
 		{"echo-all", no_argument, NULL, 'a'},
 		{"no-align", no_argument, NULL, 'A'},
 		{"command", required_argument, NULL, 'c'},
+		{"compression", no_argument, NULL, 'Z'},
 		{"dbname", required_argument, NULL, 'd'},
 		{"echo-queries", no_argument, NULL, 'e'},
 		{"echo-errors", no_argument, NULL, 'b'},
@@ -476,7 +480,7 @@ parse_psql_options(int argc, char *argv[], struct adhoc_opts *options)
 
 	memset(options, 0, sizeof *options);
 
-	while ((c = getopt_long(argc, argv, "aAbc:d:eEf:F:h:HlL:no:p:P:qR:sStT:U:v:VwWxXz?01",
+	while ((c = getopt_long(argc, argv, "aAbc:d:eEf:F:h:HlL:no:p:P:qR:sStT:U:v:VwWxXz?01Z",
 							long_options, &optindex)) != -1)
 	{
 		switch (c)
@@ -540,6 +544,9 @@ parse_psql_options(int argc, char *argv[], struct adhoc_opts *options)
 			case 'p':
 				options->port = pg_strdup(optarg);
 				break;
+			case 'Z':
+			    options->compression = true;
+				break;
 			case 'P':
 				{
 					char	   *value;
diff --git a/src/bin/scripts/pg_isready.c b/src/bin/scripts/pg_isready.c
index f7ad7b4..5d035cd 100644
--- a/src/bin/scripts/pg_isready.c
+++ b/src/bin/scripts/pg_isready.c
@@ -34,7 +34,7 @@ main(int argc, char **argv)
 	const char *pghostaddr_str = NULL;
 	const char *pgport_str = NULL;
 
-#define PARAMS_ARRAY_SIZE	7
+#define PARAMS_ARRAY_SIZE	8
 
 	const char *keywords[PARAMS_ARRAY_SIZE];
 	const char *values[PARAMS_ARRAY_SIZE];
diff --git a/src/common/Makefile b/src/common/Makefile
index 80e78d7..1548196 100644
--- a/src/common/Makefile
+++ b/src/common/Makefile
@@ -43,7 +43,7 @@ override CPPFLAGS += -DVAL_LIBS="\"$(LIBS)\""
 OBJS_COMMON = base64.o config_info.o controldata_utils.o exec.o ip.o \
 	keywords.o md5.o pg_lzcompress.o pgfnames.o psprintf.o relpath.o \
 	rmtree.o saslprep.o scram-common.o string.o unicode_norm.o \
-	username.o wait_error.o
+	username.o wait_error.o zpq_stream.o
 
 ifeq ($(with_openssl),yes)
 OBJS_COMMON += sha2_openssl.o
diff --git a/src/common/zpq_stream.c b/src/common/zpq_stream.c
new file mode 100644
index 0000000..97326fd
--- /dev/null
+++ b/src/common/zpq_stream.c
@@ -0,0 +1,338 @@
+#include "common/zpq_stream.h"
+#include "c.h"
+#include "pg_config.h"
+
+#if HAVE_LIBZSTD
+
+#include <malloc.h>
+#include <zstd.h>
+
+#define ZPQ_BUFFER_SIZE (8*1024)
+#define ZSTD_COMPRESSION_LEVEL 1
+
+struct ZpqStream
+{
+	ZSTD_CStream*  tx_stream;
+	ZSTD_DStream*  rx_stream;
+	ZSTD_outBuffer tx;
+	ZSTD_inBuffer  rx;
+	size_t         tx_not_flushed; /* Amount of datas in internal zstd buffer */
+	size_t         tx_buffered;    /* Data which is consumed by zpq_read but not yet sent */
+	zpq_tx_func    tx_func;
+	zpq_rx_func    rx_func;
+	void*          arg;
+	char const*    rx_error;    /* Decompress error message */
+	size_t         tx_total;
+	size_t         tx_total_raw;
+	size_t         rx_total;
+	size_t         rx_total_raw;
+	char           tx_buf[ZPQ_BUFFER_SIZE];
+	char           rx_buf[ZPQ_BUFFER_SIZE];
+};
+
+ZpqStream* zpq_create(zpq_tx_func tx_func, zpq_rx_func rx_func, void* arg)
+{
+	ZpqStream* zs = (ZpqStream*)malloc(sizeof(ZpqStream));
+	zs->tx_stream = ZSTD_createCStream();
+	ZSTD_initCStream(zs->tx_stream, ZSTD_COMPRESSION_LEVEL);
+	zs->rx_stream = ZSTD_createDStream();
+	ZSTD_initDStream(zs->rx_stream);
+	zs->tx.dst = zs->tx_buf;
+	zs->tx.pos = 0;
+	zs->tx.size = ZPQ_BUFFER_SIZE;
+	zs->rx.src = zs->rx_buf;
+	zs->rx.pos = 0;
+	zs->rx.size = 0;
+	zs->rx_func = rx_func;
+	zs->tx_func = tx_func;
+	zs->tx_buffered = 0;
+	zs->tx_not_flushed = 0;
+	zs->rx_error = NULL;
+	zs->arg = arg;
+	zs->tx_total = zs->tx_total_raw = 0;
+	zs->rx_total = zs->rx_total_raw = 0;
+	return zs;
+}
+
+ssize_t zpq_read(ZpqStream* zs, void* buf, size_t size, size_t* processed)
+{
+	ssize_t rc;
+	ZSTD_outBuffer out;
+	out.dst = buf;
+	out.pos = 0;
+	out.size = size;
+
+	while (1)
+	{
+		rc = ZSTD_decompressStream(zs->rx_stream, &out, &zs->rx);
+		if (ZSTD_isError(rc))
+		{
+			zs->rx_error = ZSTD_getErrorName(rc);
+			return ZPQ_DECOMPRESS_ERROR;
+		}
+		/* Return result if we fill requested amount of bytes or read operation was performed */
+		if (out.pos != 0)
+		{
+			zs->rx_total_raw += out.pos;
+			return out.pos;
+		}
+		if (zs->rx.pos == zs->rx.size)
+		{
+			zs->rx.pos = zs->rx.size = 0; /* Reset rx buffer */
+		}
+		rc = zs->rx_func(zs->arg, (char*)zs->rx.src + zs->rx.size, ZPQ_BUFFER_SIZE - zs->rx.size);
+		if (rc > 0) /* read fetches some data */
+		{
+			zs->rx.size += rc;
+			zs->rx_total += rc;
+		}
+		else /* read failed */
+		{
+			*processed = out.pos;
+			zs->rx_total_raw += out.pos;
+			return rc;
+		}
+	}
+}
+
+ssize_t zpq_write(ZpqStream* zs, void const* buf, size_t size, size_t* processed)
+{
+	ssize_t rc;
+	ZSTD_inBuffer in_buf;
+	in_buf.src = buf;
+	in_buf.pos = 0;
+	in_buf.size = size;
+
+	do
+	{
+		if (zs->tx.pos == 0) /* Compress buffer is empty */
+		{
+			zs->tx.dst = zs->tx_buf; /* Reset pointer to the beginning of buffer */
+
+			if (in_buf.pos < size) /* Has something to compress in input buffer */
+				ZSTD_compressStream(zs->tx_stream, &zs->tx, &in_buf);
+
+			if (in_buf.pos == size) /* All data is compressed: flushed internal zstd buffer */
+			{
+				zs->tx_not_flushed = ZSTD_flushStream(zs->tx_stream, &zs->tx);
+			}
+		}
+		rc = zs->tx_func(zs->arg, zs->tx.dst, zs->tx.pos);
+		if (rc > 0)
+		{
+			zs->tx.pos -= rc;
+			zs->tx.dst = (char*)zs->tx.dst + rc;
+			zs->tx_total += rc;
+		}
+		else
+		{
+			*processed = in_buf.pos;
+			zs->tx_buffered = zs->tx.pos;
+			zs->tx_total_raw += in_buf.pos;
+			return rc;
+		}
+	} while (zs->tx.pos == 0 && (in_buf.pos < size || zs->tx_not_flushed)); /* repeat sending data until first partial write */
+
+	zs->tx_total_raw += in_buf.pos;
+	zs->tx_buffered = zs->tx.pos;
+	return in_buf.pos;
+}
+
+void zpq_free(ZpqStream* zs)
+{
+	if (zs != NULL)
+	{
+		ZSTD_freeCStream(zs->tx_stream);
+		ZSTD_freeDStream(zs->rx_stream);
+		free(zs);
+	}
+}
+
+char const* zpq_error(ZpqStream* zs)
+{
+	return zs->rx_error;
+}
+
+size_t zpq_buffered(ZpqStream* zs)
+{
+	return zs != NULL ? zs->tx_buffered + zs->tx_not_flushed : 0;
+}
+
+#elif HAVE_LIBZ
+
+#include <malloc.h>
+#include <zlib.h>
+
+#define ZPQ_BUFFER_SIZE 8192
+#define ZLIB_COMPRESSION_LEVEL 1
+
+struct ZpqStream
+{
+	z_stream tx;
+	z_stream rx;
+
+	zpq_tx_func    tx_func;
+	zpq_rx_func    rx_func;
+	void*          arg;
+
+	size_t         tx_buffered;
+
+	Bytef          tx_buf[ZPQ_BUFFER_SIZE];
+	Bytef          rx_buf[ZPQ_BUFFER_SIZE];
+};
+
+ZpqStream* zpq_create(zpq_tx_func tx_func, zpq_rx_func rx_func, void* arg)
+{
+	ZpqStream* zs = (ZpqStream*)malloc(sizeof(ZpqStream));
+	memset(&zs->tx, 0, sizeof(zs->tx));
+	zs->tx.next_out = zs->tx_buf;
+	zs->tx.avail_out = ZPQ_BUFFER_SIZE;
+	zs->tx_buffered = 0;
+	deflateInit(&zs->tx, ZLIB_COMPRESSION_LEVEL);
+	Assert(zs->tx.next_out == zs->tx_buf && zs->tx.avail_out == ZPQ_BUFFER_SIZE);
+
+	memset(&zs->rx, 0, sizeof(zs->tx));
+	zs->rx.next_in = zs->rx_buf;
+	zs->rx.avail_in = ZPQ_BUFFER_SIZE;
+	inflateInit(&zs->rx);
+	Assert(zs->rx.next_in == zs->rx_buf && zs->rx.avail_in == ZPQ_BUFFER_SIZE);
+	zs->rx.avail_in = 0;
+
+	zs->rx_func = rx_func;
+	zs->tx_func = tx_func;
+	zs->arg = arg;
+
+	return zs;
+}
+
+ssize_t zpq_read(ZpqStream* zs, void* buf, size_t size, size_t* processed)
+{
+	int rc;
+	zs->rx.next_out = (Bytef *)buf;
+	zs->rx.avail_out = size;
+
+	while (1)
+	{
+		if (zs->rx.avail_in != 0) /* If there is some data in receiver buffer, then decompress it */
+		{
+			rc = inflate(&zs->rx, Z_SYNC_FLUSH);
+			if (rc != Z_OK)
+			{
+				return ZPQ_DECOMPRESS_ERROR;
+			}
+			if (zs->rx.avail_out != size)
+			{
+				return size - zs->rx.avail_out;
+			}
+			if (zs->rx.avail_in == 0)
+			{
+				zs->rx.next_in = zs->rx_buf;
+			}
+		}
+		else
+		{
+			zs->rx.next_in = zs->rx_buf;
+		}
+		rc = zs->rx_func(zs->arg, zs->rx.next_in + zs->rx.avail_in, zs->rx_buf + ZPQ_BUFFER_SIZE - zs->rx.next_in - zs->rx.avail_in);
+		if (rc > 0)
+		{
+			zs->rx.avail_in += rc;
+		}
+		else
+		{
+			*processed = size - zs->rx.avail_out;
+			return rc;
+		}
+	}
+}
+
+ssize_t zpq_write(ZpqStream* zs, void const* buf, size_t size, size_t* processed)
+{
+    int rc;
+	zs->tx.next_in = (Bytef *)buf;
+	zs->tx.avail_in = size;
+	do
+	{
+		if (zs->tx.avail_out == ZPQ_BUFFER_SIZE) /* Compress buffer is empty */
+		{
+			zs->tx.next_out = zs->tx_buf; /* Reset pointer to the  beginning of buffer */
+
+			if (zs->tx.avail_in != 0) /* Has something in input buffer */
+			{
+				rc = deflate(&zs->tx, Z_SYNC_FLUSH);
+				Assert(rc == Z_OK);
+				zs->tx.next_out = zs->tx_buf; /* Reset pointer to the  beginning of buffer */
+			}
+		}
+		rc = zs->tx_func(zs->arg, zs->tx.next_out, ZPQ_BUFFER_SIZE - zs->tx.avail_out);
+		if (rc > 0)
+		{
+			zs->tx.next_out += rc;
+			zs->tx.avail_out += rc;
+		}
+		else
+		{
+			*processed = size - zs->tx.avail_in;
+			zs->tx_buffered = ZPQ_BUFFER_SIZE - zs->tx.avail_out;
+			return rc;
+		}
+	} while (zs->tx.avail_out == ZPQ_BUFFER_SIZE && zs->tx.avail_in != 0); /* repeat sending data until first partial write */
+
+	zs->tx_buffered = ZPQ_BUFFER_SIZE - zs->tx.avail_out;
+
+	return size - zs->tx.avail_in;
+}
+
+void zpq_free(ZpqStream* zs)
+{
+	if (zs != NULL)
+	{
+		inflateEnd(&zs->rx);
+		deflateEnd(&zs->tx);
+		free(zs);
+	}
+}
+
+char const* zpq_error(ZpqStream* zs)
+{
+	return zs->rx.msg;
+}
+
+size_t zpq_buffered(ZpqStream* zs)
+{
+	return zs != NULL ? zs->tx_buffered : 0;
+}
+
+#else
+
+ZpqStream* zpq_create(zpq_tx_func tx_func, zpq_rx_func rx_func, void* arg)
+{
+	return NULL;
+}
+
+ssize_t zpq_read(ZpqStream* zs, void* buf, size_t size)
+{
+	return -1;
+}
+
+ssize_t zpq_write(ZpqStream* zs, void const* buf, size_t size)
+{
+	return -1;
+}
+
+void zpq_free(ZpqStream* zs)
+{
+}
+
+char const* zpq_error(ZpqStream* zs)
+{
+	return NULL;
+}
+
+
+size_t zpq_buffered(ZpqStream* zs)
+{
+	return 0;
+}
+
+#endif
diff --git a/src/include/common/zpq_stream.h b/src/include/common/zpq_stream.h
new file mode 100644
index 0000000..dc765af
--- /dev/null
+++ b/src/include/common/zpq_stream.h
@@ -0,0 +1,28 @@
+/*
+ * zpq_stream.h
+ *     Streaiming compression for libpq
+ */
+
+#ifndef ZPQ_STREAM_H
+#define ZPQ_STREAM_H
+
+#include <stdlib.h>
+
+#define ZPQ_IO_ERROR (-1)
+#define ZPQ_DECOMPRESS_ERROR (-2)
+
+struct ZpqStream;
+typedef struct ZpqStream ZpqStream;
+
+typedef ssize_t(*zpq_tx_func)(void* arg, void const* data, size_t size);
+typedef ssize_t(*zpq_rx_func)(void* arg, void* data, size_t size);
+
+
+ZpqStream* zpq_create(zpq_tx_func tx_func, zpq_rx_func rx_func, void* arg);
+ssize_t zpq_read(ZpqStream* zs, void* buf, size_t size, size_t* processed);
+ssize_t zpq_write(ZpqStream* zs, void const* buf, size_t size, size_t* processed);
+char const* zpq_error(ZpqStream* zs);
+size_t zpq_buffered(ZpqStream* zs);
+void zpq_free(ZpqStream* zs);
+
+#endif
diff --git a/src/include/libpq/libpq-be.h b/src/include/libpq/libpq-be.h
index 7698cd1..5203f2d 100644
--- a/src/include/libpq/libpq-be.h
+++ b/src/include/libpq/libpq-be.h
@@ -182,6 +182,8 @@ typedef struct Port
 	char	   *peer_cn;
 	bool		peer_cert_valid;
 
+	bool        use_compression;
+	
 	/*
 	 * OpenSSL structures. (Keep these last so that the locations of other
 	 * fields are the same whether or not you build with OpenSSL.)
diff --git a/src/include/libpq/libpq.h b/src/include/libpq/libpq.h
index 997947b..4ae548a 100644
--- a/src/include/libpq/libpq.h
+++ b/src/include/libpq/libpq.h
@@ -61,6 +61,7 @@ extern void StreamClose(pgsocket sock);
 extern void TouchSocketFiles(void);
 extern void RemoveSocketFiles(void);
 extern void pq_init(void);
+extern int  pq_configure(Port* port);
 extern int	pq_getbytes(char *s, size_t len);
 extern int	pq_getstring(StringInfo s);
 extern void pq_startmsgread(void);
diff --git a/src/include/pg_config.h.in b/src/include/pg_config.h.in
index 625cd21..305a53b 100644
--- a/src/include/pg_config.h.in
+++ b/src/include/pg_config.h.in
@@ -365,6 +365,9 @@
 /* Define to 1 if you have the `z' library (-lz). */
 #undef HAVE_LIBZ
 
+/* Define to 1 if you have the `zstd' library (-lzstd). */
+#undef HAVE_LIBZSTD
+
 /* Define to 1 if the system has the type `locale_t'. */
 #undef HAVE_LOCALE_T
 
diff --git a/src/interfaces/libpq/Makefile b/src/interfaces/libpq/Makefile
index abe0a50..e14c1f7 100644
--- a/src/interfaces/libpq/Makefile
+++ b/src/interfaces/libpq/Makefile
@@ -29,6 +29,10 @@ endif
 # platforms require special flags.
 LIBS := $(LIBS:-lpgport=)
 
+ifeq ($(with_zstd),yes)
+LIBS += -lzstd
+endif
+
 # We can't use Makefile variables here because the MSVC build system scrapes
 # OBJS from this file.
 OBJS=	fe-auth.o fe-auth-scram.o fe-connect.o fe-exec.o fe-misc.o fe-print.o fe-lobj.o \
diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c
index 39c1999..67462eb 100644
--- a/src/interfaces/libpq/fe-connect.c
+++ b/src/interfaces/libpq/fe-connect.c
@@ -72,6 +72,7 @@ static int ldapServiceLookup(const char *purl, PQconninfoOption *options,
 
 #include "common/ip.h"
 #include "common/scram-common.h"
+#include "common/zpq_stream.h"
 #include "mb/pg_wchar.h"
 #include "port/pg_bswap.h"
 
@@ -269,6 +270,10 @@ static const internalPQconninfoOption PQconninfoOptions[] = {
 		21,	/* sizeof("tls-server-end-point") == 21 */
 	offsetof(struct pg_conn, scram_channel_binding)},
 
+	{"compression", "COMPRESSION", "0", NULL,
+		"ZSTD-Compression", "", 1,
+	offsetof(struct pg_conn, compression)},
+
 	/*
 	 * ssl options are allowed even without client SSL support because the
 	 * client can still handle SSL modes "disable" and "allow". Other
@@ -325,6 +330,10 @@ static const internalPQconninfoOption PQconninfoOptions[] = {
 		"Replication", "D", 5,
 	offsetof(struct pg_conn, replication)},
 
+	{"compression", NULL, NULL, NULL,
+		"Compression", "Z", 5,
+	offsetof(struct pg_conn, compression)},
+
 	{"target_session_attrs", "PGTARGETSESSIONATTRS",
 		DefaultTargetSessionAttrs, NULL,
 		"Target-Session-Attrs", "", 11, /* sizeof("read-write") = 11 */
@@ -430,6 +439,10 @@ pgthreadlock_t pg_g_threadlock = default_threadlock;
 void
 pqDropConnection(PGconn *conn, bool flushInput)
 {
+	/* Release compression streams */
+	zpq_free(conn->zstream);
+	conn->zstream = NULL;
+
 	/* Drop any SSL state */
 	pqsecure_close(conn);
 
@@ -2648,11 +2661,23 @@ keep_going:						/* We will come back to here until there is
 				 */
 				conn->inCursor = conn->inStart;
 
-				/* Read type byte */
-				if (pqGetc(&beresp, conn))
+				while (1)
 				{
-					/* We'll come back when there is more data */
-					return PGRES_POLLING_READING;
+					/* Read type byte */
+					if (pqGetc(&beresp, conn))
+					{
+						/* We'll come back when there is more data */
+						return PGRES_POLLING_READING;
+					}
+
+					if (beresp == 'z') /* Switch on compression */
+					{
+						/* mark byte consumed */
+						conn->inStart = conn->inCursor;
+						Assert(!conn->zstream);
+						conn->zstream = zpq_create((zpq_tx_func)pqsecure_write, (zpq_rx_func)pqsecure_read, conn);
+					} else
+						break;
 				}
 
 				/*
@@ -3462,6 +3487,8 @@ freePGconn(PGconn *conn)
 		free(conn->dbName);
 	if (conn->replication)
 		free(conn->replication);
+	if (conn->compression)
+		free(conn->compression);
 	if (conn->pguser)
 		free(conn->pguser);
 	if (conn->pgpass)
diff --git a/src/interfaces/libpq/fe-misc.c b/src/interfaces/libpq/fe-misc.c
index 2a6637f..488f8d2 100644
--- a/src/interfaces/libpq/fe-misc.c
+++ b/src/interfaces/libpq/fe-misc.c
@@ -53,11 +53,12 @@
 #include "port/pg_bswap.h"
 #include "pg_config_paths.h"
 
+#include  <common/zpq_stream.h>
 
 static int	pqPutMsgBytes(const void *buf, size_t len, PGconn *conn);
 static int	pqSendSome(PGconn *conn, int len);
-static int pqSocketCheck(PGconn *conn, int forRead, int forWrite,
-			  time_t end_time);
+static int  pqSocketCheck(PGconn *conn, int forRead, int forWrite,
+						  time_t end_time);
 static int	pqSocketPoll(int sock, int forRead, int forWrite, time_t end_time);
 
 /*
@@ -630,6 +631,7 @@ pqReadData(PGconn *conn)
 {
 	int			someread = 0;
 	int			nread;
+	size_t      processed;
 
 	if (conn->sock == PGINVALID_SOCKET)
 	{
@@ -678,10 +680,23 @@ pqReadData(PGconn *conn)
 
 	/* OK, try to read some data */
 retry3:
-	nread = pqsecure_read(conn, conn->inBuffer + conn->inEnd,
-						  conn->inBufSize - conn->inEnd);
+	processed = 0;
+	nread = conn->zstream
+		? zpq_read(conn->zstream, conn->inBuffer + conn->inEnd,
+				   conn->inBufSize - conn->inEnd, &processed)
+		: pqsecure_read(conn, conn->inBuffer + conn->inEnd,
+						conn->inBufSize - conn->inEnd);
+	conn->inEnd += processed;
 	if (nread < 0)
 	{
+		if (nread == ZPQ_DECOMPRESS_ERROR)
+		{
+			printfPQExpBuffer(&conn->errorMessage,
+							  libpq_gettext("decompress error: %s\n"),
+							  zpq_error(conn->zstream));
+			return -1;
+		}
+
 		if (SOCK_ERRNO == EINTR)
 			goto retry3;
 		/* Some systems return EAGAIN/EWOULDBLOCK for no data */
@@ -768,10 +783,24 @@ retry3:
 	 * arrived.
 	 */
 retry4:
-	nread = pqsecure_read(conn, conn->inBuffer + conn->inEnd,
-						  conn->inBufSize - conn->inEnd);
+	processed = 0;
+	nread = conn->zstream
+		? zpq_read(conn->zstream, conn->inBuffer + conn->inEnd,
+				   conn->inBufSize - conn->inEnd, &processed)
+		: pqsecure_read(conn, conn->inBuffer + conn->inEnd,
+						conn->inBufSize - conn->inEnd);
+	conn->inEnd += processed;
+
 	if (nread < 0)
 	{
+		if (nread == ZPQ_DECOMPRESS_ERROR)
+		{
+			printfPQExpBuffer(&conn->errorMessage,
+							  libpq_gettext("decompress error: %s\n"),
+							  zpq_error(conn->zstream));
+			return -1;
+		}
+
 		if (SOCK_ERRNO == EINTR)
 			goto retry4;
 		/* Some systems return EAGAIN/EWOULDBLOCK for no data */
@@ -842,12 +871,14 @@ pqSendSome(PGconn *conn, int len)
 	}
 
 	/* while there's still data to send */
-	while (len > 0)
+	while (len > 0 || zpq_buffered(conn->zstream))
 	{
 		int			sent;
-
+		size_t      processed = 0;
+		sent = conn->zstream
+			? zpq_write(conn->zstream, ptr, len, &processed)
 #ifndef WIN32
-		sent = pqsecure_write(conn, ptr, len);
+			: pqsecure_write(conn, ptr, len);
 #else
 
 		/*
@@ -855,8 +886,11 @@ pqSendSome(PGconn *conn, int len)
 		 * failure-point appears to be different in different versions of
 		 * Windows, but 64k should always be safe.
 		 */
-		sent = pqsecure_write(conn, ptr, Min(len, 65536));
+			: pqsecure_write(conn, ptr, Min(len, 65536));
 #endif
+		ptr += processed;
+		len -= processed;
+		remaining -= processed;
 
 		if (sent < 0)
 		{
@@ -896,7 +930,7 @@ pqSendSome(PGconn *conn, int len)
 			remaining -= sent;
 		}
 
-		if (len > 0)
+		if (len > 0 || sent < 0 || zpq_buffered(conn->zstream))
 		{
 			/*
 			 * We didn't send it all, wait till we can send more.
diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c
index d3ca5d2..fe82457 100644
--- a/src/interfaces/libpq/fe-protocol3.c
+++ b/src/interfaces/libpq/fe-protocol3.c
@@ -2171,6 +2171,8 @@ build_startup_packet(const PGconn *conn, char *packet,
 		ADD_STARTUP_OPTION("database", conn->dbName);
 	if (conn->replication && conn->replication[0])
 		ADD_STARTUP_OPTION("replication", conn->replication);
+	if (conn->compression && conn->compression[0])
+		ADD_STARTUP_OPTION("compression", conn->compression);
 	if (conn->pgoptions && conn->pgoptions[0])
 		ADD_STARTUP_OPTION("options", conn->pgoptions);
 	if (conn->send_appname)
diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h
index eba23dc..c9bf84c 100644
--- a/src/interfaces/libpq/libpq-int.h
+++ b/src/interfaces/libpq/libpq-int.h
@@ -40,6 +40,7 @@
 /* include stuff common to fe and be */
 #include "getaddrinfo.h"
 #include "libpq/pqcomm.h"
+#include "common/zpq_stream.h"
 /* include stuff found in fe only */
 #include "pqexpbuffer.h"
 
@@ -357,6 +358,7 @@ struct pg_conn
 	char	   *sslrootcert;	/* root certificate filename */
 	char	   *sslcrl;			/* certificate revocation list filename */
 	char	   *requirepeer;	/* required peer credentials for local sockets */
+	char	   *compression;    /* stream compression (0 or 1) */
 
 #if defined(ENABLE_GSS) || defined(ENABLE_SSPI)
 	char	   *krbsrvname;		/* Kerberos service name */
@@ -495,6 +497,9 @@ struct pg_conn
 
 	/* Buffer for receiving various parts of messages */
 	PQExpBufferData workBuffer; /* expansible string */
+
+	/* Compression stream */
+	ZpqStream* zstream;
 };
 
 /* PGcancel stores all data necessary to cancel a connection. A copy of this

Reply via email to