On Tue, Jan 20, 2015 at 4:05 PM, Michael Paquier
<michael.paqu...@gmail.com> wrote:
> On Tue, Jan 20, 2015 at 8:47 AM, Michael Paquier
> <michael.paqu...@gmail.com> wrote:
>> On Mon, Jan 19, 2015 at 11:06 PM, Joe Conway <m...@joeconway.com> wrote:
>>> -----BEGIN PGP SIGNED MESSAGE-----
>>> Hash: SHA1
>>>
>>> On 01/19/2015 08:16 AM, Alvaro Herrera wrote:
>>>> Haven't looked at this patch, but I wonder if it would be better
>>>> to replace the innards of connectby with a rewrite of the query to
>>>> use standard WITH queries.  Maybe we can remove a couple hundred
>>>> lines from tablefunc.c?
>>>
>>> Seems like a good idea -- connectby is really obsolete for quite a
>>> while now other than as an SRF example. I guess we only keep it around
>>> for backwards compatibility?
>> For master, yes we could brush up things a bit. Now do we really do
>> the same for back-branches? I would think that the answer there is
>> something close to the patch I sent.
>
> So, using a WITH RECURSIVE, here is a query equivalent to what connectby does:
> [...]
> Thoughts?
Looking at this more closely, I do not think it is currently possible
to support orderby_fld at the same level as the existing implementation
when using WITH RECURSIVE in connectby: we would need to reorder the
items of the base table within the 2nd query of the UNION ALL to get an
equivalent result, and WITH RECURSIVE does not support ORDER BY (nor
mutual recursion between WITH items).

Another thing to note is that WITH RECURSIVE weakens the infinite
recursion detection. I don't think it would be good to weaken that...
Attached is the result of this random hacking, which also brings some
cleanup, btw:
 1 file changed, 110 insertions(+), 264 deletions(-)
Regards,
-- 
Michael
diff --git a/contrib/tablefunc/tablefunc.c b/contrib/tablefunc/tablefunc.c
index 3388fab..e7c3674 100644
--- a/contrib/tablefunc/tablefunc.c
+++ b/contrib/tablefunc/tablefunc.c
@@ -54,7 +54,6 @@ static Tuplestorestate *get_crosstab_tuplestore(char *sql,
 						bool randomAccess);
 static void validateConnectbyTupleDesc(TupleDesc tupdesc, bool show_branch, bool show_serial);
 static bool compatCrosstabTupleDescs(TupleDesc tupdesc1, TupleDesc tupdesc2);
