Here's my current work in progress for 8.1 devel, aimed at fixing the
timing issues where referential actions have their checks run against
intermediate states.  So far I've only added one simple test (which failed
against 8.0) to the regression tests, and they still pass for me with this
patch applied.  There's still an outstanding question of whether the
looping gives the correct result in the presence of explicit INSERTs and
SET CONSTRAINTS IMMEDIATE inside BEFORE triggers.
Index: src/backend/commands/trigger.c
===================================================================
RCS file: /projects/cvsroot/pgsql/src/backend/commands/trigger.c,v
retrieving revision 1.193
diff -c -r1.193 trigger.c
*** src/backend/commands/trigger.c      23 Aug 2005 22:40:08 -0000      1.193
--- src/backend/commands/trigger.c      24 Aug 2005 03:24:48 -0000
***************
*** 2281,2286 ****
--- 2280,2286 ----
        TriggerDesc *trigdesc = NULL;
        FmgrInfo   *finfo = NULL;
        Instrumentation *instr = NULL;
+       bool last_relation_loaded = false;
  
        /* Make a per-tuple memory context for trigger function calls */
        per_tuple_context =
***************
*** 2309,2314 ****
--- 2309,2328 ----
                         */
                        if (rel == NULL || rel->rd_id != event->ate_relid)
                        {
+                               bool found_relation_in_estate = false;
+                               if (last_relation_loaded) {
+                                       if (rel)
+                                               heap_close(rel, NoLock);
+                                       if (trigdesc)
+                                               FreeTriggerDesc(trigdesc);
+                                       if (finfo)
+                                               pfree(finfo);
+                                       Assert(instr == NULL);  /* never used 
in this case */
+                                       rel = NULL;
+                                       trigdesc = NULL;
+                                       finfo = NULL;
+                               }
+ 
                                if (estate)
                                {
                                        /* Find target relation among estate's 
result rels */
***************
*** 2324,2348 ****
                                                rInfo++;
                                                nr--;
                                        }
!                                       if (nr <= 0)                            
/* should not happen */
!                                               elog(ERROR, "could not find 
relation %u among query result relations",
!                                                        event->ate_relid);
!                                       rel = rInfo->ri_RelationDesc;
!                                       trigdesc = rInfo->ri_TrigDesc;
!                                       finfo = rInfo->ri_TrigFunctions;
!                                       instr = rInfo->ri_TrigInstrument;
                                }
-                               else
-                               {
-                                       /* Hard way: we manage the resources 
for ourselves */
-                                       if (rel)
-                                               heap_close(rel, NoLock);
-                                       if (trigdesc)
-                                               FreeTriggerDesc(trigdesc);
-                                       if (finfo)
-                                               pfree(finfo);
-                                       Assert(instr == NULL);  /* never used 
in this case */
  
                                        /*
                                         * We assume that an appropriate lock 
is still held by
                                         * the executor, so grab no new lock 
here.
--- 2338,2355 ----
                                                rInfo++;
                                                nr--;
                                        }
!                                       if (nr > 0) 
!                                       {
!                                               rel = rInfo->ri_RelationDesc;
!                                               trigdesc = rInfo->ri_TrigDesc;
!                                               finfo = rInfo->ri_TrigFunctions;
!                                               instr = 
rInfo->ri_TrigInstrument;
!                                               found_relation_in_estate = true;
!                                       }
                                }
  
+                               if (!found_relation_in_estate) 
+                               {
                                        /*
                                         * We assume that an appropriate lock 
is still held by
                                         * the executor, so grab no new lock 
here.
***************
*** 2367,2372 ****
--- 2374,2386 ----
                                                palloc0(trigdesc->numtriggers * 
sizeof(FmgrInfo));
  
                                        /* Never any EXPLAIN info in this case 
*/
+                                       instr = NULL;
+ 
+                                       last_relation_loaded = true;
+                               }
+                               else 
+                               {
+                                       last_relation_loaded = false;
                                }
                        }
  
