Forwarding the patch to pgsql-hackers.

Here is the fixed patch. Jaime, could you please take a look at it?

Thank You
Pavel

2009/7/30 Tom Lane <t...@sss.pgh.pa.us>:
> Jaime Casanova <jcasa...@systemguards.com.ec> writes:
>>> On Mon, Jul 20, 2009 at 10:09 AM, Alvaro
>>>> Getting rid of the check on natts was "ungood" ... it needs to compare
>>>> the number of undropped columns of both tupdescs.
>
>> patch attached
>
> This patch is *still* introducing more bugs than it fixes.  The reason
> is that it has modified validate_tupdesc_compat to allow the tupdescs to
> be somewhat different, but has fixed only one of the several call sites
> to deal with the consequences of that.  The others will now become crash
> risks if we apply it as-is.
>
> What I would suggest as a suitable plan for a fix is to modify
> validate_tupdesc_compat so that it returns a flag indicating whether the
> tupdesc compatibility is exact or requires translation.  If translation
> is required, provide another routine that does that --- probably using a
> mapping data structure set up by validate_tupdesc_compat, since in some
> of these cases we'll be processing many tuples.  Then the callers just
> have to know enough to call the tuple-translation function when
> validate_tupdesc_compat tells them to.
>
> There are a number of other places in the system with similar
> requirements, although none of them seem to be exact matches.
> In particular ExecEvalConvertRowtype() provides a good template for
> efficient translation logic, but it's using column name matching
> rather than positional matching so you couldn't share the setup logic.
> I'm not sure if it's worth moving all this code into the core so that
> it can be shared with other future uses (maybe in other PLs), but it's
> worth considering that.  access/common/heaptuple.c or tupdesc.c might
> be a good place for it if we decide to do that.
>
> The executor's junkfilter code is pretty nearly related as well, and
> perhaps the Right Thing is to make all of this stuff into applications
> of junkfilters with different setup routines for the different
> requirements.  However the junkfilter code is designed to work with
> tuples that are in TupleTableSlots, which isn't particularly helpful for
> plpgsql's uses, so maybe trying to unify with that code is more trouble
> than it's worth.
>
> I'm marking this patch as Waiting on Author again, although perhaps
> Returned with Feedback would be better since my suggestions amount
> to wholesale rewrites.
>
>                        regards, tom lane
>
*** ./src/pl/plpgsql/src/pl_exec.c.orig	2009-07-22 04:31:38.000000000 +0200
--- ./src/pl/plpgsql/src/pl_exec.c	2009-08-03 21:21:42.196893375 +0200
***************
*** 190,196 ****
  					   Oid reqtype, int32 reqtypmod,
  					   bool isnull);
  static void exec_init_tuple_store(PLpgSQL_execstate *estate);
! static void validate_tupdesc_compat(TupleDesc expected, TupleDesc returned,
  						const char *msg);
  static void exec_set_found(PLpgSQL_execstate *estate, bool state);
  static void plpgsql_create_econtext(PLpgSQL_execstate *estate);
--- 190,200 ----
  					   Oid reqtype, int32 reqtypmod,
  					   bool isnull);
  static void exec_init_tuple_store(PLpgSQL_execstate *estate);
! static HeapTuple convert_tuple(TupleDesc dest_desc, TupleDesc src_desc, 
! 					    int *conversion_map, 
! 					    HeapTuple tuple);
! static int *validate_tupdesc_compat(TupleDesc expected, TupleDesc returned,
! 						bool *needs_conversion,
  						const char *msg);
  static void exec_set_found(PLpgSQL_execstate *estate, bool state);
  static void plpgsql_create_econtext(PLpgSQL_execstate *estate);
***************
*** 216,221 ****
--- 220,227 ----
  	ErrorContextCallback plerrcontext;
  	int			i;
  	int			rc;
+ 	int	*conversion_map = NULL;
+ 	bool	needs_conversion = false;
  
  	/*
  	 * Setup the execution state
***************
*** 388,394 ****
  			{
  				case TYPEFUNC_COMPOSITE:
  					/* got the expected result rowtype, now check it */
! 					validate_tupdesc_compat(tupdesc, estate.rettupdesc,
  											"returned record type does not match expected record type");
  					break;
  				case TYPEFUNC_RECORD:
