On 01.12.2010 16:03, Heikki Linnakangas wrote:
On 29.11.2010 22:21, Heikki Linnakangas wrote:
I combined those, and the Free/Flush steps, and did a bunch of other
editorializations and cleanups. Here's an updated patch, also available
in my git repository at
git://git.postgresql.org/git/users/heikki/postgres.git, branch
"pg_dump-dir". I'm going to continue reviewing this later, tomorrow
hopefully.

Here's another update.

Forgot attachment. This is also available in the above git repo.

--
  Heikki Linnakangas
  EnterpriseDB   http://www.enterprisedb.com
*** a/src/bin/pg_dump/Makefile
--- b/src/bin/pg_dump/Makefile
***************
*** 20,26 **** override CPPFLAGS := -I$(libpq_srcdir) $(CPPFLAGS)
  
  OBJS=	pg_backup_archiver.o pg_backup_db.o pg_backup_custom.o \
  	pg_backup_files.o pg_backup_null.o pg_backup_tar.o \
! 	dumputils.o $(WIN32RES)
  
  KEYWRDOBJS = keywords.o kwlookup.o
  
--- 20,26 ----
  
  OBJS=	pg_backup_archiver.o pg_backup_db.o pg_backup_custom.o \
  	pg_backup_files.o pg_backup_null.o pg_backup_tar.o \
! 	dumputils.o compress_io.o $(WIN32RES)
  
  KEYWRDOBJS = keywords.o kwlookup.o
  