***************
*** 2417,2423 ****
        events->tail = prev_event;
  
        /* Release working resources */
!       if (!estate)
        {
                if (rel)
                        heap_close(rel, NoLock);
--- 2431,2437 ----
        events->tail = prev_event;
  
        /* Release working resources */
!       if (last_relation_loaded)
        {
                if (rel)
                        heap_close(rel, NoLock);
***************
*** 2543,2551 ****
         * will be available for it to fire.
         *
         * If we find no firable events, we don't have to increment 
firing_counter.
         */
        events = &afterTriggers->query_stack[afterTriggers->query_depth];
!       if (afterTriggerMarkEvents(events, &afterTriggers->events, true))
        {
                CommandId               firing_id = 
afterTriggers->firing_counter++;
  
--- 2557,2569 ----
         * will be available for it to fire.
         *
         * If we find no firable events, we don't have to increment 
firing_counter.
+        *
+        * If we do find events, we continue marking events and invoking them
+        * until there are no events remaining because a referential action may
+        * be adding not yet marked events to the end of our queue.
         */
        events = &afterTriggers->query_stack[afterTriggers->query_depth];
!       while (afterTriggerMarkEvents(events, &afterTriggers->events, true))
        {
                CommandId               firing_id = 
afterTriggers->firing_counter++;
  
Index: src/backend/executor/spi.c
===================================================================
RCS file: /projects/cvsroot/pgsql/src/backend/executor/spi.c,v
retrieving revision 1.141
diff -c -r1.141 spi.c
*** src/backend/executor/spi.c  9 Jun 2005 21:25:22 -0000       1.141
--- src/backend/executor/spi.c  24 Aug 2005 03:24:50 -0000
***************
*** 40,48 ****
  static int _SPI_execute_plan(_SPI_plan *plan,
                                                         Datum *Values, const 
char *Nulls,
                                                         Snapshot snapshot, 
Snapshot crosscheck_snapshot,
!                                                        bool read_only, long 
tcount);
  
! static int _SPI_pquery(QueryDesc *queryDesc, long tcount);
  
  static void _SPI_error_callback(void *arg);
  
--- 40,48 ----
  static int _SPI_execute_plan(_SPI_plan *plan,
                                                         Datum *Values, const 
char *Nulls,
                                                         Snapshot snapshot, 
Snapshot crosscheck_snapshot,
!                                                        bool read_only, long 
tcount, bool nest_triggers);
  
! static int _SPI_pquery(QueryDesc *queryDesc, long tcount, bool nest_triggers);
  
  static void _SPI_error_callback(void *arg);
  
***************
*** 302,308 ****
  
        res = _SPI_execute_plan(&plan, NULL, NULL,
                                                        InvalidSnapshot, 
InvalidSnapshot,
!                                                       read_only, tcount);
  
        _SPI_end_call(true);
        return res;
--- 302,308 ----
  
        res = _SPI_execute_plan(&plan, NULL, NULL,
                                                        InvalidSnapshot, 
InvalidSnapshot,
!                                                       read_only, tcount, 
true);
  
        _SPI_end_call(true);
        return res;
***************
*** 335,341 ****
        res = _SPI_execute_plan((_SPI_plan *) plan,
                                                        Values, Nulls,
                                                        InvalidSnapshot, 
InvalidSnapshot,
!                                                       read_only, tcount);
  
        _SPI_end_call(true);
        return res;
--- 335,341 ----
        res = _SPI_execute_plan((_SPI_plan *) plan,
                                                        Values, Nulls,
                                                        InvalidSnapshot, 
InvalidSnapshot,
!                                                       read_only, tcount, 
true);
  
        _SPI_end_call(true);
        return res;
***************
*** 361,367 ****
  SPI_execute_snapshot(void *plan,
                                         Datum *Values, const char *Nulls,
                                         Snapshot snapshot, Snapshot 
crosscheck_snapshot,
!                                        bool read_only, long tcount)
  {
        int                     res;
  
--- 361,367 ----
  SPI_execute_snapshot(void *plan,
                                         Datum *Values, const char *Nulls,
                                         Snapshot snapshot, Snapshot 
crosscheck_snapshot,
!                                        bool read_only, long tcount, bool 
nest_triggers)
  {
        int                     res;
  
***************
*** 378,384 ****
        res = _SPI_execute_plan((_SPI_plan *) plan,
                                                        Values, Nulls,
                                                        snapshot, 
crosscheck_snapshot,
!                                                       read_only, tcount);
  
        _SPI_end_call(true);
        return res;
--- 378,384 ----
        res = _SPI_execute_plan((_SPI_plan *) plan,
                                                        Values, Nulls,
                                                        snapshot, 
crosscheck_snapshot,
!                                                       read_only, tcount, 
nest_triggers);
  
        _SPI_end_call(true);
        return res;
***************
*** 1306,1316 ****
   * crosscheck_snapshot: for RI use, all others pass InvalidSnapshot
   * read_only: TRUE for read-only execution (no CommandCounterIncrement)
   * tcount: execution tuple-count limit, or 0 for none
   */
  static int
  _SPI_execute_plan(_SPI_plan *plan, Datum *Values, const char *Nulls,
                                  Snapshot snapshot, Snapshot 
crosscheck_snapshot,
!                                 bool read_only, long tcount)
  {
        volatile int res = 0;
        Snapshot        saveActiveSnapshot;
--- 1306,1319 ----
   * crosscheck_snapshot: for RI use, all others pass InvalidSnapshot
   * read_only: TRUE for read-only execution (no CommandCounterIncrement)
   * tcount: execution tuple-count limit, or 0 for none
+  * nest_triggers: TRUE to treat as a separate query for after triggers,
+  *            FALSE to place after triggers on the outer statement's 
+  *            queue.
   */
  static int
  _SPI_execute_plan(_SPI_plan *plan, Datum *Values, const char *Nulls,
                                  Snapshot snapshot, Snapshot 
crosscheck_snapshot,
!                                 bool read_only, long tcount, bool 
nest_triggers)
  {
        volatile int res = 0;
        Snapshot        saveActiveSnapshot;
***************
*** 1462,1468 ****
                                                                                
        dest,
                                                                                
        paramLI, false);
                                        res = _SPI_pquery(qdesc,
!                                                                         
queryTree->canSetTag ? tcount : 0);
                                        FreeQueryDesc(qdesc);
                                }
                                FreeSnapshot(ActiveSnapshot);
--- 1465,1471 ----
                                                                                
        dest,
                                                                                
        paramLI, false);
                                        res = _SPI_pquery(qdesc,
!                                                                         
queryTree->canSetTag ? tcount : 0, nest_triggers);
                                        FreeQueryDesc(qdesc);
                                }
                                FreeSnapshot(ActiveSnapshot);
***************
*** 1494,1500 ****
  }
  
  static int
! _SPI_pquery(QueryDesc *queryDesc, long tcount)
  {
        int                     operation = queryDesc->operation;
        CommandDest     origDest = queryDesc->dest->mydest;
--- 1497,1503 ----
  }
  
  static int
! _SPI_pquery(QueryDesc *queryDesc, long tcount, bool nest_triggers)
  {
        int                     operation = queryDesc->operation;
        CommandDest     origDest = queryDesc->dest->mydest;
***************
*** 1529,1535 ****
                ResetUsage();
  #endif
  
!       AfterTriggerBeginQuery();
  
        ExecutorStart(queryDesc, false);
  
--- 1532,1539 ----
                ResetUsage();
  #endif
  
!       if (nest_triggers)
!               AfterTriggerBeginQuery();
  
        ExecutorStart(queryDesc, false);
  
***************
*** 1544,1551 ****
                        elog(ERROR, "consistency check on SPI tuple count 
failed");
        }
  
!       /* Take care of any queued AFTER triggers */
!       AfterTriggerEndQuery(queryDesc->estate);
  
        ExecutorEnd(queryDesc);
  
--- 1548,1556 ----
                        elog(ERROR, "consistency check on SPI tuple count 
failed");
        }
  
!       /* Take care of any queued AFTER triggers if we are nesting triggers */
!       if (nest_triggers)
!               AfterTriggerEndQuery(queryDesc->estate);
  
        ExecutorEnd(queryDesc);
  
Index: src/backend/utils/adt/ri_triggers.c
===================================================================
RCS file: /projects/cvsroot/pgsql/src/backend/utils/adt/ri_triggers.c,v
retrieving revision 1.80
diff -c -r1.80 ri_triggers.c
*** src/backend/utils/adt/ri_triggers.c 28 Jun 2005 05:09:00 -0000      1.80
--- src/backend/utils/adt/ri_triggers.c 24 Aug 2005 03:24:56 -0000
***************
*** 2383,2389 ****
                                                                         
qkey.keypair[i][RI_KEYPAIR_PK_IDX]);
                                }
                                strcat(querystr, qualstr);
- 
                                /* Prepare the plan, don't save it */
                                qplan = ri_PlanCheck(querystr, qkey.nkeypairs, 
queryoids,
                                                                         &qkey, 
fk_rel, pk_rel, false);
--- 2383,2388 ----
***************
*** 2745,2751 ****
                                                                          NULL, 
NULL,
                                                                          
CopySnapshot(GetLatestSnapshot()),
                                                                          
InvalidSnapshot,
!                                                                         true, 
1);
  
        /* Check result */
        if (spi_result != SPI_OK_SELECT)
--- 2744,2750 ----
                                                                          NULL, 
NULL,
                                                                          
CopySnapshot(GetLatestSnapshot()),
                                                                          
InvalidSnapshot,
!                                                                         true, 
1, false);
  
        /* Check result */
        if (spi_result != SPI_OK_SELECT)
