Here is an example:

postgres=# create table col_desc (a int, b int) partition by list (a);
postgres=# create table col_desc_1 partition of col_desc for values in (1);
postgres=# alter table col_desc_1 add check (b > 0);
postgres=# create role col_desc_user;
postgres=# grant insert on col_desc to col_desc_user;
postgres=# revoke select on col_desc from col_desc_user;
postgres=# set role col_desc_user;
postgres=> insert into col_desc values (1, -1) returning 1;
ERROR: new row for relation "col_desc_1" violates check constraint "col_desc_1_b_check"
DETAIL:  Failing row contains (a, b) = (1, -1).

Looks good, but

postgres=> reset role;
postgres=# create table result (f1 text default 'foo', f2 text default 'bar', f3 int);
postgres=# grant insert on result to col_desc_user;
postgres=# set role col_desc_user;
postgres=> with t as (insert into col_desc values (1, -1) returning 1) insert into result (f3) select * from t; ERROR: new row for relation "col_desc_1" violates check constraint "col_desc_1_b_check"

Looks odd to me because the error message doesn't show any DETAIL info; since the CTE query, which produces the message, is the same as the above query, the message should also be the same as the one for the above query. I think the reason for that is: ExecConstraints looks at an incorrect resultRelInfo to build the message for a violating tuple that has been routed *in the case where the partitioned table isn't the primary ModifyTable node*, which leads to deriving an incorrect modifiedCols and then invoking ExecBuildSlotValueDescription with the wrong bitmap. I think this should be fixed. Attached is a patch for that.

Best regards,
Etsuro Fujita
*** a/src/backend/executor/execMain.c
--- b/src/backend/executor/execMain.c
***************
*** 92,97 **** static bool ExecCheckRTEPermsModified(Oid relOid, Oid userid,
--- 92,99 ----
                                                  Bitmapset *modifiedCols,
                                                  AclMode requiredPerms);
  static void ExecCheckXactReadOnly(PlannedStmt *plannedstmt);