*** /dev/null
--- b/src/bin/pg_dump/compress_io.c
***************
*** 0 ****
--- 1,404 ----
+ /*-------------------------------------------------------------------------
+  *
+  * compress_io.c
+  *   Routines for archivers to write an uncompressed or compressed data
+  *   stream.
+  *
+  * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
+  * Portions Copyright (c) 1994, Regents of the University of California
+  *
+  *  The interface for writing to an archive consists of three functions:
+  *  AllocateCompressor, WriteDataToArchive and EndCompressorState. First call
+  *  AllocateCompressor, then write all the data by calling WriteDataToArchive
+  *  as many times as needed, and finally call EndCompressorState. The latter
+  *  two call the WriteFunc that was provided to AllocateCompressor for each
+  *  chunk of compressed data.
+  *
+  *  The interface for reading an archive consists of just one function:
+  *  ReadDataFromArchive. ReadDataFromArchive reads the whole compressed input
+  *  stream, by repeatedly calling the given ReadFunc. ReadFunc returns the
+  *  compressed data one chunk at a time; ReadDataFromArchive decompresses it
+  *  and passes the decompressed data to ahwrite(), until ReadFunc returns 0
+  *  to signal EOF.
+  *
+  *  The interface is the same for compressed and uncompressed streams.
+  *
+  *
+  * IDENTIFICATION
+  *     src/bin/pg_dump/compress_io.c
+  *
+  *-------------------------------------------------------------------------
+  */
+ 
+ #include "compress_io.h"
+ 
+ static const char *modulename = gettext_noop("compress_io");
+ 
+ static void ParseCompressionOption(int compression, CompressorAlgorithm *alg,
+ 								   int *level);
+ 
+ /* Routines that are private to a specific compressor (static functions) */
+ #ifdef HAVE_LIBZ
+ /* Routines that support zlib compressed data I/O */
+ static void InitCompressorZlib(CompressorState *cs, int level);
+ static void DeflateCompressorZlib(ArchiveHandle *AH, CompressorState *cs,
+ 								  bool flush);
+ static void ReadDataFromArchiveZlib(ArchiveHandle *AH, ReadFunc readF);
+ static size_t WriteDataToArchiveZlib(ArchiveHandle *AH, CompressorState *cs,
+ 									 const char *data, size_t dLen);
+ static void EndCompressorZlib(ArchiveHandle *AH, CompressorState *cs);
+ 
+ #endif
+ 
+ /* Routines that support uncompressed data I/O */
+ static void ReadDataFromArchiveNone(ArchiveHandle *AH, ReadFunc readF);
+ static size_t WriteDataToArchiveNone(ArchiveHandle *AH, CompressorState *cs,
+ 									 const char *data, size_t dLen);
+ 
+ static void
+ ParseCompressionOption(int compression, CompressorAlgorithm *alg, int *level)
+ {
+ 	/*
+ 	 * The compression is set either on the commandline when creating
+ 	 * an archive or by ReadHead() when restoring an archive. It can also be
+ 	 * set on a per-data item basis in the directory archive format.
+ 	 */
+ 	if (compression == Z_DEFAULT_COMPRESSION ||
+ 		(compression > 0 && compression <= 9))
+ 		*alg = COMPR_ALG_LIBZ;
+ 	else if (compression == 0)
+ 		*alg = COMPR_ALG_NONE;
+ 	else
+ 		die_horribly(NULL, modulename, "Invalid compression code: %d\n",
+ 					 compression);
+ 
+ 	if (level)
+ 		*level = compression;
+ }
+ 
+ /* Public interface routines */
+ 
+ /* Allocate a new compressor */
+ CompressorState *
+ AllocateCompressor(int compression, WriteFunc writeF)
+ {
+ 	CompressorState *cs;
+ 	CompressorAlgorithm alg;
+ 	int level;
+ 
+ 	ParseCompressionOption(compression, &alg, &level);
+ 
+ 	cs = (CompressorState *) calloc(1, sizeof(CompressorState));
+ 	if (cs == NULL)
+ 		die_horribly(NULL, modulename, "out of memory\n");
+ 	cs->writeF = writeF;
+ 	cs->comprAlg = alg;
+ 
+ #ifndef HAVE_LIBZ
+ 	if (alg == COMPR_ALG_LIBZ)
+ 		die_horribly(NULL, modulename, "not built with zlib support\n");
+ #endif
+ 
+ 	/*
+ 	 * Perform compression algorithm specific initialization.
+ 	 */
+ #ifdef HAVE_LIBZ
+ 	if (alg == COMPR_ALG_LIBZ)
+ 		InitCompressorZlib(cs, level);
+ #endif
+ 
+ 	return cs;
+ }
+ 
+ /*
+  * Read all compressed data from the input stream (via readF) and print it
+  * out with ahwrite().
+  */
+ void
+ ReadDataFromArchive(ArchiveHandle *AH, int compression, ReadFunc readF)
+ {
+ 	CompressorAlgorithm alg;
+ 
+ 	ParseCompressionOption(compression, &alg, NULL);
+ 
+ 	if (alg == COMPR_ALG_NONE)
+ 		ReadDataFromArchiveNone(AH, readF);
+ 	if (alg == COMPR_ALG_LIBZ)
+ 	{
+ #ifdef HAVE_LIBZ
+ 		ReadDataFromArchiveZlib(AH, readF);
+ #else
+ 		die_horribly(NULL, modulename, "not built with zlib support\n");
+ #endif
+ 	}
+ }
+ 
+ /*
+  * Compress and write data to the output stream (via writeF).
+  */
+ size_t
+ WriteDataToArchive(ArchiveHandle *AH, CompressorState *cs,
+ 				   const void *data, size_t dLen)
+ {
+ 	switch(cs->comprAlg)
+ 	{
+ 		case COMPR_ALG_LIBZ:
+ #ifdef HAVE_LIBZ
+ 			return WriteDataToArchiveZlib(AH, cs, data, dLen);
+ #else
+ 			die_horribly(NULL, modulename, "not built with zlib support\n");	
+ #endif
+ 		case COMPR_ALG_NONE:
+ 			return WriteDataToArchiveNone(AH, cs, data, dLen);
+ 	}
+ 	return 0; /* keep compiler quiet */
+ }
+ 
+ /*
+  * Terminate the compression context, flush its buffers and free cs (NULL ok).
+  */
+ void
+ EndCompressorState(ArchiveHandle *AH, CompressorState *cs)
+ {
+ #ifdef HAVE_LIBZ
+ 	if (cs != NULL && cs->comprAlg == COMPR_ALG_LIBZ)
+ 		EndCompressorZlib(AH, cs);
+ #endif
+ 	free(cs);
+ }
+ 
+ /* Private routines, specific to each compression method. */
+ 
+ #ifdef HAVE_LIBZ
+ /*
+  * Functions for zlib compressed output.
+  */
+ 
+ static void
+ InitCompressorZlib(CompressorState *cs, int level)
+ {
+ 	z_streamp			zp;
+ 
+ 	zp = cs->zp = (z_streamp) malloc(sizeof(z_stream));
+ 	if (cs->zp == NULL)
+ 		die_horribly(NULL, modulename, "out of memory\n");
+ 	zp->zalloc = Z_NULL;
+ 	zp->zfree = Z_NULL;
+ 	zp->opaque = Z_NULL;
+ 
+ 	/*
+ 	 * zlibOutSize is the buffer size we tell zlib it can output
+ 	 * to.  We actually allocate one extra byte because some routines
+ 	 * want to append a trailing zero byte to the zlib output.
+ 	 */
+ 	cs->zlibOut = (char *) malloc(ZLIB_OUT_SIZE + 1);
+ 	cs->zlibOutSize = ZLIB_OUT_SIZE;
+ 
+ 	if (cs->zlibOut == NULL)
+ 		die_horribly(NULL, modulename, "out of memory\n");
+ 
+ 	if (deflateInit(zp, level) != Z_OK)
+ 		die_horribly(NULL, modulename,
+ 					 "could not initialize compression library: %s\n",
+ 					 zp->msg);
+ 
+ 	/* Just be paranoid - maybe End is called after Start, with no Write */
+ 	zp->next_out = (void *) cs->zlibOut;
+ 	zp->avail_out = cs->zlibOutSize;
+ }
+ 
+ static void
+ EndCompressorZlib(ArchiveHandle *AH, CompressorState *cs)
+ {
+ 	z_streamp			zp = cs->zp;
+ 
+ 	zp->next_in = NULL;
+ 	zp->avail_in = 0;
+ 
+ 	/* Flush any remaining data from zlib buffer */
+ 	DeflateCompressorZlib(AH, cs, true);
+ 
+ 	if (deflateEnd(zp) != Z_OK)
+ 		die_horribly(AH, modulename,
+ 					 "could not close compression stream: %s\n", zp->msg);
+ 
+ 	free(cs->zlibOut);
+ 	free(cs->zp);
+ }
+ 
+ static void
+ DeflateCompressorZlib(ArchiveHandle *AH, CompressorState *cs, bool flush)
+ {
+ 	z_streamp	zp = cs->zp;
+ 	char	   *out = cs->zlibOut;
+ 	int			res = Z_OK;
+ 
+ 	while (cs->zp->avail_in != 0 || flush)
+ 	{
+ 		res = deflate(zp, flush ? Z_FINISH : Z_NO_FLUSH);
+ 		if (res == Z_STREAM_ERROR)
+ 			die_horribly(AH, modulename,
+ 						 "could not compress data: %s\n", zp->msg);
+ 		if ((flush && (zp->avail_out < cs->zlibOutSize))
+ 			|| (zp->avail_out == 0)
+ 			|| (zp->avail_in != 0)
+ 			)
+ 		{
+ 			/*
+ 			 * Extra paranoia: avoid zero-length chunks, since a zero length
+ 			 * chunk is the EOF marker in the custom format. This should never
+ 			 * happen but...
+ 			 */
+ 			if (zp->avail_out < cs->zlibOutSize)
+ 			{
+ 				/*
+ 				 * Any write function should do its own error checking, but
+ 				 * to make sure, we do a check here as well...
+ 				 */
+ 				size_t len = cs->zlibOutSize - zp->avail_out;
+ 				if (cs->writeF(AH, out, len) != len)
+ 					die_horribly(AH, modulename,
+ 								 "could not write to output file: %s\n",
+ 								 strerror(errno));
+ 			}
+ 			zp->next_out = (void *) out;
+ 			zp->avail_out = cs->zlibOutSize;
+ 		}
+ 
+ 		if (res == Z_STREAM_END)
+ 			break;
+ 	}
+ }
+ 
+ static size_t
+ WriteDataToArchiveZlib(ArchiveHandle *AH, CompressorState *cs,
+ 					   const char *data, size_t dLen)
+ {
+ 	cs->zp->next_in = (void *) data;
+ 	cs->zp->avail_in = dLen;
+ 	DeflateCompressorZlib(AH, cs, false);
+ 	/* we have either succeeded in writing dLen bytes or we have called
+ 	 * die_horribly() */
+ 	return dLen;
+ }
+ 
+ static void
+ ReadDataFromArchiveZlib(ArchiveHandle *AH, ReadFunc readF)
+ {
+ 	z_streamp	zp;
+ 	char	   *out;
+ 	int			res = Z_OK;
+ 	size_t		cnt;
+ 	char	   *buf;
+ 	size_t		buflen;
+ 
+ 	zp = (z_streamp) malloc(sizeof(z_stream));
+ 	if (zp == NULL)
+ 		die_horribly(NULL, modulename, "out of memory\n");
+ 	zp->zalloc = Z_NULL;
+ 	zp->zfree = Z_NULL;
+ 	zp->opaque = Z_NULL;
+ 
+ 	buf = malloc(ZLIB_IN_SIZE);
+ 	if (buf == NULL)
+ 		die_horribly(NULL, modulename, "out of memory\n");
+ 	buflen = ZLIB_IN_SIZE;
+ 
+ 	out = malloc(ZLIB_OUT_SIZE + 1);
+ 	if (out == NULL)
+ 		die_horribly(NULL, modulename, "out of memory\n");
+ 
+ 	if (inflateInit(zp) != Z_OK)
+ 		die_horribly(NULL, modulename,
+ 					 "could not initialize compression library: %s\n",
+ 					 zp->msg);
+ 
+ 	/* no minimal chunk size for zlib */
+ 	while ((cnt = readF(AH, &buf, &buflen)))
+ 	{
+ 		zp->next_in = (void *) buf;
+ 		zp->avail_in = cnt;
+ 
+ 		while (zp->avail_in > 0)
+ 		{
+ 			zp->next_out = (void *) out;
+ 			zp->avail_out = ZLIB_OUT_SIZE;
+ 
+ 			res = inflate(zp, 0);
+ 			if (res != Z_OK && res != Z_STREAM_END)
+ 				die_horribly(AH, modulename,
+ 							 "could not uncompress data: %s\n", zp->msg);
+ 
+ 			out[ZLIB_OUT_SIZE - zp->avail_out] = '\0';
+ 			ahwrite(out, 1, ZLIB_OUT_SIZE - zp->avail_out, AH);
+ 		}
+ 	}
+ 
+ 	zp->next_in = NULL;
+ 	zp->avail_in = 0;
+ 	while (res != Z_STREAM_END)
+ 	{
+ 		zp->next_out = (void *) out;
+ 		zp->avail_out = ZLIB_OUT_SIZE;
+ 		res = inflate(zp, 0);
+ 		if (res != Z_OK && res != Z_STREAM_END)
+ 			die_horribly(AH, modulename,
+ 						 "could not uncompress data: %s\n", zp->msg);
+ 
+ 		out[ZLIB_OUT_SIZE - zp->avail_out] = '\0';
+ 		ahwrite(out, 1, ZLIB_OUT_SIZE - zp->avail_out, AH);
+ 	}
+ 
+ 	if (inflateEnd(zp) != Z_OK)
+ 		die_horribly(AH, modulename,
+ 					 "could not close compression library: %s\n", zp->msg);
+ 
+ 	free(buf);
+ 	free(out);
+ 	free(zp);
+ }
+ 
+ #endif  /* HAVE_LIBZ */
+ 
+ 
+ /*
+  * Functions for uncompressed output.
+  */
+ 
+ static void
+ ReadDataFromArchiveNone(ArchiveHandle *AH, ReadFunc readF)
+ {
+ 	size_t		cnt;
+ 	char	   *buf;
+ 	size_t		buflen;
+ 
+ 	buf = malloc(ZLIB_OUT_SIZE);
+ 	if (buf == NULL)
+ 		die_horribly(NULL, modulename, "out of memory\n");
+ 	buflen = ZLIB_OUT_SIZE;
+ 
+ 	/* no minimal chunk size for uncompressed data */
+ 	while ((cnt = readF(AH, &buf, &buflen)))
+ 	{
+ 		ahwrite(buf, 1, cnt, AH);
+ 	}
+ 
+ 	free(buf);
+ }
+ 
+ static size_t
+ WriteDataToArchiveNone(ArchiveHandle *AH, CompressorState *cs,
+ 					   const char *data, size_t dLen)
+ {
+ 	/*
+ 	 * Any write function should do its own error checking but to make
+ 	 * sure we do a check here as well...
+ 	 */
+ 	if (cs->writeF(AH, data, dLen) != dLen)
+ 		die_horribly(AH, modulename,
+ 					 "could not write to output file: %s\n",
+ 					 strerror(errno));
+ 	return dLen;
+ }
+ 
+ 
*** /dev/null
--- b/src/bin/pg_dump/compress_io.h
***************
*** 0 ****
--- 1,73 ----
+ /*-------------------------------------------------------------------------
+  *
+  * compress_io.h
+  *   Routines for archivers to write an uncompressed or compressed data
+  *   stream.
+  *
+  * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
+  * Portions Copyright (c) 1994, Regents of the University of California
+  *
+  * IDENTIFICATION
+  *     src/bin/pg_dump/compress_io.h
+  *
+  *-------------------------------------------------------------------------
+  */
+ 
+ #ifndef __COMPRESS_IO__
+ #define __COMPRESS_IO__
+ 
+ #include "postgres_fe.h"
+ #include "pg_backup_archiver.h"
+ 
+ /*------------
+  * Buffers used in zlib compression.
+  *------------
+  */
+ #define ZLIB_OUT_SIZE	4096
+ #define ZLIB_IN_SIZE	4096
+ 
+ struct _CompressorState;
+ 
+ typedef enum
+ {
+ 	COMPR_ALG_NONE,
+ 	COMPR_ALG_LIBZ
+ } CompressorAlgorithm;
+ 
+ /* Prototype for callback function to WriteDataToArchive() */
+ typedef size_t (*WriteFunc)(ArchiveHandle *AH, const char *buf, size_t len);
+ 
+ /*
+  * Prototype for callback function to ReadDataFromArchive()
+  *
+  * ReadDataFromArchive will call the read function repeatedly, until it
+  * returns 0 to signal EOF. ReadDataFromArchive passes a buffer to read the
+  * data into in *buf, of length *buflen. If that's not big enough for the
+  * callback function, it can free() it and malloc() a new one, returning the
+  * new buffer and its size in *buf and *buflen.
+  *
+  * Returns the number of bytes read into *buf, or 0 on EOF.
+  */
+ typedef size_t (*ReadFunc)(ArchiveHandle *AH, char **buf, size_t *buflen);
+ 
+ typedef struct _CompressorState
+ {
+ 	CompressorAlgorithm comprAlg;
+ 	WriteFunc			writeF;
+ 
+ #ifdef HAVE_LIBZ
+ 	z_streamp			zp;
+ 	char			   *zlibOut;
+ 	size_t				zlibOutSize;
+ #endif
+ } CompressorState;
+ 
+ extern CompressorState *AllocateCompressor(int compression, WriteFunc writeF);
+ extern void ReadDataFromArchive(ArchiveHandle *AH, int compression,
+ 								ReadFunc readF);
+ extern size_t WriteDataToArchive(ArchiveHandle *AH, CompressorState *cs,
+ 								 const void *data, size_t dLen);
+ extern void EndCompressorState(ArchiveHandle *AH, CompressorState *cs);
+ extern void FreeCompressorState(CompressorState *cs);
+ 
+ #endif
*** a/src/bin/pg_dump/pg_backup_archiver.c
--- b/src/bin/pg_dump/pg_backup_archiver.c
***************
*** 22,27 ****
--- 22,28 ----
  
  #include "pg_backup_db.h"
  #include "dumputils.h"