-static bool compatConnectbyTupleDescs(TupleDesc tupdesc1, TupleDesc tupdesc2);
 static void get_normal_pair(float8 *x1, float8 *x2);
 static Tuplestorestate *connectby(char *relname,
 		  char *key_fld,
@@ -68,21 +67,6 @@ static Tuplestorestate *connectby(char *relname,
 		  MemoryContext per_query_ctx,
 		  bool randomAccess,
 		  AttInMetadata *attinmeta);
-static Tuplestorestate *build_tuplestore_recursively(char *key_fld,
-							 char *parent_key_fld,
-							 char *relname,
-							 char *orderby_fld,
-							 char *branch_delim,
-							 char *start_with,
-							 char *branch,
-							 int level,
-							 int *serial,
-							 int max_depth,
-							 bool show_branch,
-							 bool show_serial,
-							 MemoryContext per_query_ctx,
-							 AttInMetadata *attinmeta,
-							 Tuplestorestate *tupstore);
 
 typedef struct
 {
@@ -1161,14 +1145,16 @@ connectby(char *relname,
 	Tuplestorestate *tupstore = NULL;
 	int			ret;
 	MemoryContext oldcontext;
-
-	int			serial = 1;
+	StringInfoData inner_sql, outer_sql, orderby_sql;
+	char	  **values;
+	int			num_cols;
 
 	/* Connect to SPI manager */
 	if ((ret = SPI_connect()) < 0)
 		/* internal error */
 		elog(ERROR, "connectby: SPI_connect returned %d", ret);
 
+
 	/* switch to longer term context to create the tuple store */
 	oldcontext = MemoryContextSwitchTo(per_query_ctx);
 
@@ -1177,244 +1163,137 @@ connectby(char *relname,
 
 	MemoryContextSwitchTo(oldcontext);
 
-	/* now go get the whole tree */
-	tupstore = build_tuplestore_recursively(key_fld,
-											parent_key_fld,
-											relname,
-											orderby_fld,
-											branch_delim,
-											start_with,
-											start_with, /* current_branch */
-											0,	/* initial level is 0 */
-											&serial,	/* initial serial is 1 */
-											max_depth,
-											show_branch,
-											show_serial,
-											per_query_ctx,
-											attinmeta,
-											tupstore);
-
-	SPI_finish();
-
-	return tupstore;
-}
-
-static Tuplestorestate *
-build_tuplestore_recursively(char *key_fld,
-							 char *parent_key_fld,
-							 char *relname,
-							 char *orderby_fld,
-							 char *branch_delim,
-							 char *start_with,
-							 char *branch,
-							 int level,
-							 int *serial,
-							 int max_depth,
-							 bool show_branch,
-							 bool show_serial,
-							 MemoryContext per_query_ctx,
-							 AttInMetadata *attinmeta,
-							 Tuplestorestate *tupstore)
-{
-	TupleDesc	tupdesc = attinmeta->tupdesc;
-	int			ret;
-	int			proc;
-	int			serial_column;
-	StringInfoData sql;
-	char	  **values;
-	char	   *current_key;
-	char	   *current_key_parent;
-	char		current_level[INT32_STRLEN];
-	char		serial_str[INT32_STRLEN];
-	char	   *current_branch;
-	HeapTuple	tuple;
-
-	if (max_depth > 0 && level > max_depth)
-		return tupstore;
-
-	initStringInfo(&sql);
-
-	/* Build initial sql statement */
-	if (!show_serial)
-	{
-		appendStringInfo(&sql, "SELECT %s, %s FROM %s WHERE %s = %s AND %s IS NOT NULL AND %s <> %s",
-						 key_fld,
-						 parent_key_fld,
-						 relname,
-						 parent_key_fld,
-						 quote_literal_cstr(start_with),
-						 key_fld, key_fld, parent_key_fld);
-		serial_column = 0;
-	}
+	/*
+	 * Build query to be executed, first begin with the internal
+	 * WITH query recursively run to build the trees.
+	 */
+	initStringInfo(&inner_sql);
+	initStringInfo(&outer_sql);
+	initStringInfo(&orderby_sql);
+
+	if (orderby_fld)
+		appendStringInfo(&orderby_sql, ", %s AS orderby_txt ", orderby_fld);
+
+	/* Start point with root portion */
+	appendStringInfo(&inner_sql,
+			"SELECT %s AS key, 0::int AS level, NULL::text AS parent_key, %s::text AS "
+			"ct_full_list "
+			"%s"
+			"FROM %s "
+			"WHERE %s = %s ",
+			key_fld,
+			key_fld,
+			orderby_fld ? orderby_sql.data : " ",
+			relname,
+			key_fld,
+			quote_literal_cstr(start_with));
+	appendStringInfo(&inner_sql, "UNION ALL ");
+	appendStringInfo(&inner_sql,
+			"SELECT ctext.%s AS key, "
+			"(ctree.level + 1)::int AS level, "
+			"ctext.%s::text AS parent_key, "
+			"CAST(ctree.ct_full_list || %s || ctext.%s AS text) AS ct_full_list "
+			"%s"
+			"FROM %s AS ctext "
+			"INNER JOIN connectby_tree AS ctree "
+			"ON (ctext.%s = ctree.key) "
+			"WHERE ctree.level <= %d - 1 OR %d <= 0 ",
+			key_fld,
+			parent_key_fld,
+			quote_literal_cstr(branch_delim),
+			key_fld,
+			orderby_fld ? orderby_sql.data : " ",
+			relname,
+			parent_key_fld,
+			max_depth, max_depth);
+
+	appendStringInfo(&outer_sql,
+					 "WITH RECURSIVE connectby_tree AS ( %s ) "
+					 "SELECT key, parent_key, level, ct_full_list "
+					 "FROM connectby_tree ",
+					 inner_sql.data);
+
+	if (orderby_fld != NULL)
+		appendStringInfo(&outer_sql,
+						 "ORDER BY ct_full_list, orderby_txt, level");
 	else
-	{
-		appendStringInfo(&sql, "SELECT %s, %s FROM %s WHERE %s = %s AND %s IS NOT NULL AND %s <> %s ORDER BY %s",
-						 key_fld,
-						 parent_key_fld,
-						 relname,
-						 parent_key_fld,
-						 quote_literal_cstr(start_with),
-						 key_fld, key_fld, parent_key_fld,
-						 orderby_fld);
-		serial_column = 1;
-	}
+		appendStringInfo(&outer_sql,
+						 "ORDER BY ct_full_list, level");
+
+	elog(LOG, "query: %s", outer_sql.data);
 
+	/* Determine number of result columns */
 	if (show_branch)
-		values = (char **) palloc((CONNECTBY_NCOLS + serial_column) * sizeof(char *));
+		num_cols = CONNECTBY_NCOLS;
 	else
-		values = (char **) palloc((CONNECTBY_NCOLS_NOBRANCH + serial_column) * sizeof(char *));
-
-	/* First time through, do a little setup */
-	if (level == 0)
-	{
-		/* root value is the one we initially start with */
-		values[0] = start_with;
-
-		/* root value has no parent */
-		values[1] = NULL;
-
-		/* root level is 0 */
-		sprintf(current_level, "%d", level);
-		values[2] = current_level;
-
-		/* root branch is just starting root value */
-		if (show_branch)
-			values[3] = start_with;
-
-		/* root starts the serial with 1 */
-		if (show_serial)
-		{
-			sprintf(serial_str, "%d", (*serial)++);
-			if (show_branch)
-				values[4] = serial_str;
-			else
-				values[3] = serial_str;
-		}
-
-		/* construct the tuple */
-		tuple = BuildTupleFromCStrings(attinmeta, values);
-
-		/* now store it */
-		tuplestore_puttuple(tupstore, tuple);
-
-		/* increment level */
-		level++;
-	}
+		num_cols = CONNECTBY_NCOLS_NOBRANCH;
+	if (show_serial)
+		num_cols++;
+	values = (char **) palloc(num_cols * sizeof(char *));
 
-	/* Retrieve the desired rows */
-	ret = SPI_execute(sql.data, true, 0);
-	proc = SPI_processed;
+	/* execute query and fetch results */
+	ret = SPI_execute(outer_sql.data, true, 0);
 
-	/* Check for qualifying tuples */
-	if ((ret == SPI_OK_SELECT) && (proc > 0))
+	if ((ret == SPI_OK_SELECT) && (SPI_processed > 0))
 	{
-		HeapTuple	spi_tuple;
-		SPITupleTable *tuptable = SPI_tuptable;
-		TupleDesc	spi_tupdesc = tuptable->tupdesc;
+		SPITupleTable *spi_tuptable = SPI_tuptable;
+		TupleDesc	spi_tupdesc = spi_tuptable->tupdesc;
 		int			i;
-		StringInfoData branchstr;
-		StringInfoData chk_branchstr;
-		StringInfoData chk_current_key;
-
-		/* First time through, do a little more setup */
-		if (level == 0)
-		{
-			/*
-			 * Check that return tupdesc is compatible with the one we got
-			 * from the query, but only at level 0 -- no need to check more
-			 * than once
-			 */
-
-			if (!compatConnectbyTupleDescs(tupdesc, spi_tupdesc))
-				ereport(ERROR,
-						(errcode(ERRCODE_SYNTAX_ERROR),
-						 errmsg("invalid return type"),
-						 errdetail("Return and SQL tuple descriptions are " \
-								   "incompatible.")));
-		}
 
-		initStringInfo(&branchstr);
-		initStringInfo(&chk_branchstr);
-		initStringInfo(&chk_current_key);
-
-		for (i = 0; i < proc; i++)
+		/* Build tuples and store them in store */
+		for (i = 0; i < SPI_processed; i++)
 		{
-			/* initialize branch for this pass */
-			appendStringInfoString(&branchstr, branch);
-			appendStringInfo(&chk_branchstr, "%s%s%s", branch_delim, branch, branch_delim);
-
-			/* get the next sql result tuple */
-			spi_tuple = tuptable->vals[i];
-
-			/* get the current key and parent */
-			current_key = SPI_getvalue(spi_tuple, spi_tupdesc, 1);
-			appendStringInfo(&chk_current_key, "%s%s%s", branch_delim, current_key, branch_delim);
-			current_key_parent = pstrdup(SPI_getvalue(spi_tuple, spi_tupdesc, 2));
-
-			/* get the current level */
-			sprintf(current_level, "%d", level);
-
-			/* check to see if this key is also an ancestor */
-			if (strstr(chk_branchstr.data, chk_current_key.data))
-				elog(ERROR, "infinite recursion detected");
+			HeapTuple	spi_tuple = spi_tuptable->vals[i];
+			char	   *key;
+			char	   *parent_key;
+			char	   *branch;
+			char	   *level;
+			char		serial_str[INT32_STRLEN];
+			int			count = 0;
+			HeapTuple	tuple;
 
-			/* OK, extend the branch */
-			appendStringInfo(&branchstr, "%s%s", branch_delim, current_key);
-			current_branch = branchstr.data;
+			/* get values */
+			key = SPI_getvalue(spi_tuple, spi_tupdesc, 1);
+			parent_key = SPI_getvalue(spi_tuple, spi_tupdesc, 2);
+			level = SPI_getvalue(spi_tuple, spi_tupdesc, 3);
+			branch = SPI_getvalue(spi_tuple, spi_tupdesc, 4);
 
-			/* build a tuple */
-			values[0] = pstrdup(current_key);
-			values[1] = current_key_parent;
-			values[2] = current_level;
+			/* build tuple */
+			values[count++] = key;
+			values[count++] = parent_key;
+			values[count++] = level;
 			if (show_branch)
-				values[3] = current_branch;
+				values[count++] = branch;
 			if (show_serial)
 			{
-				sprintf(serial_str, "%d", (*serial)++);
-				if (show_branch)
-					values[4] = serial_str;
-				else
-					values[3] = serial_str;
+				sprintf(serial_str, "%d", i + 1);
+				values[count++] = serial_str;
 			}
 
+			/* construct the tuple */
 			tuple = BuildTupleFromCStrings(attinmeta, values);
 
-			xpfree(current_key);
-			xpfree(current_key_parent);
-
-			/* store the tuple for later use */
+			/* now store it */
 			tuplestore_puttuple(tupstore, tuple);
-
 			heap_freetuple(tuple);
 
-			/* recurse using current_key_parent as the new start_with */
-			tupstore = build_tuplestore_recursively(key_fld,
-													parent_key_fld,
-													relname,
-													orderby_fld,
-													branch_delim,
-													values[0],
-													current_branch,
-													level + 1,
-													serial,
-													max_depth,
-													show_branch,
-													show_serial,
-													per_query_ctx,
-													attinmeta,
-													tupstore);
-
-			/* reset branch for next pass */
-			resetStringInfo(&branchstr);
-			resetStringInfo(&chk_branchstr);
-			resetStringInfo(&chk_current_key);
+			if (key)
+				pfree(key);
+			if (level)
+				pfree(level);
+			if (parent_key)
+				pfree(parent_key);
+			if (branch)
+				pfree(branch);
 		}
-
-		xpfree(branchstr.data);
-		xpfree(chk_branchstr.data);
-		xpfree(chk_current_key.data);
 	}
 
+	SPI_finish();
+
+	/* clean up */
+	resetStringInfo(&inner_sql);
+	resetStringInfo(&outer_sql);
+
 	return tupstore;
 }
 
@@ -1486,39 +1365,6 @@ validateConnectbyTupleDesc(TupleDesc tupdesc, bool show_branch, bool show_serial
 }
 
 /*
- * Check if spi sql tupdesc and return tupdesc are compatible
- */
-static bool
-compatConnectbyTupleDescs(TupleDesc ret_tupdesc, TupleDesc sql_tupdesc)
-{
-	Oid			ret_atttypid;
-	Oid			sql_atttypid;
-
-	/* check the key_fld types match */
-	ret_atttypid = ret_tupdesc->attrs[0]->atttypid;
-	sql_atttypid = sql_tupdesc->attrs[0]->atttypid;
-	if (ret_atttypid != sql_atttypid)
-		ereport(ERROR,
-				(errcode(ERRCODE_SYNTAX_ERROR),
-				 errmsg("invalid return type"),
-				 errdetail("SQL key field datatype does " \
-						   "not match return key field datatype.")));
-
-	/* check the parent_key_fld types match */
-	ret_atttypid = ret_tupdesc->attrs[1]->atttypid;
-	sql_atttypid = sql_tupdesc->attrs[1]->atttypid;
-	if (ret_atttypid != sql_atttypid)
-		ereport(ERROR,
-				(errcode(ERRCODE_SYNTAX_ERROR),
-				 errmsg("invalid return type"),
-				 errdetail("SQL parent key field datatype does " \
-						   "not match return parent key field datatype.")));
-
-	/* OK, the two tupdescs are compatible for our purposes */
-	return true;
-}
-
-/*
  * Check if two tupdescs match in type of attributes
  */
 static bool
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to