> Hm, I think it's possible to rig the test to do dummy
> copy of pgresult, thus it's possible to see what kind
> of speed is possible..  Will try.

I added a new method (-x) to rowdump where it asks for row
with PQgetRowData() and then tries to emulate super-efficient
PGresult copy, then loads data from that PGresult.

Quick reference:
rowdump1 - single-row-mode1 [~ libpq 9.2]
rowdump2 - single-row-mode2 [~ libpq 9.1]

-s - single row mode with PQgetResult()
-z - single row mode with PQgetRowData()
-x - simulated optimized PQgetResult()

-------------------------------------------------------------
QUERY: select 10000,200,300000,rpad('x',30,'z') from
generate_series(1,5000000)
./rowdump1 -s:   6.28   6.25   6.39  avg:  6.31 [ 100.00 % ]
./rowdump2 -s:   7.49   7.40   7.57  avg:  7.49 [ 118.71 % ]
./rowdump1 -z:   2.86   2.77   2.79  avg:  2.81 [ 44.50 % ]
./rowdump1 -x:   3.46   3.27   3.29  avg:  3.34 [ 52.96 % ]
QUERY: select
rpad('x',10,'z'),rpad('x',20,'z'),rpad('x',30,'z'),rpad('x',40,'z'),rpad('x',50,'z'),rpad('x',60,'z')
from generate_series(1,3000000)
./rowdump1 -s:   7.76   7.76   7.68  avg:  7.73 [ 100.00 % ]
./rowdump2 -s:   8.24   8.12   8.66  avg:  8.34 [ 107.85 % ]
./rowdump1 -z:   5.34   5.07   5.23  avg:  5.21 [ 67.41 % ]
./rowdump1 -x:   5.53   5.61   5.61  avg:  5.58 [ 72.20 % ]
QUERY: select
1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54,55,56,57,58,59,60,61,62,63,64,65,66,67,68,69,70,71,72,73,74,75,76,77,78,79,80,81,82,83,84,85,86,87,88,89,90,91,92,93,94,95,96,97,98,99,100
from generate_series(1,800000)
./rowdump1 -s:   7.49   7.66   7.59  avg:  7.58 [ 100.00 % ]
./rowdump2 -s:   7.56   8.12   7.95  avg:  7.88 [ 103.91 % ]
./rowdump1 -z:   2.77   2.76   2.76  avg:  2.76 [ 36.46 % ]
./rowdump1 -x:   3.07   3.05   3.18  avg:  3.10 [ 40.90 % ]
QUERY: select 1000,rpad('x', 400, 'z'),rpad('x', 4000, 'z') from
generate_series(1,100000)
./rowdump1 -s:   2.66   2.62   2.67  avg:  2.65 [ 100.00 % ]
./rowdump2 -s:   3.11   3.14   3.11  avg:  3.12 [ 117.74 % ]
./rowdump1 -z:   2.49   2.46   2.47  avg:  2.47 [ 93.33 % ]
./rowdump1 -x:   2.59   2.57   2.57  avg:  2.58 [ 97.23 % ]
-----------------------------------------------------------------

It shows that even if the actual "fast" row copy turns out to be slower
than this one here, it's still quite a competitive approach compared to
PQgetRowData(), as long as it's not too slow.

So the optimized PQgetResult() may be good enough, thus we
can drop the idea of PQgetRowData().

Code attached, also in https://github.com/markokr/pqtest repo.

-- 
marko

# Install prefixes of the two libpq builds that rowdump1/rowdump2 link against.
pg1 = /opt/apps/pgsql92mode1
pg2 = /opt/apps/pgsql92mode2

# Extra flags for rowdump1 only: enable the HAVE_ROWDATA code paths and add
# the include directories needed for <internal/libpq-int.h>.
X1 = -DHAVE_ROWDATA -I$(pg1)/include/internal -I$(pg1)/include/server

CFLAGS = -O -g -Wall

.PHONY: all clean html

all: rowdump1 rowdump2