***************
*** 3175,3181 ****
        spi_result = SPI_execute_snapshot(qplan,
                                                                          vals, 
nulls,
                                                                          
test_snapshot, crosscheck_snapshot,
!                                                                         
false, limit);
  
        /* Restore UID */
        SetUserId(save_uid);
--- 3174,3180 ----
        spi_result = SPI_execute_snapshot(qplan,
                                                                          vals, 
nulls,
                                                                          
test_snapshot, crosscheck_snapshot,
!                                                                         
false, limit, false);
  
        /* Restore UID */
        SetUserId(save_uid);
Index: src/include/executor/spi.h
===================================================================
RCS file: /projects/cvsroot/pgsql/src/include/executor/spi.h,v
retrieving revision 1.52
diff -c -r1.52 spi.h
*** src/include/executor/spi.h  2 May 2005 00:37:06 -0000       1.52
--- src/include/executor/spi.h  24 Aug 2005 03:24:58 -0000
***************
*** 92,98 ****
                                                                 Datum *Values, 
const char *Nulls,
                                                                 Snapshot 
snapshot,
                                                                 Snapshot 
crosscheck_snapshot,
!                                                                bool 
read_only, long tcount);
  extern void *SPI_prepare(const char *src, int nargs, Oid *argtypes);
  extern void *SPI_saveplan(void *plan);
  extern int    SPI_freeplan(void *plan);