--- 394,400 ----
  			{
  				case TYPEFUNC_COMPOSITE:
  					/* got the expected result rowtype, now check it */
! 					conversion_map = validate_tupdesc_compat(tupdesc, estate.rettupdesc, &needs_conversion,
  											"returned record type does not match expected record type");
  					break;
  				case TYPEFUNC_RECORD:
***************
*** 414,422 ****
  			 * Copy tuple to upper executor memory, as a tuple Datum. Make
  			 * sure it is labeled with the caller-supplied tuple type.
  			 */
! 			estate.retval =
! 				PointerGetDatum(SPI_returntuple((HeapTuple) DatumGetPointer(estate.retval),
  												tupdesc));
  		}
  		else
  		{
--- 420,437 ----
  			 * Copy tuple to upper executor memory, as a tuple Datum. Make
  			 * sure it is labeled with the caller-supplied tuple type.
  			 */
! 			if (!needs_conversion)
! 				estate.retval =
! 					PointerGetDatum(SPI_returntuple((HeapTuple) DatumGetPointer(estate.retval),
  												tupdesc));
+ 			else
+ 			{
+ 				HeapTuple tuple = (HeapTuple) DatumGetPointer(estate.retval);
+ 				tuple = convert_tuple(tupdesc, estate.rettupdesc, conversion_map, tuple);
+ 				estate.retval = 
+ 					PointerGetDatum(SPI_returntuple(tuple, tupdesc));
+ 				pfree(conversion_map);
+ 			}
  		}
  		else
  		{
***************
*** 486,491 ****
--- 501,508 ----
  	PLpgSQL_rec *rec_new,
  			   *rec_old;
  	HeapTuple	rettup;
+ 	int		*conversion_map;
+ 	bool		needs_conversion;
  
  	/*
  	 * Setup the execution state
***************
*** 705,715 ****
  		rettup = NULL;
  	else
  	{
! 		validate_tupdesc_compat(trigdata->tg_relation->rd_att,
! 								estate.rettupdesc,
  								"returned row structure does not match the structure of the triggering table");
  		/* Copy tuple to upper executor memory */
! 		rettup = SPI_copytuple((HeapTuple) DatumGetPointer(estate.retval));
  	}
  
  	/*
--- 722,741 ----
  		rettup = NULL;
  	else
  	{
! 		conversion_map = validate_tupdesc_compat(trigdata->tg_relation->rd_att,
! 								estate.rettupdesc, &needs_conversion,
  								"returned row structure does not match the structure of the triggering table");
  		/* Copy tuple to upper executor memory */