# Each recipe must stay on one logical line; recipe lines start with a TAB.
rowdump1: rowdump.c
	$(CC) -I$(pg1)/include $(CFLAGS) -o $@ $< -L$(pg1)/lib -Wl,-rpath=$(pg1)/lib -lpq $(X1)

rowdump2: rowdump.c
	$(CC) -I$(pg2)/include $(CFLAGS) -o $@ $< -L$(pg2)/lib -Wl,-rpath=$(pg2)/lib -lpq

clean:
	rm -f rowdump1 rowdump2 time.tmp README.html

html: README.html

README.html: README.rst
	rst2html $< > $@


#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <getopt.h>

#include <libpq-fe.h>

#ifdef HAVE_ROWDATA
#include <internal/libpq-int.h>
#endif

/*
 * State shared by the query runners: the connection, a row counter and
 * a growable output buffer used to assemble one formatted row at a time.
 */
struct Context {
	PGconn *db;	/* connection the query is sent on */
	int count;	/* row counter, reset at the start of each query runner */

	char *buf;	/* output buffer; NULL until out_char() allocates it */
	int buflen;	/* allocated size of buf, in bytes */
	int bufpos;	/* number of bytes currently stored in buf */
};

/*
 * Report a fatal error on stderr and terminate with exit status 1.
 * When a connection is given, libpq's current error message for it is
 * appended after msg, separated by ": ".
 */
static void die(PGconn *db, const char *msg)
{
	fputs(msg, stderr);
	if (db != NULL)
		fprintf(stderr, ": %s", PQerrorMessage(db));
	fputc('\n', stderr);

	exit(1);
}

/*
 * Write the buffered bytes to stdout (fd 1) and release the buffer.
 *
 * write() may transfer fewer bytes than requested (e.g. when stdout is a
 * pipe), so keep writing until everything is out; only an error or a
 * zero-byte write is treated as fatal.  Does nothing if no buffer has
 * been allocated yet.
 */
static void out_flush(struct Context *ctx)
{
	int done = 0;

	if (!ctx->buf)
		return;

	while (done < ctx->bufpos) {
		ssize_t out = write(1, ctx->buf + done, ctx->bufpos - done);

		if (out <= 0)
			die(NULL, "failed to write file");
		done += (int) out;
	}

	free(ctx->buf);
	ctx->buf = NULL;
	ctx->buflen = 0;
	ctx->bufpos = 0;
}

/*
 * Append one byte to the output buffer, growing it when it is full.
 * The buffer starts at 16 bytes and doubles on every resize; allocation
 * failure is fatal.
 */
static void out_char(struct Context *ctx, char c)
{
	if (ctx->bufpos >= ctx->buflen) {
		const char *errmsg;

		if (ctx->buf == NULL) {
			/* first use: start with a small buffer */
			ctx->buflen = 16;
			ctx->buf = malloc(ctx->buflen);
			errmsg = "failed to allocate buffer";
		} else {
			/* buffer is full: double its size */
			ctx->buflen *= 2;
			ctx->buf = realloc(ctx->buf, ctx->buflen);
			errmsg = "failed to resize buffer";
		}

		if (ctx->buf == NULL)
			die(NULL, errmsg);
	}

	ctx->buf[ctx->bufpos] = c;
	ctx->bufpos++;
}

/*
 * Append vlen bytes of val to the output buffer in COPY text form:
 * backslash, tab, newline and carriage return are written as the
 * two-character sequences \\, \t, \n and \r; every other byte is
 * copied unchanged.
 */
static void proc_value(struct Context *ctx, const char *val, int vlen)
{
	while (vlen-- > 0) {
		char ch = *val++;
		char esc;

		/* pick the letter that follows the backslash, or 0 for "no escape" */
		if (ch == '\\')
			esc = '\\';
		else if (ch == '\t')
			esc = 't';
		else if (ch == '\n')
			esc = 'n';
		else if (ch == '\r')
			esc = 'r';
		else
			esc = 0;

		if (esc != 0) {
			out_char(ctx, '\\');
			out_char(ctx, esc);
		} else {
			out_char(ctx, ch);
		}
	}
}

/*
 * Format row `tup` of a regular PGresult as one COPY text line
 * (tab-separated columns, \N for NULL, newline terminated), flush it
 * to stdout and bump the row counter.
 */
