On Sun, 5 Oct 2003, Tom Lane wrote:

> Stephan Szabo <[EMAIL PROTECTED]> writes:
> >> Improve speed of building of constraints during restore
>
> > Did we get consensus on what to do with this,
>
> Not really, it was still up in the air I thought.  However, the
> discussion will become moot if we don't have an implementation
> of the faster-checking alternative to look at pretty soon.  Do you
> have something nearly ready to show?

It's not cleaned up, but yes. It appears to work for the simple tests I've
done and should fall back if the permissions don't work to do a single
query on both tables.

I wasn't sure what to do about some of the spi error conditions.  For many
of them I'm just returning false now so that it will try the other
mechanism in case that might work. I'm not really sure if that's worth it,
or if I should just elog(ERROR) and give up.
Index: src/backend/commands/tablecmds.c
===================================================================
RCS file: /projects/cvsroot/pgsql-server/src/backend/commands/tablecmds.c,v
retrieving revision 1.85
diff -c -r1.85 tablecmds.c
*** src/backend/commands/tablecmds.c    2 Oct 2003 06:36:37 -0000       1.85
--- src/backend/commands/tablecmds.c    5 Oct 2003 18:30:43 -0000
***************
*** 3437,3442 ****
--- 3437,3443 ----
        return indexoid;
  }
  
+ 
  /*
   * Scan the existing rows in a table to verify they meet a proposed FK
   * constraint.
***************
*** 3454,3531 ****
        List       *list;
        int                     count;
  
!       /*
!        * Scan through each tuple, calling RI_FKey_check_ins (insert trigger)
!        * as if that tuple had just been inserted.  If any of those fail, it
!        * should ereport(ERROR) and that's that.
!        */
!       MemSet(&trig, 0, sizeof(trig));
!       trig.tgoid = InvalidOid;
!       trig.tgname = fkconstraint->constr_name;
!       trig.tgenabled = TRUE;
!       trig.tgisconstraint = TRUE;
!       trig.tgconstrrelid = RelationGetRelid(pkrel);
!       trig.tgdeferrable = FALSE;
!       trig.tginitdeferred = FALSE;
! 
!       trig.tgargs = (char **) palloc(sizeof(char *) *
!                                                                  (4 + 
length(fkconstraint->fk_attrs)
!                                                                       + 
length(fkconstraint->pk_attrs)));
! 
!       trig.tgargs[0] = trig.tgname;
!       trig.tgargs[1] = RelationGetRelationName(rel);
!       trig.tgargs[2] = RelationGetRelationName(pkrel);
!       trig.tgargs[3] = fkMatchTypeToString(fkconstraint->fk_matchtype);
!       count = 4;
!       foreach(list, fkconstraint->fk_attrs)
!       {
!               char       *fk_at = strVal(lfirst(list));
! 
!               trig.tgargs[count] = fk_at;
!               count += 2;
!       }
!       count = 5;
!       foreach(list, fkconstraint->pk_attrs)
!       {
!               char       *pk_at = strVal(lfirst(list));
! 
!               trig.tgargs[count] = pk_at;
!               count += 2;
!       }
!       trig.tgnargs = count - 1;
! 
!       scan = heap_beginscan(rel, SnapshotNow, 0, NULL);
! 
!       while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
!       {
!               FunctionCallInfoData fcinfo;
!               TriggerData trigdata;
! 
!               /*
!                * Make a call to the trigger function
!                *
!                * No parameters are passed, but we do set a context
!                */
!               MemSet(&fcinfo, 0, sizeof(fcinfo));
! 
                /*
!                * We assume RI_FKey_check_ins won't look at flinfo...
                 */
!               trigdata.type = T_TriggerData;
!               trigdata.tg_event = TRIGGER_EVENT_INSERT | TRIGGER_EVENT_ROW;
!               trigdata.tg_relation = rel;
!               trigdata.tg_trigtuple = tuple;
!               trigdata.tg_newtuple = NULL;
!               trigdata.tg_trigger = &trig;
  
!               fcinfo.context = (Node *) &trigdata;
! 
!               RI_FKey_check_ins(&fcinfo);
        }