+ #include "compress_io.h"
  
  #include <ctype.h>
  #include <unistd.h>
*** a/src/bin/pg_dump/pg_backup_archiver.h
--- b/src/bin/pg_dump/pg_backup_archiver.h
***************
*** 49,54 ****
--- 49,55 ----
  #define GZCLOSE(fh) fclose(fh)
  #define GZWRITE(p, s, n, fh) (fwrite(p, s, n, fh) * (s))
  #define GZREAD(p, s, n, fh) fread(p, s, n, fh)
+ /* this is just the redefinition of a libz constant */
  #define Z_DEFAULT_COMPRESSION (-1)
  
  typedef struct _z_stream
***************
*** 61,66 **** typedef struct _z_stream
--- 62,70 ----
  typedef z_stream *z_streamp;
  #endif
  
+ /* XXX should we change the archive version for pg_dump with directory support?
+  * XXX We are not actually modifying the existing formats, but on the other hand
+  * XXX a file could now be compressed with liblzf. */
  /* Current archive version number (the format we can output) */
  #define K_VERS_MAJOR 1
  #define K_VERS_MINOR 12
***************
*** 266,272 **** typedef struct _archiveHandle
  	DumpId		maxDumpId;		/* largest DumpId among all TOC entries */
  
  	struct _tocEntry *currToc;	/* Used when dumping data */