! 		if (!needs_conversion)
! 			rettup = SPI_copytuple((HeapTuple) DatumGetPointer(estate.retval));
! 		else
! 		{
! 			HeapTuple tuple = (HeapTuple) DatumGetPointer(estate.retval);
! 			tuple = convert_tuple(trigdata->tg_relation->rd_att,
! 						    estate.rettupdesc, conversion_map, tuple);
! 			rettup = SPI_copytuple(tuple);
! 			pfree(conversion_map);
! 		}
  	}
  
  	/*
***************
*** 2191,2196 ****
--- 2217,2224 ----
  			case PLPGSQL_DTYPE_REC:
  				{
  					PLpgSQL_rec *rec = (PLpgSQL_rec *) retvar;
+ 					int	*conversion_map;
+ 					bool	needs_conversion;
  
  					if (!HeapTupleIsValid(rec->tup))
  						ereport(ERROR,
***************
*** 2199,2207 ****
  								  rec->refname),
  						errdetail("The tuple structure of a not-yet-assigned"
  								  " record is indeterminate.")));
! 					validate_tupdesc_compat(tupdesc, rec->tupdesc,
! 								"wrong record type supplied in RETURN NEXT");
! 					tuple = rec->tup;
  				}
  				break;
  
--- 2227,2241 ----
  								  rec->refname),
  						errdetail("The tuple structure of a not-yet-assigned"
  								  " record is indeterminate.")));
! 					conversion_map = validate_tupdesc_compat(tupdesc, rec->tupdesc, &needs_conversion,
! 									"wrong record type supplied in RETURN NEXT");
! 					if (!needs_conversion)
! 						tuple = rec->tup;
! 					else
! 					{
! 						tuple = convert_tuple(tupdesc, rec->tupdesc, conversion_map, rec->tup);
! 						pfree(conversion_map);
! 					}
  				}
  				break;
  
***************
*** 2285,2290 ****
--- 2319,2332 ----
  {
  	Portal		portal;
  	uint32		processed = 0;
+ 	int		*conversion_map;
+ 	bool		needs_conversion;
+ 	int	returned_natts;
+ 	int	expected_natts;
+ 	Datum		*returned_values;
+ 	Datum		*expected_values;
+ 	bool		*returned_nulls;
+ 	bool		*expected_nulls;
  
  	if (!estate->retisset)
  		ereport(ERROR,
***************
*** 2307,2340 ****
  										   stmt->params);
  	}
  
! 	validate_tupdesc_compat(estate->rettupdesc, portal->tupDesc,
  				   "structure of query does not match function result type");
  
  	while (true)
  	{
  		MemoryContext old_cxt;
- 		int			i;
  
  		SPI_cursor_fetch(portal, true, 50);
  		if (SPI_processed == 0)
  			break;
  
! 		old_cxt = MemoryContextSwitchTo(estate->tuple_store_cxt);
! 		for (i = 0; i < SPI_processed; i++)
! 		{
! 			HeapTuple	tuple = SPI_tuptable->vals[i];
  
! 			tuplestore_puttuple(estate->tuple_store, tuple);
! 			processed++;
  		}
- 		MemoryContextSwitchTo(old_cxt);
- 
- 		SPI_freetuptable(SPI_tuptable);
  	}
  
  	SPI_freetuptable(SPI_tuptable);
  	SPI_cursor_close(portal);
  
  	estate->eval_processed = processed;
  	exec_set_found(estate, processed != 0);
  
--- 2349,2442 ----
  										   stmt->params);
  	}
  
! 	conversion_map = validate_tupdesc_compat(estate->rettupdesc, portal->tupDesc, &needs_conversion,
  				   "structure of query does not match function result type");
+ 				   
+ 	/* 
+ 	 * Prepare space for tuple conversion, when it is necessary - when
+ 	 * returned or evaluated tuple has dropped columns.
+ 	 */
+ 	if (needs_conversion)
+ 	{
+ 		int		i;
+ 	
+ 		returned_natts= portal->tupDesc->natts;
+ 		expected_natts = estate->rettupdesc->natts;
+ 		
+ 		returned_values = (Datum *) palloc(returned_natts * sizeof(Datum));
+ 		returned_nulls = (bool *) palloc(returned_natts * sizeof(bool));
+ 		expected_values = (Datum *) palloc(expected_natts * sizeof(Datum));
+ 		expected_nulls = (bool *) palloc(expected_natts * sizeof(bool));
+ 		
+ 		for (i = 0; i < expected_natts; i++)
+ 			expected_nulls[i] = true;
+ 	}
  
  	while (true)
  	{
  		MemoryContext old_cxt;
  
  		SPI_cursor_fetch(portal, true, 50);
  		if (SPI_processed == 0)
  			break;
  
! 		CHECK_FOR_INTERRUPTS();
  
! 		/* when conversion is not necessary, then store tuples in tuple store */
! 		if (!needs_conversion)
! 		{
! 			int	i;
! 			
! 			old_cxt = MemoryContextSwitchTo(estate->tuple_store_cxt);
! 			for (i = 0; i < SPI_processed; i++)
! 			{
! 				tuplestore_puttuple(estate->tuple_store, SPI_tuptable->vals[i]);
! 				processed++;
! 			}
! 			MemoryContextSwitchTo(old_cxt);
! 		}
! 		else
! 		{
! 			int	i;
! 			int		j;
! 			HeapTuple	tuple;
! 			
! 			/* convert tuple and store it in tuple store */
! 			for (i = 0; i < SPI_processed; i++)
! 			{
! 				/* Deconstruct the tuple */
! 				heap_deform_tuple(SPI_tuptable->vals[i], portal->tupDesc, 
! 										returned_values, 
! 										returned_nulls);
! 				for (j = 0; j < expected_natts; j++)
! 					if (conversion_map[j] != 0)
! 					{
! 						expected_values[j] = returned_values[conversion_map[j] - 1];
! 						expected_nulls[j] = returned_nulls[conversion_map[j] - 1];
! 					}
! 					/* expected_nulls for dropped atts are true allways */
! 			
! 				tuple = heap_form_tuple(estate->rettupdesc, expected_values, expected_nulls);
! 				old_cxt = MemoryContextSwitchTo(estate->tuple_store_cxt);
! 				tuplestore_puttuple(estate->tuple_store, tuple);
! 				MemoryContextSwitchTo(old_cxt);
! 				processed++;
! 			}
  		}
  	}
  
  	SPI_freetuptable(SPI_tuptable);
  	SPI_cursor_close(portal);
  