- 
-       heap_endscan(scan);
- 
-       pfree(trig.tgargs);
  }
  
  /*
--- 3455,3533 ----
        List       *list;
        int                     count;
  
!       if (!RI_Check_Table(fkconstraint, rel, pkrel)) {
                /*
!                * Scan through each tuple, calling RI_FKey_check_ins (insert trigger)
!                * as if that tuple had just been inserted.  If any of those fail, it
!                * should ereport(ERROR) and that's that.
                 */
!               MemSet(&trig, 0, sizeof(trig));
!               trig.tgoid = InvalidOid;
!               trig.tgname = fkconstraint->constr_name;
!               trig.tgenabled = TRUE;
!               trig.tgisconstraint = TRUE;
!               trig.tgconstrrelid = RelationGetRelid(pkrel);
!               trig.tgdeferrable = FALSE;
!               trig.tginitdeferred = FALSE;
! 
!               trig.tgargs = (char **) palloc(sizeof(char *) *
!                                                                          (4 + 
length(fkconstraint->fk_attrs)
!                                                                               + 
length(fkconstraint->pk_attrs)));
! 
!               trig.tgargs[0] = trig.tgname;
!               trig.tgargs[1] = RelationGetRelationName(rel);
!               trig.tgargs[2] = RelationGetRelationName(pkrel);
!               trig.tgargs[3] = fkMatchTypeToString(fkconstraint->fk_matchtype);
!               count = 4;
!               foreach(list, fkconstraint->fk_attrs)
!               {
!                       char       *fk_at = strVal(lfirst(list));
! 
!                       trig.tgargs[count] = fk_at;
!                       count += 2;
!               }
!               count = 5;
!               foreach(list, fkconstraint->pk_attrs)
!               {
!                       char       *pk_at = strVal(lfirst(list));
! 
!                       trig.tgargs[count] = pk_at;
!                       count += 2;
!               }
!               trig.tgnargs = count - 1;
! 
!               scan = heap_beginscan(rel, SnapshotNow, 0, NULL);
! 
!               while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
!               {
!                       FunctionCallInfoData fcinfo;
!                       TriggerData trigdata;
! 
!                       /*
!                        * Make a call to the trigger function
!                        *
!                        * No parameters are passed, but we do set a context
!                        */
!                       MemSet(&fcinfo, 0, sizeof(fcinfo));
! 
!                       /*
!                        * We assume RI_FKey_check_ins won't look at flinfo...
!                        */
!                       trigdata.type = T_TriggerData;
!                       trigdata.tg_event = TRIGGER_EVENT_INSERT | TRIGGER_EVENT_ROW;
!                       trigdata.tg_relation = rel;
!                       trigdata.tg_trigtuple = tuple;
!                       trigdata.tg_newtuple = NULL;
!                       trigdata.tg_trigger = &trig;
! 
!                       fcinfo.context = (Node *) &trigdata;
! 
!                       RI_FKey_check_ins(&fcinfo);
!               }
!               heap_endscan(scan);
  