! 	int			compression;	/* Compression requested on open */
  	ArchiveMode mode;			/* File mode - r or w */
  	void	   *formatData;		/* Header data specific to file format */
  
--- 270,280 ----
  	DumpId		maxDumpId;		/* largest DumpId among all TOC entries */
  
  	struct _tocEntry *currToc;	/* Used when dumping data */
! 	int			compression;	/* Compression requested on open
! 								 * Possible values for compression:
! 								 *  -1   Z_DEFAULT_COMPRESSION
! 								 *   0	no compression (COMPR_ALG_NONE)
! 								 *  1-9	levels for gzip compression */
  	ArchiveMode mode;			/* File mode - r or w */
  	void	   *formatData;		/* Header data specific to file format */
  
***************
*** 381,384 **** int			ahprintf(ArchiveHandle *AH, const char *fmt,...) __attribute__((format(pri
--- 389,403 ----
  
  void		ahlog(ArchiveHandle *AH, int level, const char *fmt,...) __attribute__((format(printf, 3, 4)));
  
+ /* Frontend assertion: expands to one void expression, so "Assert(x);" is
+  * safe in an unbraced if/else. A no-op unless USE_ASSERT_CHECKING is set. */
+ #ifdef USE_ASSERT_CHECKING
+ #define Assert(condition) \
+ 	((condition) ? (void) 0 : \
+ 	 (write_msg(NULL, "Failed assertion in %s, line %d\n", \
+ 				__FILE__, __LINE__), \
+ 	  abort()))
+ #else
+ #define Assert(condition) ((void) 0)
+ #endif
  #endif
*** a/src/bin/pg_dump/pg_backup_custom.c
--- b/src/bin/pg_dump/pg_backup_custom.c
***************
*** 25,30 ****
--- 25,31 ----
   */
  
  #include "pg_backup_archiver.h"
+ #include "compress_io.h"
  
  /*--------
   * Routines in the format interface
***************
*** 58,77 **** static void _LoadBlobs(ArchiveHandle *AH, bool drop);
  static void _Clone(ArchiveHandle *AH);
  static void _DeClone(ArchiveHandle *AH);
  
! /*------------
!  * Buffers used in zlib compression and extra data stored in archive and
!  * in TOC entries.
!  *------------
!  */
! #define zlibOutSize 4096
! #define zlibInSize	4096
  
  typedef struct
  {
! 	z_streamp	zp;
! 	char	   *zlibOut;
! 	char	   *zlibIn;
! 	size_t		inSize;
  	int			hasSeek;
  	pgoff_t		filePos;
  	pgoff_t		dataStart;
--- 59,70 ----
  static void _Clone(ArchiveHandle *AH);
  static void _DeClone(ArchiveHandle *AH);
  
! static size_t _CustomWriteFunc(ArchiveHandle *AH, const char *buf, size_t len);
! static size_t _CustomReadFunc(ArchiveHandle *AH, char **buf, size_t *buflen);
  
  typedef struct
  {
! 	CompressorState *cs;
  	int			hasSeek;
  	pgoff_t		filePos;
  	pgoff_t		dataStart;
***************
*** 89,98 **** typedef struct
   *------
   */
  static void _readBlockHeader(ArchiveHandle *AH, int *type, int *id);
- static void _StartDataCompressor(ArchiveHandle *AH, TocEntry *te);
- static void _EndDataCompressor(ArchiveHandle *AH, TocEntry *te);
  static pgoff_t _getFilePos(ArchiveHandle *AH, lclContext *ctx);
- static int	_DoDeflate(ArchiveHandle *AH, lclContext *ctx, int flush);
  
  static const char *modulename = gettext_noop("custom archiver");
  
--- 82,88 ----
***************
*** 144,174 **** InitArchiveFmt_Custom(ArchiveHandle *AH)
  		die_horribly(AH, modulename, "out of memory\n");
  	AH->formatData = (void *) ctx;
  
- 	ctx->zp = (z_streamp) malloc(sizeof(z_stream));
- 	if (ctx->zp == NULL)
- 		die_horribly(AH, modulename, "out of memory\n");
- 
  	/* Initialize LO buffering */
  	AH->lo_buf_size = LOBBUFSIZE;
  	AH->lo_buf = (void *) malloc(LOBBUFSIZE);
  	if (AH->lo_buf == NULL)
  		die_horribly(AH, modulename, "out of memory\n");
  
- 	/*
- 	 * zlibOutSize is the buffer size we tell zlib it can output to.  We
- 	 * actually allocate one extra byte because some routines want to append a
- 	 * trailing zero byte to the zlib output.  The input buffer is expansible
- 	 * and is always of size ctx->inSize; zlibInSize is just the initial
- 	 * default size for it.
- 	 */
- 	ctx->zlibOut = (char *) malloc(zlibOutSize + 1);
- 	ctx->zlibIn = (char *) malloc(zlibInSize);
- 	ctx->inSize = zlibInSize;
  	ctx->filePos = 0;
  
- 	if (ctx->zlibOut == NULL || ctx->zlibIn == NULL)
- 		die_horribly(AH, modulename, "out of memory\n");
- 
  	/*
  	 * Now open the file
  	 */
--- 134,147 ----
***************
*** 324,330 **** _StartData(ArchiveHandle *AH, TocEntry *te)
  	_WriteByte(AH, BLK_DATA);	/* Block type */
  	WriteInt(AH, te->dumpId);	/* For sanity check */
  
! 	_StartDataCompressor(AH, te);
  }
  
  /*
--- 297,303 ----
  	_WriteByte(AH, BLK_DATA);	/* Block type */
  	WriteInt(AH, te->dumpId);	/* For sanity check */
  
! 	ctx->cs = AllocateCompressor(AH->compression, _CustomWriteFunc);
  }
  
  /*
***************
*** 340,356 **** static size_t
  _WriteData(ArchiveHandle *AH, const void *data, size_t dLen)
  {
  	lclContext *ctx = (lclContext *) AH->formatData;
! 	z_streamp	zp = ctx->zp;
  
! 	zp->next_in = (void *) data;
! 	zp->avail_in = dLen;
! 
! 	while (zp->avail_in != 0)
! 	{
! 		/* printf("Deflating %lu bytes\n", (unsigned long) dLen); */
! 		_DoDeflate(AH, ctx, 0);
! 	}
! 	return dLen;
  }
  
  /*
--- 313,321 ----
  _WriteData(ArchiveHandle *AH, const void *data, size_t dLen)
  {
  	lclContext *ctx = (lclContext *) AH->formatData;
! 	CompressorState	   *cs = ctx->cs;
  
! 	return WriteDataToArchive(AH, cs, data, dLen);
  }
  
  /*
***************
*** 363,372 **** _WriteData(ArchiveHandle *AH, const void *data, size_t dLen)
  static void
  _EndData(ArchiveHandle *AH, TocEntry *te)
  {
! /*	lclContext *ctx = (lclContext *) AH->formatData; */
! /*	lclTocEntry *tctx = (lclTocEntry *) te->formatData; */
  
! 	_EndDataCompressor(AH, te);
  }
  
  /*
--- 328,338 ----
  static void
  _EndData(ArchiveHandle *AH, TocEntry *te)
  {
! 	lclContext *ctx = (lclContext *) AH->formatData;
  
! 	EndCompressorState(AH, ctx->cs);
! 	/* Send the end marker */
! 	WriteInt(AH, 0);
  }
  
  /*
***************
*** 401,411 **** _StartBlobs(ArchiveHandle *AH, TocEntry *te)
  static void
  _StartBlob(ArchiveHandle *AH, TocEntry *te, Oid oid)
  {
  	if (oid == 0)
  		die_horribly(AH, modulename, "invalid OID for large object\n");
  
  	WriteInt(AH, oid);
! 	_StartDataCompressor(AH, te);
  }
  
  /*
--- 367,380 ----
  static void
  _StartBlob(ArchiveHandle *AH, TocEntry *te, Oid oid)
  {
+ 	lclContext *ctx = (lclContext *) AH->formatData;
+ 
  	if (oid == 0)
  		die_horribly(AH, modulename, "invalid OID for large object\n");
  
  	WriteInt(AH, oid);
! 
! 	ctx->cs = AllocateCompressor(AH->compression, _CustomWriteFunc);
  }
  
  /*
***************
*** 416,422 **** _StartBlob(ArchiveHandle *AH, TocEntry *te, Oid oid)
  static void
  _EndBlob(ArchiveHandle *AH, TocEntry *te, Oid oid)
  {
! 	_EndDataCompressor(AH, te);
  }
  
  /*
--- 385,395 ----
  static void
  _EndBlob(ArchiveHandle *AH, TocEntry *te, Oid oid)
  {
! 	lclContext *ctx = (lclContext *) AH->formatData;
! 
! 	EndCompressorState(AH, ctx->cs);
! 	/* Send the end marker */
! 	WriteInt(AH, 0);
  }
  
  /*
***************
*** 532,639 **** _PrintTocData(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt)
  static void
  _PrintData(ArchiveHandle *AH)
  {
! 	lclContext *ctx = (lclContext *) AH->formatData;
! 	z_streamp	zp = ctx->zp;
! 	size_t		blkLen;
! 	char	   *in = ctx->zlibIn;
! 	size_t		cnt;
! 
! #ifdef HAVE_LIBZ
! 	int			res;
! 	char	   *out = ctx->zlibOut;
! #endif
! 
! #ifdef HAVE_LIBZ
! 
! 	res = Z_OK;
! 
! 	if (AH->compression != 0)
! 	{
! 		zp->zalloc = Z_NULL;
! 		zp->zfree = Z_NULL;
! 		zp->opaque = Z_NULL;
! 
! 		if (inflateInit(zp) != Z_OK)
! 			die_horribly(AH, modulename, "could not initialize compression library: %s\n", zp->msg);
! 	}
! #endif
! 
! 	blkLen = ReadInt(AH);
! 	while (blkLen != 0)
! 	{
! 		if (blkLen + 1 > ctx->inSize)
! 		{
! 			free(ctx->zlibIn);
! 			ctx->zlibIn = NULL;
! 			ctx->zlibIn = (char *) malloc(blkLen + 1);
! 			if (!ctx->zlibIn)
! 				die_horribly(AH, modulename, "out of memory\n");
! 
! 			ctx->inSize = blkLen + 1;
! 			in = ctx->zlibIn;
! 		}
! 
! 		cnt = fread(in, 1, blkLen, AH->FH);
! 		if (cnt != blkLen)
! 		{
! 			if (feof(AH->FH))
! 				die_horribly(AH, modulename,
! 							 "could not read from input file: end of file\n");
! 			else
! 				die_horribly(AH, modulename,
! 					"could not read from input file: %s\n", strerror(errno));
! 		}
! 
! 		ctx->filePos += blkLen;
! 
! 		zp->next_in = (void *) in;
! 		zp->avail_in = blkLen;
! 
! #ifdef HAVE_LIBZ
! 		if (AH->compression != 0)
! 		{
! 			while (zp->avail_in != 0)
! 			{
! 				zp->next_out = (void *) out;
! 				zp->avail_out = zlibOutSize;
! 				res = inflate(zp, 0);
! 				if (res != Z_OK && res != Z_STREAM_END)
! 					die_horribly(AH, modulename, "could not uncompress data: %s\n", zp->msg);
! 
! 				out[zlibOutSize - zp->avail_out] = '\0';
! 				ahwrite(out, 1, zlibOutSize - zp->avail_out, AH);
! 			}
! 		}
! 		else
! #endif
! 		{
! 			in[zp->avail_in] = '\0';
! 			ahwrite(in, 1, zp->avail_in, AH);
! 			zp->avail_in = 0;
! 		}
! 		blkLen = ReadInt(AH);
! 	}
! 
! #ifdef HAVE_LIBZ
! 	if (AH->compression != 0)
! 	{
! 		zp->next_in = NULL;
! 		zp->avail_in = 0;
! 		while (res != Z_STREAM_END)
! 		{
! 			zp->next_out = (void *) out;
! 			zp->avail_out = zlibOutSize;
! 			res = inflate(zp, 0);
! 			if (res != Z_OK && res != Z_STREAM_END)
! 				die_horribly(AH, modulename, "could not uncompress data: %s\n", zp->msg);
! 
! 			out[zlibOutSize - zp->avail_out] = '\0';
! 			ahwrite(out, 1, zlibOutSize - zp->avail_out, AH);
! 		}
! 		if (inflateEnd(zp) != Z_OK)
! 			die_horribly(AH, modulename, "could not close compression library: %s\n", zp->msg);
! 	}
! #endif
  }
  
  static void
--- 505,511 ----
  static void
  _PrintData(ArchiveHandle *AH)
  {
! 	ReadDataFromArchive(AH, AH->compression, _CustomReadFunc);
  }
  
  static void
***************
*** 684,703 **** _skipData(ArchiveHandle *AH)
  {
  	lclContext *ctx = (lclContext *) AH->formatData;
  	size_t		blkLen;
! 	char	   *in = ctx->zlibIn;
  	size_t		cnt;
  
  	blkLen = ReadInt(AH);
  	while (blkLen != 0)
  	{
! 		if (blkLen > ctx->inSize)
  		{
! 			free(ctx->zlibIn);
! 			ctx->zlibIn = (char *) malloc(blkLen);
! 			ctx->inSize = blkLen;
! 			in = ctx->zlibIn;
  		}
! 		cnt = fread(in, 1, blkLen, AH->FH);
  		if (cnt != blkLen)
  		{
  			if (feof(AH->FH))
--- 556,576 ----
  {
  	lclContext *ctx = (lclContext *) AH->formatData;
  	size_t		blkLen;
! 	char	   *buf = NULL;
! 	int			buflen = 0;
  	size_t		cnt;
  
  	blkLen = ReadInt(AH);
  	while (blkLen != 0)
  	{
! 		if (blkLen > buflen)
  		{
! 			if (buf)
! 				free(buf);
! 			buf = (char *) malloc(blkLen);
! 			buflen = blkLen;
  		}
! 		cnt = fread(buf, 1, blkLen, AH->FH);
  		if (cnt != blkLen)
  		{
  			if (feof(AH->FH))
***************
*** 712,717 **** _skipData(ArchiveHandle *AH)
--- 585,593 ----
  
  		blkLen = ReadInt(AH);
  	}
+ 
+ 	if (buf)
+ 		free(buf);
  }
  
  /*
***************
*** 960,1105 **** _readBlockHeader(ArchiveHandle *AH, int *type, int *id)
  	*id = ReadInt(AH);
  }
  
! /*
!  * If zlib is available, then startit up. This is called from
!  * StartData & StartBlob. The buffers are setup in the Init routine.
!  */
! static void
! _StartDataCompressor(ArchiveHandle *AH, TocEntry *te)
  {
! 	lclContext *ctx = (lclContext *) AH->formatData;
! 	z_streamp	zp = ctx->zp;
! 
! #ifdef HAVE_LIBZ
  
! 	if (AH->compression < 0 || AH->compression > 9)
! 		AH->compression = Z_DEFAULT_COMPRESSION;
  
! 	if (AH->compression != 0)
! 	{
! 		zp->zalloc = Z_NULL;
! 		zp->zfree = Z_NULL;
! 		zp->opaque = Z_NULL;
! 
! 		if (deflateInit(zp, AH->compression) != Z_OK)
! 			die_horribly(AH, modulename, "could not initialize compression library: %s\n", zp->msg);
! 	}
! #else
! 
! 	AH->compression = 0;
! #endif
! 
! 	/* Just be paranoid - maybe End is called after Start, with no Write */
! 	zp->next_out = (void *) ctx->zlibOut;
! 	zp->avail_out = zlibOutSize;
  }
  