+ static ResultRelInfo *ExecFindResultRelInfo(EState *estate, Oid relid,
+                                                                               
        bool missing_ok);
  static char *ExecBuildSlotValueDescription(Oid reloid,
                                                          TupleTableSlot *slot,
                                                          TupleDesc tupdesc,
***************
*** 1360,1365 **** InitResultRelInfo(ResultRelInfo *resultRelInfo,
--- 1362,1392 ----
  }
  
  /*
+  * ExecFindResultRelInfo -- find the ResultRelInfo struct for given relation 
OID
+  *
+  * If no such struct, either return NULL or throw error depending on 
missing_ok
+  */
+ static ResultRelInfo *
+ ExecFindResultRelInfo(EState *estate, Oid relid, bool missing_ok)
+ {
+       ResultRelInfo *rInfo;
+       int                     nr;
+ 
+       rInfo = estate->es_result_relations;
+       nr = estate->es_num_result_relations;
+       while (nr > 0)
+       {
+               if (RelationGetRelid(rInfo->ri_RelationDesc) == relid)
+                       return rInfo;
+               rInfo++;
+               nr--;
+       }
+       if (!missing_ok)
+               elog(ERROR, "failed to find ResultRelInfo for relation %u", 
relid);
+       return NULL;
+ }
+ 
+ /*
   *            ExecGetTriggerResultRel
   *
   * Get a ResultRelInfo for a trigger target relation.  Most of the time,
***************
*** 1379,1399 **** ResultRelInfo *
  ExecGetTriggerResultRel(EState *estate, Oid relid)
  {
        ResultRelInfo *rInfo;
-       int                     nr;
        ListCell   *l;
        Relation        rel;
        MemoryContext oldcontext;
  
        /* First, search through the query result relations */
!       rInfo = estate->es_result_relations;
!       nr = estate->es_num_result_relations;
!       while (nr > 0)
!       {
!               if (RelationGetRelid(rInfo->ri_RelationDesc) == relid)
!                       return rInfo;
!               rInfo++;
!               nr--;
!       }
        /* Nope, but maybe we already made an extra ResultRelInfo for it */
        foreach(l, estate->es_trig_target_relations)
        {
--- 1406,1419 ----
  ExecGetTriggerResultRel(EState *estate, Oid relid)
  {
        ResultRelInfo *rInfo;
        ListCell   *l;
        Relation        rel;
        MemoryContext oldcontext;
  
        /* First, search through the query result relations */
!       rInfo = ExecFindResultRelInfo(estate, relid, true);
!       if (rInfo)
!               return rInfo;
        /* Nope, but maybe we already made an extra ResultRelInfo for it */
        foreach(l, estate->es_trig_target_relations)
        {
***************
*** 1828,1833 **** ExecPartitionCheck(ResultRelInfo *resultRelInfo, 
TupleTableSlot *slot,
--- 1848,1854 ----
                                   EState *estate)
  {
        Relation        rel = resultRelInfo->ri_RelationDesc;
+       Oid                     relid = RelationGetRelid(rel);
        TupleDesc       tupdesc = RelationGetDescr(rel);
        Bitmapset  *modifiedCols;
        Bitmapset  *insertedCols;
***************
*** 1870,1877 **** ExecPartitionCheck(ResultRelInfo *resultRelInfo, 
TupleTableSlot *slot,
--- 1891,1900 ----
                        HeapTuple       tuple = ExecFetchSlotTuple(slot);
                        TupleDesc       old_tupdesc = RelationGetDescr(rel);
                        TupleConversionMap *map;
+                       ResultRelInfo *rInfo;
  
                        rel = resultRelInfo->ri_PartitionRoot;
+                       relid = RelationGetRelid(rel);
                        tupdesc = RelationGetDescr(rel);
                        /* a reverse map */
                        map = convert_tuples_by_name(old_tupdesc, tupdesc,
***************
*** 1881,1892 **** ExecPartitionCheck(ResultRelInfo *resultRelInfo, 
TupleTableSlot *slot,
                                tuple = do_convert_tuple(tuple, map);
                                ExecStoreTuple(tuple, slot, InvalidBuffer, 
false);
                        }
                }
- 
-               insertedCols = GetInsertedColumns(resultRelInfo, estate);
-               updatedCols = GetUpdatedColumns(resultRelInfo, estate);
                modifiedCols = bms_union(insertedCols, updatedCols);
!               val_desc = ExecBuildSlotValueDescription(RelationGetRelid(rel),
                                                                                
                 slot,
                                                                                
                 tupdesc,
                                                                                
                 modifiedCols,
--- 1904,1921 ----
                                tuple = do_convert_tuple(tuple, map);
                                ExecStoreTuple(tuple, slot, InvalidBuffer, 
false);
                        }
+                       /* Find the root table's ResultRelInfo */
+                       rInfo = ExecFindResultRelInfo(estate, relid, false);
+                       insertedCols = GetInsertedColumns(rInfo, estate);
+                       updatedCols = GetUpdatedColumns(rInfo, estate);
+               }
+               else
+               {
+                       insertedCols = GetInsertedColumns(resultRelInfo, 
estate);
+                       updatedCols = GetUpdatedColumns(resultRelInfo, estate);
                }
                modifiedCols = bms_union(insertedCols, updatedCols);
!               val_desc = ExecBuildSlotValueDescription(relid,
                                                                                
                 slot,
                                                                                
                 tupdesc,
                                                                                
                 modifiedCols,
***************
*** 1914,1919 **** ExecConstraints(ResultRelInfo *resultRelInfo,
--- 1943,1949 ----
                                TupleTableSlot *slot, EState *estate)
  {
        Relation        rel = resultRelInfo->ri_RelationDesc;
+       Oid                     relid = RelationGetRelid(rel);
        TupleDesc       tupdesc = RelationGetDescr(rel);
        TupleConstr *constr = tupdesc->constr;
        Bitmapset  *modifiedCols;
***************
*** 1947,1954 **** ExecConstraints(ResultRelInfo *resultRelInfo,
--- 1977,1986 ----
                                {
                                        HeapTuple       tuple = 
ExecFetchSlotTuple(slot);
                                        TupleConversionMap *map;
+                                       ResultRelInfo *rInfo;
  
                                        rel = resultRelInfo->ri_PartitionRoot;
+                                       relid = RelationGetRelid(rel);
                                        tupdesc = RelationGetDescr(rel);
                                        /* a reverse map */
                                        map = 
convert_tuples_by_name(orig_tupdesc, tupdesc,
***************
*** 1958,1969 **** ExecConstraints(ResultRelInfo *resultRelInfo,
                                                tuple = do_convert_tuple(tuple, 
map);
                                                ExecStoreTuple(tuple, slot, 
InvalidBuffer, false);
                                        }
                                }
- 
-                               insertedCols = 
GetInsertedColumns(resultRelInfo, estate);
-                               updatedCols = GetUpdatedColumns(resultRelInfo, 
estate);
                                modifiedCols = bms_union(insertedCols, 
updatedCols);
!                               val_desc = 
ExecBuildSlotValueDescription(RelationGetRelid(rel),
                                                                                
                                 slot,
                                                                                
                                 tupdesc,
                                                                                
                                 modifiedCols,
--- 1990,2007 ----
                                                tuple = do_convert_tuple(tuple, 
map);
                                                ExecStoreTuple(tuple, slot, 
InvalidBuffer, false);
                                        }
+                                       /* Find the root table's ResultRelInfo 
*/
+                                       rInfo = ExecFindResultRelInfo(estate, 
relid, false);
+                                       insertedCols = 
GetInsertedColumns(rInfo, estate);
+                                       updatedCols = GetUpdatedColumns(rInfo, 
estate);
+                               }
+                               else
+                               {
+                                       insertedCols = 
GetInsertedColumns(resultRelInfo, estate);
+                                       updatedCols = 
GetUpdatedColumns(resultRelInfo, estate);
                                }
                                modifiedCols = bms_union(insertedCols, 
updatedCols);
!                               val_desc = ExecBuildSlotValueDescription(relid,
                                                                                
                                 slot,
                                                                                
                                 tupdesc,
                                                                                
                                 modifiedCols,
***************
*** 1994,2001 **** ExecConstraints(ResultRelInfo *resultRelInfo,
--- 2032,2041 ----
                                HeapTuple       tuple = 
ExecFetchSlotTuple(slot);
                                TupleDesc       old_tupdesc = 
RelationGetDescr(rel);
                                TupleConversionMap *map;
+                               ResultRelInfo *rInfo;
  
                                rel = resultRelInfo->ri_PartitionRoot;
+                               relid = RelationGetRelid(rel);
                                tupdesc = RelationGetDescr(rel);
                                /* a reverse map */
                                map = convert_tuples_by_name(old_tupdesc, 
tupdesc,
***************
*** 2005,2016 **** ExecConstraints(ResultRelInfo *resultRelInfo,
                                        tuple = do_convert_tuple(tuple, map);
                                        ExecStoreTuple(tuple, slot, 
InvalidBuffer, false);
                                }
                        }
- 
-                       insertedCols = GetInsertedColumns(resultRelInfo, 
estate);
-                       updatedCols = GetUpdatedColumns(resultRelInfo, estate);
                        modifiedCols = bms_union(insertedCols, updatedCols);
!                       val_desc = 
ExecBuildSlotValueDescription(RelationGetRelid(rel),
                                                                                
                         slot,
                                                                                
                         tupdesc,
                                                                                
                         modifiedCols,
--- 2045,2062 ----
                                        tuple = do_convert_tuple(tuple, map);
                                        ExecStoreTuple(tuple, slot, 
InvalidBuffer, false);
                                }
+                               /* Find the root table's ResultRelInfo */
+                               rInfo = ExecFindResultRelInfo(estate, relid, 
false);
+                               insertedCols = GetInsertedColumns(rInfo, 
estate);
+                               updatedCols = GetUpdatedColumns(rInfo, estate);
+                       }
+                       else
+                       {
+                               insertedCols = 
GetInsertedColumns(resultRelInfo, estate);
+                               updatedCols = GetUpdatedColumns(resultRelInfo, 
estate);
                        }
                        modifiedCols = bms_union(insertedCols, updatedCols);
!                       val_desc = ExecBuildSlotValueDescription(relid,
                                                                                
                         slot,
                                                                                
                         tupdesc,
                                                                                
                         modifiedCols,
*** a/src/test/regress/expected/insert.out
--- b/src/test/regress/expected/insert.out
***************
*** 506,510 **** DETAIL:  Failing row contains (2, hi there).
--- 506,528 ----
  insert into brtrigpartcon1 values (1, 'hi there');
  ERROR:  new row for relation "brtrigpartcon1" violates partition constraint
  DETAIL:  Failing row contains (2, hi there).
+ -- check that the message shows the appropriate column description in a
+ -- situation where the partitioned table is not the primary ModifyTable node
+ create table inserttest3 (f1 text default 'foo', f2 text default 'bar', f3 
int);
+ create role regress_coldesc_role;
+ grant insert on inserttest3 to regress_coldesc_role;
+ grant insert on brtrigpartcon to regress_coldesc_role;
+ revoke select on brtrigpartcon from regress_coldesc_role;
+ set role regress_coldesc_role;
+ with result as (insert into brtrigpartcon values (1, 'hi there') returning 1)
+   insert into inserttest3 (f3) select * from result;
+ ERROR:  new row for relation "brtrigpartcon1" violates partition constraint
+ DETAIL:  Failing row contains (a, b) = (2, hi there).
+ reset role;
+ -- cleanup
+ revoke all on inserttest3 from regress_coldesc_role;
+ revoke all on brtrigpartcon from regress_coldesc_role;
+ drop role regress_coldesc_role;
+ drop table inserttest3;
  drop table brtrigpartcon;
  drop function brtrigpartcon1trigf();
*** a/src/test/regress/sql/insert.sql
--- b/src/test/regress/sql/insert.sql
***************
*** 340,344 **** create or replace function brtrigpartcon1trigf() returns 
trigger as $$begin new.
--- 340,362 ----
  create trigger brtrigpartcon1trig before insert on brtrigpartcon1 for each 
row execute procedure brtrigpartcon1trigf();
  insert into brtrigpartcon values (1, 'hi there');
  insert into brtrigpartcon1 values (1, 'hi there');
+ 
+ -- check that the message shows the appropriate column description in a
+ -- situation where the partitioned table is not the primary ModifyTable node
+ create table inserttest3 (f1 text default 'foo', f2 text default 'bar', f3 
int);
+ create role regress_coldesc_role;
+ grant insert on inserttest3 to regress_coldesc_role;
+ grant insert on brtrigpartcon to regress_coldesc_role;
+ revoke select on brtrigpartcon from regress_coldesc_role;
+ set role regress_coldesc_role;
+ with result as (insert into brtrigpartcon values (1, 'hi there') returning 1)
+   insert into inserttest3 (f3) select * from result;
+ reset role;
+ 
+ -- cleanup
+ revoke all on inserttest3 from regress_coldesc_role;
+ revoke all on brtrigpartcon from regress_coldesc_role;
+ drop role regress_coldesc_role;
+ drop table inserttest3;
  drop table brtrigpartcon;
  drop function brtrigpartcon1trigf();
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to