+ 	if (needs_conversion)
+ 	{
+ 		pfree(expected_values);
+ 		pfree(expected_nulls);
+ 		pfree(returned_values);
+ 		pfree(returned_nulls);
+ 		pfree(conversion_map);
+ 	}
+ 
  	estate->eval_processed = processed;
  	exec_set_found(estate, processed != 0);
  
***************
*** 5121,5162 ****
  }
  
  /*
!  * Validates compatibility of supplied TupleDesc pair by checking number and type
!  * of attributes.
   */
! static void
! validate_tupdesc_compat(TupleDesc expected, TupleDesc returned, const char *msg)
  {
! 	int			i;
! 	const char *dropped_column_type = gettext_noop("N/A (dropped column)");
  
  	if (!expected || !returned)
  		ereport(ERROR,
  				(errcode(ERRCODE_DATATYPE_MISMATCH),
  				 errmsg("%s", _(msg))));
  
! 	if (expected->natts != returned->natts)
  		ereport(ERROR,
  				(errcode(ERRCODE_DATATYPE_MISMATCH),
  				 errmsg("%s", _(msg)),
  				 errdetail("Number of returned columns (%d) does not match "
  						   "expected column count (%d).",
! 						   returned->natts, expected->natts)));
  
! 	for (i = 0; i < expected->natts; i++)
! 		if (expected->attrs[i]->atttypid != returned->attrs[i]->atttypid)
! 			ereport(ERROR,
! 					(errcode(ERRCODE_DATATYPE_MISMATCH),
! 					 errmsg("%s", _(msg)),
! 				   errdetail("Returned type %s does not match expected type "
! 							 "%s in column \"%s\".",
! 							 OidIsValid(returned->attrs[i]->atttypid) ?
! 							 format_type_be(returned->attrs[i]->atttypid) :
! 							 _(dropped_column_type),
! 							 OidIsValid(expected->attrs[i]->atttypid) ?
! 							 format_type_be(expected->attrs[i]->atttypid) :
! 							 _(dropped_column_type),
! 							 NameStr(expected->attrs[i]->attname))));
  }
  
  /* ----------
--- 5223,5360 ----
  }
  
  /*
!  * convert_tuple rebuilds a tuple laid out per src_desc into the layout described
!  * by dest_desc.  conversion_map[i] holds the 1-based source column that feeds
!  * destination column i, or 0 when that column is dropped (it is stored as NULL).
   */
! static HeapTuple
! convert_tuple(TupleDesc dest_desc, TupleDesc src_desc, int *conversion_map, HeapTuple tuple)
! {
! 	int	snatts = src_desc->natts;
! 	int	dnatts = dest_desc->natts;
! 	Datum	*src_values;
! 	bool	*src_nulls;
! 	Datum		*dest_values;
! 	bool		*dest_nulls;
! 	int		i;
! 	HeapTuple   result;
! 	
! 	src_values = (Datum *) palloc0 (snatts * sizeof(Datum));
! 	src_nulls = (bool *) palloc(snatts * sizeof(bool));
! 	dest_values = (Datum *) palloc0 (dnatts * sizeof(Datum));
! 	dest_nulls = (bool *) palloc(dnatts * sizeof(bool));
! 	
! 	heap_deform_tuple(tuple, src_desc, src_values, src_nulls);
! 	for (i = 0; i < dnatts; i++)
! 	{
! 		if (conversion_map[i] != 0)
! 		{
! 			dest_values[i] = src_values[conversion_map[i] - 1];
! 			dest_nulls[i] = src_nulls[conversion_map[i] - 1];
! 		}
! 		else
! 			dest_nulls[i] = true;
! 	}
! 	
! 	result = heap_form_tuple(dest_desc, dest_values, dest_nulls);
! 	
! 	pfree(src_values);
! 	pfree(src_nulls);
! 	pfree(dest_values);
! 	pfree(dest_nulls);
! 	
! 	return result;
! }
  