static void proc_row(struct Context *ctx, PGresult *res, int tup)
{
	const int ncols = PQnfields(res);
	int col;

	ctx->count++;

	for (col = 0; col < ncols; col++) {
		/* column separator goes before every column but the first */
		if (col != 0)
			out_char(ctx, '\t');

		if (!PQgetisnull(res, tup, col)) {
			int len = PQgetlength(res, tup, col);
			const char *data = PQgetvalue(res, tup, col);

			proc_value(ctx, data, len);
		} else {
			out_char(ctx, '\\');
			out_char(ctx, 'N');
		}
	}

	out_char(ctx, '\n');
	out_flush(ctx);
}

/* load everything to single PGresult */
static void exec_query_full(struct Context *ctx, const char *q)
{
	PGconn *db = ctx->db;
	PGresult *r;
	ExecStatusType s;
	int i;

	ctx->count = 0;

	if (!PQsendQuery(db, q))
		die(db, "PQsendQuery");

	/* get next result */
	r = PQgetResult(db);
	s = PQresultStatus(r);
	if (s != PGRES_TUPLES_OK)
		die(db, PQresStatus(s));

	for (i = 0; i < PQntuples(r); i++) {
		proc_row(ctx, r, i);
		ctx->count++;
	}
	PQclear(r);
}

/*
 * Run query q in single-row mode, receiving every row as its own
 * PGresult and dumping it immediately.
 *
 * Each PGRES_SINGLE_TUPLE result carries one row; a PGRES_TUPLES_OK
 * result resets the row counter.  Any other status is reported on
 * stderr and ends the program with exit status 1.
 */
static void exec_query_single_row(struct Context *ctx, const char *q)
{
	PGconn *conn = ctx->db;
	PGresult *res;

	ctx->count = 0;

	if (!PQsendQuery(conn, q))
		die(conn, "PQsendQuery");

	if (!PQsetSingleRowMode(conn))
		die(NULL, "PQsetSingleRowMode");

	/* keep fetching until libpq reports there are no more results */
	for (res = PQgetResult(conn); res != NULL; res = PQgetResult(conn)) {
		ExecStatusType status = PQresultStatus(res);

		if (status == PGRES_SINGLE_TUPLE) {
			/* the result holds exactly one row, at index 0 */
			proc_row(ctx, res, 0);
		} else if (status == PGRES_TUPLES_OK) {
			/* query finished successfully */
			ctx->count = 0;
		} else {
			fprintf(stderr, "result: %s\n", PQresStatus(status));
			exit(1);
		}

		PQclear(res);
	}
}

#ifdef HAVE_ROWDATA

/*
 * Format one row from the PGdataValue array filled in by
 * PQgetRowData() as a COPY text line, flush it to stdout and bump the
 * row counter.  A column length of -1 marks a NULL value.
 */
static void proc_row_zcopy(struct Context *ctx, PGresult *res, PGdataValue *cols)
{
	const int ncols = PQnfields(res);
	int col;

	ctx->count++;

	for (col = 0; col < ncols; col++) {
		const PGdataValue *cell = &cols[col];

		/* column separator goes before every column but the first */
		if (col != 0)
			out_char(ctx, '\t');

		if (cell->len != -1) {
			proc_value(ctx, cell->value, cell->len);
		} else {
			out_char(ctx, '\\');
			out_char(ctx, 'N');
		}
	}

	out_char(ctx, '\n');
	out_flush(ctx);
}

/*
 * Run query q in single-row mode and dump each row straight from the
 * column array handed out by PQgetRowData(), without building a
 * PGresult per row.
 *
 * Once PQgetRowData() stops returning rows, the final result is fetched
 * with PQgetResult().  A final status other than PGRES_TUPLES_OK is
 * reported on stderr (stdout carries the dumped rows) and ends the
 * program with exit status 1, matching exec_query_single_row().
 */