--- 92,98 ----
                                                                 Datum *Values, 
const char *Nulls,
                                                                 Snapshot 
snapshot,
                                                                 Snapshot 
crosscheck_snapshot,
!                                                                bool 
read_only, long tcount, bool nest_triggers);
  extern void *SPI_prepare(const char *src, int nargs, Oid *argtypes);
  extern void *SPI_saveplan(void *plan);
  extern int    SPI_freeplan(void *plan);
Index: src/test/regress/expected/foreign_key.out
===================================================================
RCS file: /projects/cvsroot/pgsql/src/test/regress/expected/foreign_key.out,v
retrieving revision 1.39
diff -c -r1.39 foreign_key.out
*** src/test/regress/expected/foreign_key.out   30 May 2005 07:20:59 -0000      
1.39
--- src/test/regress/expected/foreign_key.out   24 Aug 2005 03:25:03 -0000
***************
*** 646,652 ****
  UPDATE PKTABLE set ptest2=5 where ptest2=2;
  ERROR:  insert or update on table "fktable" violates foreign key constraint 
"constrname3"
  DETAIL:  Key (ftest1,ftest2,ftest3)=(1,-1,3) is not present in table 
"pktable".
- CONTEXT:  SQL statement "UPDATE ONLY "public"."fktable" SET "ftest2" = 
DEFAULT WHERE "ftest1" = $1 AND "ftest2" = $2 AND "ftest3" = $3"
  -- Try to update something that will set default
  UPDATE PKTABLE set ptest1=0, ptest2=5, ptest3=10 where ptest2=2;
  UPDATE PKTABLE set ptest2=10 where ptest2=4;