! /*
!  * Send compressed data to the output stream (via ahwrite).
!  * Each data chunk is preceded by it's length.
!  * In the case of Z0, or no zlib, just write the raw data.
!  *
!  */
! static int
! _DoDeflate(ArchiveHandle *AH, lclContext *ctx, int flush)
  {
! 	z_streamp	zp = ctx->zp;
  
! #ifdef HAVE_LIBZ
! 	char	   *out = ctx->zlibOut;
! 	int			res = Z_OK;
  
! 	if (AH->compression != 0)
! 	{
! 		res = deflate(zp, flush);
! 		if (res == Z_STREAM_ERROR)
! 			die_horribly(AH, modulename, "could not compress data: %s\n", zp->msg);
! 
! 		if (((flush == Z_FINISH) && (zp->avail_out < zlibOutSize))
! 			|| (zp->avail_out == 0)
! 			|| (zp->avail_in != 0)
! 			)
! 		{
! 			/*
! 			 * Extra paranoia: avoid zero-length chunks since a zero length
! 			 * chunk is the EOF marker. This should never happen but...
! 			 */
! 			if (zp->avail_out < zlibOutSize)
! 			{
! 				/*
! 				 * printf("Wrote %lu byte deflated chunk\n", (unsigned long)
! 				 * (zlibOutSize - zp->avail_out));
! 				 */
! 				WriteInt(AH, zlibOutSize - zp->avail_out);
! 				if (fwrite(out, 1, zlibOutSize - zp->avail_out, AH->FH) != (zlibOutSize - zp->avail_out))
! 					die_horribly(AH, modulename, "could not write to output file: %s\n", strerror(errno));
! 				ctx->filePos += zlibOutSize - zp->avail_out;
! 			}
! 			zp->next_out = (void *) out;
! 			zp->avail_out = zlibOutSize;
! 		}
! 	}
! 	else
! #endif
  	{
! 		if (zp->avail_in > 0)
! 		{
! 			WriteInt(AH, zp->avail_in);
! 			if (fwrite(zp->next_in, 1, zp->avail_in, AH->FH) != zp->avail_in)
! 				die_horribly(AH, modulename, "could not write to output file: %s\n", strerror(errno));
! 			ctx->filePos += zp->avail_in;
! 			zp->avail_in = 0;
! 		}
! 		else
! 		{
! #ifdef HAVE_LIBZ
! 			if (flush == Z_FINISH)
! 				res = Z_STREAM_END;
! #endif
! 		}
  	}
  
! #ifdef HAVE_LIBZ
! 	return res;
! #else
! 	return 1;
! #endif
! }
! 
! /*
!  * Terminate zlib context and flush it's buffers. If no zlib
!  * then just return.
!  */
! static void
! _EndDataCompressor(ArchiveHandle *AH, TocEntry *te)
! {
! 
! #ifdef HAVE_LIBZ
! 	lclContext *ctx = (lclContext *) AH->formatData;
! 	z_streamp	zp = ctx->zp;
! 	int			res;
! 
! 	if (AH->compression != 0)
  	{
! 		zp->next_in = NULL;
! 		zp->avail_in = 0;
! 
! 		do
! 		{
! 			/* printf("Ending data output\n"); */
! 			res = _DoDeflate(AH, ctx, Z_FINISH);
! 		} while (res != Z_STREAM_END);
! 
! 		if (deflateEnd(zp) != Z_OK)
! 			die_horribly(AH, modulename, "could not close compression stream: %s\n", zp->msg);
  	}
! #endif
! 
! 	/* Send the end marker */
! 	WriteInt(AH, 0);
  }
  