static void exec_query_zero_copy(struct Context *ctx, const char *q)
{
	PGconn *db = ctx->db;
	PGresult *r;
	ExecStatusType s;
	PGdataValue *cols;

	ctx->count = 0;

	if (!PQsendQuery(db, q))
		die(db, "PQsendQuery");

	if (!PQsetSingleRowMode(db))
		die(NULL, "PQsetSingleRowMode");

	/* loop until all resultset is done */
	while (PQgetRowData(db, &r, &cols)) {
		proc_row_zcopy(ctx, r, cols);
	}

	/* get final result */
	r = PQgetResult(db);
	s = PQresultStatus(r);
	if (s != PGRES_TUPLES_OK) {
		fprintf(stderr, "result: %s\n", PQresStatus(s));
		exit(1);
	}

	/* query successful */
	ctx->count = 0;
	PQclear(r);
}

/*
 * Emulate a cheap PGresult copy: one malloc() followed by plain memcpy()
 * of three pieces laid out back to back:
 *   1. the PGresult struct itself,
 *   2. the first res->curOffset bytes of res->curBlock,
 *   3. the row data, taken as the byte span from the start of the first
 *      column value to the end of the last one.
 *
 * NOTE(review): piece 3 assumes the column values of a row sit
 * contiguously and in column order in memory -- confirm against
 * PQgetRowData().
 *
 * On success returns the buffer (release it with free()) and stores the
 * address of the copied row data in *data_p.  Returns NULL, leaving
 * *data_p untouched, if the allocation fails.
 */
static char *fake_copy(PGresult *res, PGdataValue *cols, char **data_p)
{
	PGresult_data *hdr = res->curBlock;
	int n = PQnfields(res);
	int slen;
	int datalen = 0;
	int hdrlen;
	char *buf;

	/* structure length */
	slen = sizeof(PGresult);

	/* row headers */
	hdrlen = res->curOffset;

	/* row data; a result without columns has none, so don't touch cols[] */
	if (n > 0) {
		datalen = cols[n-1].value - cols[0].value;
		if (cols[n-1].len > 0)
			datalen += cols[n-1].len;
	}

	/* malloc & copy */
	buf = malloc(slen + hdrlen + datalen);
	if (!buf)
		return NULL;
	memcpy(buf, res, slen);
	/* skip empty pieces so memcpy() is never handed an unusable source */
	if (hdrlen > 0)
		memcpy(buf + slen, hdr, hdrlen);
	if (datalen > 0)
		memcpy(buf + slen + hdrlen, cols[0].value, datalen);

	/* return also data offset */
	*data_p = buf + slen + hdrlen;

	return buf;
}

/*
 * Format one row as a COPY text line, flush it to stdout and bump the
 * row counter.  cols[] supplies only the lengths and relative positions
 * of the values; the bytes themselves are read from the copy at *data,
 * using each value's offset from the first column's value.
 */
static void proc_row_fake(struct Context *ctx, PGresult *res, PGdataValue *cols, char *data)
{
	const int ncols = PQnfields(res);
	const char *base = cols[0].value;
	int col;

	ctx->count++;

	for (col = 0; col < ncols; col++) {
		const PGdataValue *cell = &cols[col];

		/* column separator goes before every column but the first */
		if (col != 0)
			out_char(ctx, '\t');

		if (cell->len != -1) {
			/* recalculate the value's address inside *data */
			const char *copied = data + (cell->value - base);

			proc_value(ctx, copied, cell->len);
		} else {
			out_char(ctx, '\\');
			out_char(ctx, 'N');
		}
	}

	out_char(ctx, '\n');
	out_flush(ctx);
}

/*
 * Run query q in single-row mode; for every row returned by
 * PQgetRowData(), make a fake_copy() of it, dump the row from that copy
 * and free the copy again.
 *
 * A failed copy allocation is fatal.  After the last row the final
 * result is fetched with PQgetResult(); a status other than
 * PGRES_TUPLES_OK is reported on stderr (stdout carries the dumped rows)
 * and ends the program with exit status 1, matching
 * exec_query_single_row().
 */