--- 646,651 ----
***************
*** 1172,1174 ****
--- 1171,1187 ----
  COMMIT;
  ERROR:  insert or update on table "fktable" violates foreign key constraint 
"fktable_fk_fkey"
  DETAIL:  Key (fk)=(20) is not present in table "pktable".
+ -- Check to make sure that forcing checks like the above doesn't allow
+ -- us to see intermediate states caused by multiple referential actions
+ -- on the same row if we update it a bunch of times in a single statement.
+ DROP TABLE pktable, fktable CASCADE;
+ NOTICE:  drop cascades to constraint fktable_fk_fkey on table fktable
+ CREATE TEMP TABLE pktable(id int PRIMARY KEY);
+ NOTICE:  CREATE TABLE / PRIMARY KEY will create implicit index "pktable_pkey" 
for table "pktable"
+ CREATE TEMP TABLE fktable(id1 int REFERENCES pktable ON UPDATE CASCADE,
+                           id2 int REFERENCES pktable ON UPDATE CASCADE,
+                           id3 int REFERENCES pktable ON UPDATE CASCADE);
+ INSERT INTO pktable VALUES (1);
+ INSERT INTO fktable VALUES (1,1,1);
+ -- This should succeed
+ UPDATE pktable SET id = 2;
Index: src/test/regress/sql/foreign_key.sql
===================================================================
RCS file: /projects/cvsroot/pgsql/src/test/regress/sql/foreign_key.sql,v
retrieving revision 1.16
diff -c -r1.16 foreign_key.sql
*** src/test/regress/sql/foreign_key.sql        30 May 2005 07:20:59 -0000      
1.16
--- src/test/regress/sql/foreign_key.sql        24 Aug 2005 03:25:04 -0000
***************
*** 808,810 ****
--- 808,827 ----
  
  -- should catch error from initial INSERT
  COMMIT;
+ 
+ 
+ -- Check to make sure that forcing checks like the above doesn't allow
+ -- us to see intermediate states caused by multiple referential actions
+ -- on the same row if we update it a bunch of times in a single statement.
+ DROP TABLE pktable, fktable CASCADE;
+ CREATE TEMP TABLE pktable(id int PRIMARY KEY);
+ CREATE TEMP TABLE fktable(id1 int REFERENCES pktable ON UPDATE CASCADE, 
+                         id2 int REFERENCES pktable ON UPDATE CASCADE,
+                         id3 int REFERENCES pktable ON UPDATE CASCADE);
+ 
+ INSERT INTO pktable VALUES (1);
+ INSERT INTO fktable VALUES (1,1,1);
+ 
+ -- This should succeed
+ UPDATE pktable SET id = 2;
+ 
---------------------------(end of broadcast)---------------------------
TIP 6: explain analyze is your friend

Reply via email to