+ 
+ /*
+  * Validates compatibility of the supplied TupleDesc pair by checking the number
+  * and type of their non-dropped attributes, matched by position; a mismatch or
+  * a missing descriptor raises ereport(ERROR) with msg.  *needs_conversion is set
+  * to true when either descriptor has dropped columns; the result is then a
+  * palloc'd array with one entry per expected attribute: the 1-based position of
+  * the matching returned attribute, or 0 if dropped.  Otherwise returns NULL.
+  */
+ static int *
+ validate_tupdesc_compat(TupleDesc expected, TupleDesc returned, 
+ 								    bool *needs_conversion, 
+ 								    const char *msg)
+ {
+ 	int			i;
+ 	int		j = 0;
+ 	int valid_expected_natts;
+ 	int valid_returned_natts;
+ 	int		   *conversion_map;	/* palloc'd: natts may exceed FUNC_MAX_ARGS */
+ 	
  	if (!expected || !returned)
  		ereport(ERROR,
  				(errcode(ERRCODE_DATATYPE_MISMATCH),
  				 errmsg("%s", _(msg))));
  
! 	valid_expected_natts = expected->natts;
! 	valid_returned_natts = returned->natts;
! 	conversion_map = (int *) palloc0(expected->natts * sizeof(int));
! 
! 	for (i = 0; i < expected->natts; i++)
! 	{
! 		if (!expected->attrs[i]->attisdropped)
! 		{
! 			/* find first valid returned att */
! 			while (j < returned->natts && returned->attrs[j]->attisdropped)
! 			{
! 				valid_returned_natts--;
! 				j++;
! 			}
! 			/* validate att pair only when j is valid */
! 			if (j < returned->natts)
! 			{
! 				Assert(!expected->attrs[i]->attisdropped);
! 				Assert(!returned->attrs[j]->attisdropped);
! 				
! 				if (expected->attrs[i]->atttypid != returned->attrs[j]->atttypid)
! 					ereport(ERROR,
! 							(errcode(ERRCODE_DATATYPE_MISMATCH),
! 							 errmsg("%s", _(msg)),
! 							 errdetail("Returned type %s does not match expected type "
! 								"%s in column \"%s\".",
! 									format_type_be(returned->attrs[j]->atttypid),
! 									format_type_be(expected->attrs[i]->atttypid),
! 									NameStr(expected->attrs[i]->attname))));
! 				conversion_map[i] = j+1;
! 				j++;
! 			}
! 			/* Cannot break this cycle - needs correct valid_expected_natts for err msg */
! 		}
! 		else
! 		{
! 			valid_expected_natts--;
! 			conversion_map[i] = 0;
! 		}
! 	}
! 
! 	/* complete valid_returned_natts evaluation */
! 	while (j < returned->natts)
! 	{
! 		if (returned->attrs[j++]->attisdropped)
! 			valid_returned_natts--;
! 	}
! 	
! 	if (valid_expected_natts != valid_returned_natts)
  		ereport(ERROR,
  				(errcode(ERRCODE_DATATYPE_MISMATCH),
  				 errmsg("%s", _(msg)),
  				 errdetail("Number of returned columns (%d) does not match "
  						   "expected column count (%d).",
! 						   valid_returned_natts, valid_expected_natts)));
  
! 	*needs_conversion = (valid_expected_natts != expected->natts ||
! 						 valid_returned_natts != returned->natts);
! 	if (*needs_conversion)
! 		return conversion_map;	/* caller owns the map and must pfree it */
! 
! 	/* no dropped columns on either side: tuples can be used as-is */
! 	pfree(conversion_map);
! 	return NULL;
  }
  
  /* ----------
*** ./src/test/regress/expected/plpgsql.out.orig	2009-08-02 14:56:35.000000000 +0200
--- ./src/test/regress/expected/plpgsql.out	2009-08-03 21:39:40.000000000 +0200
***************
*** 3287,3292 ****
--- 3287,3368 ----
  (4 rows)
  
  drop function return_dquery();
+ -- fix return query and dropped columns bug
+ create table tabwithcols(a int, b int, c int, d int);
+ insert into tabwithcols values(10,20,30,40),(50,60,70,80);
+ create or replace function returnqueryf()
+ returns setof tabwithcols as $$
+ begin
+   return query select * from tabwithcols;
+   return query execute 'select * from tabwithcols';
+ end;
+ $$ language plpgsql;
+ select * from returnqueryf();
+  a  | b  | c  | d  
+ ----+----+----+----
+  10 | 20 | 30 | 40
+  50 | 60 | 70 | 80
+  10 | 20 | 30 | 40
+  50 | 60 | 70 | 80
+ (4 rows)
+ 
+ alter table tabwithcols drop column b;
+ alter table tabwithcols drop column c;
+ select * from returnqueryf();
+  a  | d  
+ ----+----
+  10 | 40
+  50 | 80
+  10 | 40
+  50 | 80
+ (4 rows)
+ 
+ alter table tabwithcols drop column d;
+ select * from returnqueryf();
+  a  
+ ----
+  10
+  50
+  10
+  50
+ (4 rows)
+ 
+ alter table tabwithcols add column d int;
+ select * from returnqueryf();
+  a  | d 
+ ----+---
+  10 |  
+  50 |  
+  10 |  
+  50 |  
+ (4 rows)
+ 
+ -- better to be sure we don't break anything else
+ alter table tabwithcols add column b date;
+ create function trigger_function() returns trigger as $$
+ begin
+  new.b = to_date('2009-08-03','YYYY-MM-DD') + new.a;
+  return new;
+ end;
+ $$language plpgsql;
+ create trigger trg_ins_tabwithcols before insert on tabwithcols
+ for each row execute procedure trigger_function();
+ insert into tabwithcols(a, d) values(30,30),(60,70);
+ select * from returnqueryf();
+  a  | d  |     b      
+ ----+----+------------
+  10 |    | 
+  50 |    | 
+  30 | 30 | 09-02-2009
+  60 | 70 | 10-02-2009
+  10 |    | 
+  50 |    | 
+  30 | 30 | 09-02-2009
+  60 | 70 | 10-02-2009
+ (8 rows)
+ 
+ drop function returnqueryf();
+ drop table tabwithcols;
  -- Tests for 8.4's new RAISE features
  create or replace function raise_test() returns void as $$
  begin
*** ./src/test/regress/sql/plpgsql.sql.orig	2009-08-02 15:54:00.402438910 +0200
--- ./src/test/regress/sql/plpgsql.sql	2009-08-03 21:38:25.753894296 +0200
***************
*** 2684,2689 ****
--- 2684,2735 ----
  
  drop function return_dquery();
  
+ -- fix return query and dropped columns bug
+ create table tabwithcols(a int, b int, c int, d int);
+ insert into tabwithcols values(10,20,30,40),(50,60,70,80);
+ 
+ create or replace function returnqueryf()
+ returns setof tabwithcols as $$
+ begin
+   return query select * from tabwithcols;
+   return query execute 'select * from tabwithcols';
+ end;
+ $$ language plpgsql;
+ 
+ select * from returnqueryf();
+ 
+ alter table tabwithcols drop column b;
+ alter table tabwithcols drop column c;
+ 
+ select * from returnqueryf();
+ 
+ alter table tabwithcols drop column d;
+ 
+ select * from returnqueryf();
+ 
+ alter table tabwithcols add column d int;
+ 
+ select * from returnqueryf();
+ 
+ -- better to be sure we don't break anything else
+ alter table tabwithcols add column b date;
+ 
+ create function trigger_function() returns trigger as $$
+ begin
+  new.b = to_date('2009-08-03','YYYY-MM-DD') + new.a;
+  return new;
+ end;
+ $$language plpgsql;
+ 
+ create trigger trg_ins_tabwithcols before insert on tabwithcols
+ for each row execute procedure trigger_function();
+ insert into tabwithcols(a, d) values(30,30),(60,70);
+ 
+ select * from returnqueryf();
+ 
+ drop function returnqueryf();
+ drop table tabwithcols;
+ 
  -- Tests for 8.4's new RAISE features
  
  create or replace function raise_test() returns void as $$
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to