static void exec_query_fake_copy(struct Context *ctx, const char *q)
{
	PGconn *db = ctx->db;
	PGresult *r;
	ExecStatusType s;
	PGdataValue *cols;

	ctx->count = 0;

	if (!PQsendQuery(db, q))
		die(db, "PQsendQuery");

	if (!PQsetSingleRowMode(db))
		die(NULL, "PQsetSingleRowMode");

	/* loop until all resultset is done */
	while (PQgetRowData(db, &r, &cols)) {
		char *data = NULL;
		void *copy = fake_copy(r, cols, &data);

		/* without a copy there is nothing valid to read the row from */
		if (!copy)
			die(NULL, "failed to allocate row copy");

		proc_row_fake(ctx, r, cols, data);
		free(copy);
	}

	/* get final result */
	r = PQgetResult(db);
	s = PQresultStatus(r);
	if (s != PGRES_TUPLES_OK) {
		fprintf(stderr, "result: %s\n", PQresStatus(s));
		exit(1);
	}

	/* query successful */
	ctx->count = 0;
	PQclear(r);
}

#else

/* Stub for builds without HAVE_ROWDATA: the -x mode cannot run, so bail out. */
static void exec_query_fake_copy(struct Context *ctx, const char *q)
{
	/* parameters are intentionally unused in this build */
	(void) ctx;
	(void) q;

	die(NULL, "PQgetRowData() not available");
}

/* Stub for builds without HAVE_ROWDATA: the -z mode cannot run, so bail out. */
static void exec_query_zero_copy(struct Context *ctx, const char *q)
{
	/* parameters are intentionally unused in this build */
	(void) ctx;
	(void) q;

	die(NULL, "PQgetRowData() not available");
}

#endif

/* Help text printed by usage(): synopsis plus the execution-mode switches. */
static const char usage_str[] =
"usage: rowdump [-z|-s|-f|-x] [-d CONNSTR] [-c SQLCOMMAND]\n"
"switches:\n"
"  -f  Load full resultset at once\n"
"  -s  Single-row mode\n"
"  -z  Single-row with direct access (PQgetRowData())\n"
"  -x  Single-row with fast PGresult copy\n"
;


/*
 * Print the usage text and exit with status err.
 * Requested help (err == 0) goes to stdout; usage shown because of a bad
 * command line (err != 0) goes to stderr, keeping stdout free for data.
 */
static void usage(int err)
{
	fprintf(err ? stderr : stdout, "%s\n", usage_str);
	exit(err);
}

/*
 * Entry point: parse the command line, connect, and run the query with
 * the selected execution mode.
 *
 *   -c SQLCOMMAND  query to run (default "show all")
 *   -d CONNSTR     libpq connection string (default "dbname=postgres")
 *   -f/-s/-z/-x    execution mode, last one given wins (default -f)
 *   -h             print usage and exit
 */
int main(int argc, char *argv[])
{
	const char *connstr = "dbname=postgres";
	const char *q = "show all";
	PGconn *db;
	struct Context ctx;
	/*
	 * getopt() returns int; keeping it in a plain char would make the
	 * comparison against -1 never match where char is unsigned.
	 */
	int c;
	int exec_type = 'f';

	while ((c = getopt(argc, argv, "c:d:zsxfh")) != -1) {
		switch (c) {
		case 'c':
			q = optarg;
			break;
		case 'd':
			connstr = optarg;
			break;
		case 'z':
		case 's':
		case 'f':
		case 'x':
			exec_type = c;
			break;
		case 'h':
			usage(0);
			break;
		default:
			usage(1);
			break;
		}
	}

	db = PQconnectdb(connstr);
	if (!db || PQstatus(db) == CONNECTION_BAD)
		die(db, "connect");

	memset(&ctx, 0, sizeof(ctx));
	ctx.db = db;
	switch (exec_type) {
	case 'x':
		exec_query_fake_copy(&ctx, q);
		break;
	case 'z':
		exec_query_zero_copy(&ctx, q);
		break;
	case 'f':
		exec_query_full(&ctx, q);
		break;
	case 's':
		exec_query_single_row(&ctx, q);
		break;
	}

	PQfinish(db);

	return 0;
}

Attachment: xtest.sh
Description: Bourne shell script

-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to