!               pfree(trig.tgargs);
        }
  }
  
  /*
Index: src/backend/utils/adt/ri_triggers.c
===================================================================
RCS file: /projects/cvsroot/pgsql-server/src/backend/utils/adt/ri_triggers.c,v
retrieving revision 1.61
diff -c -r1.61 ri_triggers.c
*** src/backend/utils/adt/ri_triggers.c 1 Oct 2003 21:30:52 -0000       1.61
--- src/backend/utils/adt/ri_triggers.c 5 Oct 2003 18:30:50 -0000
***************
*** 40,48 ****
  #include "rewrite/rewriteHandler.h"
  #include "utils/lsyscache.h"
  #include "utils/typcache.h"
  #include "miscadmin.h"
  
- 
  /* ----------
   * Local definitions
   * ----------
--- 40,48 ----
  #include "rewrite/rewriteHandler.h"
  #include "utils/lsyscache.h"
  #include "utils/typcache.h"
+ #include "utils/acl.h"
  #include "miscadmin.h"
  
  /* ----------
   * Local definitions
   * ----------
***************
*** 3396,3399 ****
--- 3396,3606 ----
         */
        return DatumGetBool(FunctionCall2(&(typentry->eq_opr_finfo),
                                                                          oldvalue, 
newvalue));
+ }
+ 
+ 
+ /* ----------
+  * RI_Check_Table -
+  *
+  *    Check an entire table for non-matching values using a single query.
+  *    We expect that an exclusive lock has been taken on rel and pkrel
+  *    such that we do not need to do row locking for the check.
+  *
+  *    If the check fails due to SPI errors or the permissions are not such
+  *    that the the current user has read permissions on both tables return
+  *    false to let our caller know that they will need to do something else
+  *    to check the constraint (or error as the case may be).
+  *
+  * ----------
+  */
+ bool
+ RI_Check_Table(FkConstraint *fkconstraint, Relation rel, Relation pkrel) {
+       char            querystr[MAX_QUOTED_REL_NAME_LEN * 2 + 250 +
+                                               (MAX_QUOTED_NAME_LEN + 32) * 
((RI_MAX_NUMKEYS * 4)+1) ];
+       char            pkrelname[MAX_QUOTED_REL_NAME_LEN];
+       char            relname[MAX_QUOTED_REL_NAME_LEN];
+       char            attname[MAX_QUOTED_NAME_LEN];
+       char            fkattname[MAX_QUOTED_NAME_LEN];
+       char            *sep = "";
+       List            *list;
+       List            *list2;
+       int             spi_result;
+       void            *qplan;
+       Datum           vals[RI_MAX_NUMKEYS * 2];
+       char            nulls[RI_MAX_NUMKEYS * 2];
+ 
+       if (!rel || !pkrel)
+               return false;
+  
+       /*
+        * Do a pre-check to see if we believe that the query below will succeed.
+        * I'm not sure if these checks are sufficient, but they cover the common
+        * case that would cause failures.
+        */
+       if (pg_class_aclcheck(RelationGetRelid(rel), GetUserId(), ACL_SELECT) != 
ACLCHECK_OK)
+               return false;
+       if (pg_class_aclcheck(RelationGetRelid(pkrel), GetUserId(), ACL_SELECT) != 
ACLCHECK_OK) 
+               return false;
+ 
+       quoteRelationName(pkrelname, pkrel);
+       quoteRelationName(relname, rel);
+ 
+       /* The query string build is:
+        *  SELECT fk.keycols FROM ONLY relname fk 
+        *   LEFT OUTER JOIN pkrelname pk 
+        *   ON (pk.pkkeycol1=fk.keycol1 [AND ...])
+        *   WHERE pk.pkkeycol1 IS NULL AND
+        * For MATCH unspecified:
+        *   (fk.keycol1 IS NOT NULL [AND ...])
+        * For MATCH FULL:
+        *   (fk.keycol1 IS NOT NULL [OR ...])
+        */
+ 
+       snprintf(querystr, sizeof(querystr), "SELECT ");
+       foreach (list, fkconstraint->fk_attrs) {
+               quoteOneName(attname, strVal(lfirst(list)));
+               snprintf(querystr + strlen(querystr), sizeof(querystr), "%sfk.%s", 
sep, attname);
+               sep = ", ";
+       }
+       snprintf(querystr + strlen(querystr), sizeof(querystr), " FROM ONLY %s fk LEFT 
OUTER JOIN "
+                       " %s pk ON (", relname, pkrelname);
+ 
+       sep="";
+       for (list=fkconstraint->pk_attrs, list2=fkconstraint->fk_attrs; 
+               list != NIL && list2 != NIL; 
+               list=lnext(list), list2=lnext(list2)) {
+               quoteOneName(attname, strVal(lfirst(list)));
+               quoteOneName(fkattname, strVal(lfirst(list2)));
+               snprintf(querystr + strlen(querystr), sizeof(querystr), " 
%spk.%s=fk.%s", sep, attname, fkattname);
+               sep = "AND ";
+       }
+       quoteOneName(fkattname, strVal(lfirst(fkconstraint->pk_attrs)));
+       snprintf(querystr + strlen(querystr), sizeof(querystr), ") WHERE pk.%s IS NULL 
AND (", fkattname);
+ 
+       sep="";
+       foreach (list, fkconstraint->fk_attrs) {
+               quoteOneName(attname, strVal(lfirst(list)));
+               snprintf(querystr + strlen(querystr), sizeof(querystr), " %sfk.%s IS 
NOT NULL", sep, attname);
+               switch (fkconstraint->fk_matchtype) {
+                       case FKCONSTR_MATCH_UNSPECIFIED:
+                               sep="AND ";
+                               break;
+                       case FKCONSTR_MATCH_FULL:
+                               sep="OR ";
+                               break;
+                       case FKCONSTR_MATCH_PARTIAL:
+                               ereport(ERROR,
+                                       (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                               errmsg("MATCH PARTIAL not yet 
implemented")));
+                               break;
+                       default:
+                               elog(ERROR, "Unrecognized match type: %d", 
fkconstraint->fk_matchtype);
+                               break;
+               }
+       }
+       snprintf(querystr + strlen(querystr), sizeof(querystr), ")");
+ 
+       if (SPI_connect() != SPI_OK_CONNECT) {
+               elog(DEBUG1, "SPI_connect failed");
+               return false;
+       }
+ 
+       /*
+        * Generate the plan.  We don't want to cache it, and there are no
+        * arguments to the plan. 
+        */
+ 
+       /* Create the plan */
+       qplan = SPI_prepare(querystr, 0, NULL);
+ 
+       if (qplan == NULL) {
+               elog(DEBUG1, "Unable to prepare plan");
+               if (SPI_finish() != SPI_OK_FINISH)
+                       elog(ERROR, "SPI_finish failed");
+               return false;
+       }
+ 
+       nulls[0] = ' ';
+ 
+       /* Run the plan */
+       spi_result = SPI_execp(qplan, vals, nulls, 1);
+ 
+       /* Check result */
+       if (spi_result < 0) {
+               elog(DEBUG1, "SPI_execp failed %d", spi_result);
+               if (SPI_finish() != SPI_OK_FINISH)
+                       elog(ERROR, "SPI_finish failed");
+               return false;
+       }
+ 
+       if (spi_result != SPI_OK_SELECT) {
+               elog(DEBUG1, "SPI_execp failed to return SPI_OK_SELECT");
+               if (SPI_finish() != SPI_OK_FINISH)
+                       elog(ERROR, "SPI_finish failed");
+               return false;
+       }
+ 
+       if (SPI_processed>0) 
+       {
+               char            key_names[BUFLENGTH];
+               char            key_values[BUFLENGTH];
+               char       *name_ptr = key_names;
+               char       *val_ptr = key_values;
+               int             colpos = 1;
+               bool            nullsfound=false;
+ 
+               foreach (list, fkconstraint->fk_attrs)
+               {
+                       char    *val;
+ 
+                       quoteOneName(attname, strVal(lfirst(list)));
+ 
+                       val = SPI_getvalue(SPI_tuptable->vals[0], 
SPI_tuptable->tupdesc, colpos);
+                       if (!val) {
+                               nullsfound=true;
+                               break;
+                       }
+ 
+                       /*
+                        * Go to "..." if name or value doesn't fit in buffer.  We 
reserve
+                        * 5 bytes to ensure we can add comma, "...", null.
+                        */
+                       if (strlen(attname) >= (key_names + BUFLENGTH - 5) - name_ptr 
||
+                               strlen(val) >= (key_values + BUFLENGTH - 5) - val_ptr)
+                       {
+                               sprintf(name_ptr, "...");
+                               sprintf(val_ptr, "...");
+                               break;
+                       }
+ 
+                       name_ptr += sprintf(name_ptr, "%s%s", colpos > 1 ? "," : "", 
attname);
+                       val_ptr += sprintf(val_ptr, "%s%s", colpos > 1 ? "," : "", 
val);
+                       colpos++;
+               }
+ 
+               if (!nullsfound) {
+                       ereport(ERROR,
+                               (errcode(ERRCODE_FOREIGN_KEY_VIOLATION),
+                                errmsg("insert or update on table %s violates foreign 
key constraint \"%s\"",
+                                               relname, fkconstraint->constr_name),
+                                errdetail("Key (%s)=(%s) is not present in table 
\"%s\".",
+                                                  key_names, key_values,
+                                                  RelationGetRelationName(pkrel))));
+               }
+               else {
+                       /* 
+                        * If we get here currently, this means we must be MATCH FULL 
(since unspecified
+                        * couldn't have nulls in the output) and the nulls had to be 
mixed with non-NULLS.
+                        */
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_FOREIGN_KEY_VIOLATION),
+                                        errmsg("insert or update on table %s violates 
foreign key constraint \"%s\"",
+                                               relname, fkconstraint->constr_name),
+                                        errdetail("MATCH FULL does not allow mixing 
of NULL and non-NULL key values.")));
+               }
+       }
+ 
+       if (SPI_finish() != SPI_OK_FINISH)
+               elog(ERROR, "SPI_finish failed");
+       return true;
  }
Index: src/include/commands/trigger.h
===================================================================
RCS file: /projects/cvsroot/pgsql-server/src/include/commands/trigger.h,v
retrieving revision 1.43
diff -c -r1.43 trigger.h
*** src/include/commands/trigger.h      4 Aug 2003 02:40:13 -0000       1.43
--- src/include/commands/trigger.h      5 Oct 2003 18:30:53 -0000
***************
*** 197,201 ****
--- 197,204 ----
   * in utils/adt/ri_triggers.c
   */
  extern bool RI_FKey_keyequal_upd(TriggerData *trigdata);
+ extern bool RI_Check_Table(FkConstraint *fkconstraint, 
+                                 Relation rel, 
+                                 Relation pkrel);
  
  #endif   /* TRIGGER_H */
Index: src/test/regress/expected/alter_table.out
===================================================================
RCS file: /projects/cvsroot/pgsql-server/src/test/regress/expected/alter_table.out,v
retrieving revision 1.77
diff -c -r1.77 alter_table.out
*** src/test/regress/expected/alter_table.out   2 Oct 2003 06:32:46 -0000       1.77
--- src/test/regress/expected/alter_table.out   5 Oct 2003 18:31:02 -0000
***************
*** 313,320 ****
  ERROR:  column "b" referenced in foreign key constraint does not exist
  -- Try (and fail) to add constraint due to invalid data
  ALTER TABLE tmp3 add constraint tmpconstr foreign key (a) references tmp2 match full;
! ERROR:  insert or update on table "tmp3" violates foreign key constraint "tmpconstr"
! DETAIL:  Key (a)=(5) is not present in table "tmp2".
  -- Delete failing row
  DELETE FROM tmp3 where a=5;
  -- Try (and succeed)
--- 313,320 ----
  ERROR:  column "b" referenced in foreign key constraint does not exist
  -- Try (and fail) to add constraint due to invalid data
  ALTER TABLE tmp3 add constraint tmpconstr foreign key (a) references tmp2 match full;
! ERROR:  insert or update on table "public"."tmp3" violates foreign key constraint 
"tmpconstr"
! DETAIL:  Key ("a")=(5) is not present in table "tmp2".
  -- Delete failing row
  DELETE FROM tmp3 where a=5;
  -- Try (and succeed)
---------------------------(end of broadcast)---------------------------
TIP 7: don't forget to increase your free space map settings

Reply via email to