- 
  /*
   * Clone format-specific fields during parallel restoration.
   */
--- 836,891 ----
  	*id = ReadInt(AH);
  }
  
! static size_t
! _CustomWriteFunc(ArchiveHandle *AH, const char *buf, size_t len)
  {
! 	Assert(len != 0);
  
! 	/* never write 0-byte blocks (this should not happen) */
! 	if (len == 0)
! 		return 0;
  
! 	WriteInt(AH, len);
! 	return _WriteBuf(AH, buf, len);
  }
  
! static size_t
! _CustomReadFunc(ArchiveHandle *AH, char **buf, size_t *buflen)
  {
! 	size_t		blkLen;
! 	size_t		cnt;
  
! 	/*
! 	 * To keep things simple, we always read one compressed block at a time.
! 	 */
  
! 	blkLen = ReadInt(AH);
! 	if (blkLen == 0)
! 		return 0;
! 
! 	/* If the caller's buffer is not large enough, allocate a bigger one */
! 	if (blkLen > *buflen)
  	{
! 		free(*buf);
! 		*buf = (char *) malloc(blkLen);
! 		if (!(*buf))
! 			die_horribly(AH, modulename, "out of memory\n");
! 		*buflen = blkLen;
  	}
  
! 	cnt = _ReadBuf(AH, *buf, blkLen);
! 	if (cnt != blkLen)
  	{
! 		if (feof(AH->FH))
! 			die_horribly(AH, modulename,
! 						 "could not read from input file: end of file\n");
! 		else
! 			die_horribly(AH, modulename,
! 				"could not read from input file: %s\n", strerror(errno));
  	}
! 	return cnt;
  }
  
  /*
   * Clone format-specific fields during parallel restoration.
   */
***************
*** 1114,1125 **** _Clone(ArchiveHandle *AH)
  	memcpy(AH->formatData, ctx, sizeof(lclContext));
  	ctx = (lclContext *) AH->formatData;
  
! 	ctx->zp = (z_streamp) malloc(sizeof(z_stream));
! 	ctx->zlibOut = (char *) malloc(zlibOutSize + 1);
! 	ctx->zlibIn = (char *) malloc(ctx->inSize);
! 
! 	if (ctx->zp == NULL || ctx->zlibOut == NULL || ctx->zlibIn == NULL)
! 		die_horribly(AH, modulename, "out of memory\n");
  
  	/*
  	 * Note: we do not make a local lo_buf because we expect at most one BLOBS
--- 900,907 ----
  	memcpy(AH->formatData, ctx, sizeof(lclContext));
  	ctx = (lclContext *) AH->formatData;
  
! 	if (ctx->cs != NULL)
! 		die_horribly(AH, modulename, "compressor active\n");
  
  	/*
  	 * Note: we do not make a local lo_buf because we expect at most one BLOBS
***************
*** 1133,1141 **** static void
  _DeClone(ArchiveHandle *AH)
  {
  	lclContext *ctx = (lclContext *) AH->formatData;
  
- 	free(ctx->zlibOut);
- 	free(ctx->zlibIn);
- 	free(ctx->zp);
  	free(ctx);
  }
--- 915,924 ----
  _DeClone(ArchiveHandle *AH)
  {
  	lclContext *ctx = (lclContext *) AH->formatData;
+ 	CompressorState	   *cs = ctx->cs;
+ 
+ 	EndCompressorState(AH, cs);
  
  	free(ctx);
  }
+ 
*** a/src/bin/pg_dump/pg_dump.c
--- b/src/bin/pg_dump/pg_dump.c
***************
*** 56,61 ****
--- 56,62 ----
  
  #include "pg_backup_archiver.h"
  #include "dumputils.h"
+ #include "compress_io.h"
  
  extern char *optarg;
  extern int	optind,
***************
*** 2174,2180 **** dumpBlobs(Archive *AH, void *arg)
  					exit_nicely();
  				}
  
! 				WriteData(AH, buf, cnt);
  			} while (cnt > 0);
  
  			lo_close(g_conn, loFd);
--- 2175,2183 ----
  					exit_nicely();
  				}
  
! 				/* we try to avoid writing empty chunks */
! 				if (cnt > 0)
! 					WriteData(AH, buf, cnt);
  			} while (cnt > 0);
  
  			lo_close(g_conn, loFd);
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to