Stephan Szabo <[EMAIL PROTECTED]> writes:
> It's not cleaned up, but yes. It appears to work for the simple tests I've
> done and should fall back if the permissions don't work to do a single
> query on both tables.

Here's my code-reviewed version of the patch.  Anyone else want to take
a look?

> I wasn't sure what to do about some of the spi error conditions.  For many
> of them I'm just returning false now so that it will try the other
> mechanism in case that might work. I'm not really sure if that's worth it,
> or if I should just elog(ERROR) and give up.

I think you may as well keep it the same as the other RI routines and
just elog() on SPI error.  If SPI is broken, the trigger procedure is
gonna fail too.

I changed that, consolidated the error-reporting code, and fixed a couple
other little issues, notably:

* The generated query applied ONLY to the FK table but not the PK table.
  I assume this was just an oversight.

* The query is now run using SPI_execp_current and selecting "current"
  snapshot.  Without this, we could fail in a serializable transaction
  if someone else has already committed changes to either relation.
  For example:

        create pk and fk tables;

        begin serializable xact;
        insert into pk values(1);
        insert into fk values(1);

                                                begin;
                                                insert into fk values(2);
                                                commit;

        alter table fk add foreign key ...;

  The ALTER will not be blocked from acquiring exclusive lock, since
  T2 already committed.  But if we run the query in the serializable
  snapshot, it won't see the violating row fk=2.

The old trigger-based check avoids this error because the scan loop uses
SnapshotNow to select live rows from the FK table.  There is a dual race
condition where T2 deletes a row from the PK table.  In current CVS tip
this will be detected and reported as a serialization failure, because
T1 won't be able to get SELECT FOR UPDATE lock on the deleted row.  With
the proposed patch you'll instead see a "no such key" failure, which I
think is fine, even though it nominally violates serializability.

Comments?  Can anyone else do a code review (Jan??)?

                        regards, tom lane

*** src/backend/commands/tablecmds.c.orig       Thu Oct  2 15:24:52 2003
--- src/backend/commands/tablecmds.c    Sun Oct  5 16:29:51 2003
***************
*** 3455,3460 ****
--- 3455,3467 ----
        int                     count;
  
        /*
+        * See if we can do it with a single LEFT JOIN query.  A FALSE result
+        * indicates we must proceed with the fire-the-trigger method.
+        */
+       if (RI_Initial_Check(fkconstraint, rel, pkrel))
+               return;
+ 
+       /*
         * Scan through each tuple, calling RI_FKey_check_ins (insert trigger)
         * as if that tuple had just been inserted.  If any of those fail, it
         * should ereport(ERROR) and that's that.
*** src/backend/utils/adt/ri_triggers.c.orig    Wed Oct  1 17:30:52 2003
--- src/backend/utils/adt/ri_triggers.c Sun Oct  5 16:42:37 2003
***************
*** 40,45 ****
--- 40,46 ----
  #include "rewrite/rewriteHandler.h"
  #include "utils/lsyscache.h"
  #include "utils/typcache.h"
+ #include "utils/acl.h"
  #include "miscadmin.h"
  
  
***************
*** 164,170 ****
                                 Datum *vals, char *nulls);
  static void ri_ReportViolation(RI_QueryKey *qkey, const char *constrname,
                                   Relation pk_rel, Relation fk_rel,
!                                  HeapTuple violator, bool spi_err);
  
  
  /* ----------
--- 165,172 ----
                                 Datum *vals, char *nulls);
  static void ri_ReportViolation(RI_QueryKey *qkey, const char *constrname,
                                   Relation pk_rel, Relation fk_rel,
!                                  HeapTuple violator, TupleDesc tupdesc,
!                                  bool spi_err);
  
  
  /* ----------
***************
*** 2540,2546 ****
--- 2542,2743 ----
  }
  
  
+ /* ----------
+  * RI_Initial_Check -
+  *
+  *    Check an entire table for non-matching values using a single query.
+  *    This is not a trigger procedure, but is called during ALTER TABLE
+  *    ADD FOREIGN KEY to validate the initial table contents.
+  *
+  *    We expect that an exclusive lock has been taken on rel and pkrel;
+  *    hence, we do not need to lock individual rows for the check.
+  *
+  *    If the check fails because the current user doesn't have permissions
+  *    to read both tables, return false to let our caller know that they will
+  *    need to do something else to check the constraint.
+  * ----------
+  */
+ bool
+ RI_Initial_Check(FkConstraint *fkconstraint, Relation rel, Relation pkrel)
+ {
+       const char *constrname = fkconstraint->constr_name;
+       char            querystr[MAX_QUOTED_REL_NAME_LEN * 2 + 250 +
+                                               (MAX_QUOTED_NAME_LEN + 32) * 
((RI_MAX_NUMKEYS * 4)+1)];
+       char            pkrelname[MAX_QUOTED_REL_NAME_LEN];
+       char            relname[MAX_QUOTED_REL_NAME_LEN];
+       char            attname[MAX_QUOTED_NAME_LEN];
+       char            fkattname[MAX_QUOTED_NAME_LEN];
+       const char *sep;
+       List            *list;
+       List            *list2;
+       int                     spi_result;
+       void            *qplan;
+ 
+       /*
+        * Check to make sure current user has enough permissions to do the
+        * test query.  (If not, caller can fall back to the trigger method,
+        * which works because it changes user IDs on the fly.)
+        *
+        * XXX are there any other show-stopper conditions to check?
+        */
+       if (pg_class_aclcheck(RelationGetRelid(rel), GetUserId(), ACL_SELECT) != 
ACLCHECK_OK)
+               return false;
+       if (pg_class_aclcheck(RelationGetRelid(pkrel), GetUserId(), ACL_SELECT) != 
ACLCHECK_OK) 
+               return false;
+ 
+       /*----------
+        * The query string built is:
+        *  SELECT fk.keycols FROM ONLY relname fk 
+        *   LEFT OUTER JOIN ONLY pkrelname pk 
+        *   ON (pk.pkkeycol1=fk.keycol1 [AND ...])
+        *   WHERE pk.pkkeycol1 IS NULL AND
+        * For MATCH unspecified:
+        *   (fk.keycol1 IS NOT NULL [AND ...])
+        * For MATCH FULL:
+        *   (fk.keycol1 IS NOT NULL [OR ...])
+        *----------
+        */
+ 
+       sprintf(querystr, "SELECT ");
+       sep="";
+       foreach(list, fkconstraint->fk_attrs)
+       {
+               quoteOneName(attname, strVal(lfirst(list)));
+               snprintf(querystr + strlen(querystr), sizeof(querystr) - 
strlen(querystr),
+                                "%sfk.%s", sep, attname);
+               sep = ", ";
+       }
+ 
+       quoteRelationName(pkrelname, pkrel);
+       quoteRelationName(relname, rel);
+       snprintf(querystr + strlen(querystr), sizeof(querystr) - strlen(querystr),
+                        " FROM ONLY %s fk LEFT OUTER JOIN ONLY %s pk ON (",
+                        relname, pkrelname);
+ 
+       sep="";
+       for (list=fkconstraint->pk_attrs, list2=fkconstraint->fk_attrs; 
+                list != NIL && list2 != NIL; 
+                list=lnext(list), list2=lnext(list2))
+       {
+               quoteOneName(attname, strVal(lfirst(list)));
+               quoteOneName(fkattname, strVal(lfirst(list2)));
+               snprintf(querystr + strlen(querystr), sizeof(querystr) - 
strlen(querystr),
+                                "%spk.%s=fk.%s",
+                                sep, attname, fkattname);
+               sep = " AND ";
+       }
+       /*
+        * It's sufficient to test any one pk attribute for null to detect a
+        * join failure.
+        */
+       quoteOneName(attname, strVal(lfirst(fkconstraint->pk_attrs)));
+       snprintf(querystr + strlen(querystr), sizeof(querystr) - strlen(querystr),
+                        ") WHERE pk.%s IS NULL AND (", attname);
+ 
+       sep="";
+       foreach(list, fkconstraint->fk_attrs)
+       {
+               quoteOneName(attname, strVal(lfirst(list)));
+               snprintf(querystr + strlen(querystr), sizeof(querystr) - 
strlen(querystr),
+                                "%sfk.%s IS NOT NULL",
+                                sep, attname);
+               switch (fkconstraint->fk_matchtype)
+               {
+                       case FKCONSTR_MATCH_UNSPECIFIED:
+                               sep=" AND ";
+                               break;
+                       case FKCONSTR_MATCH_FULL:
+                               sep=" OR ";
+                               break;
+                       case FKCONSTR_MATCH_PARTIAL:
+                               ereport(ERROR,
+                                               
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                                errmsg("MATCH PARTIAL not yet 
implemented")));
+                               break;
+                       default:
+                               elog(ERROR, "unrecognized match type: %d",
+                                        fkconstraint->fk_matchtype);
+                               break;
+               }
+       }
+       snprintf(querystr + strlen(querystr), sizeof(querystr) - strlen(querystr),
+                        ")");
+ 
+       if (SPI_connect() != SPI_OK_CONNECT)
+               elog(ERROR, "SPI_connect failed");
+ 
+       /*
+        * Generate the plan.  We don't need to cache it, and there are no
+        * arguments to the plan. 
+        */
+       qplan = SPI_prepare(querystr, 0, NULL);
+ 
+       /*
+        * Run the plan.  For safety we force a current query snapshot to be
+        * used.  (In serializable mode, this arguably violates serializability,
+        * but we really haven't got much choice.)  We need at most one tuple
+        * returned, so pass limit = 1.
+        */
+       spi_result = SPI_execp_current(qplan, NULL, NULL, true, 1);
  
+       /* Check result */
+       if (spi_result != SPI_OK_SELECT)
+               elog(ERROR, "SPI_execp_current returned %d", spi_result);
+ 
+       /* Did we find a tuple violating the constraint? */
+       if (SPI_processed > 0)
+       {
+               HeapTuple       tuple = SPI_tuptable->vals[0];
+               TupleDesc       tupdesc = SPI_tuptable->tupdesc;
+               int                     nkeys = length(fkconstraint->fk_attrs);
+               int                     i;
+               RI_QueryKey     qkey;
+ 
+               /*
+                * If it's MATCH FULL, and there are any nulls in the FK keys,
+                * complain about that rather than the lack of a match.  MATCH FULL
+                * disallows partially-null FK rows.
+                */
+               if (fkconstraint->fk_matchtype == FKCONSTR_MATCH_FULL)
+               {
+                       bool    isnull = false;
+ 
+                       for (i = 1; i <= nkeys; i++)
+                       {
+                               (void) SPI_getbinval(tuple, tupdesc, i, &isnull);
+                               if (isnull)
+                                       break;
+                       }
+                       if (isnull)
+                               ereport(ERROR,
+                                               
(errcode(ERRCODE_FOREIGN_KEY_VIOLATION),
+                                                errmsg("insert or update on table 
\"%s\" violates foreign key constraint \"%s\"",
+                                                               
RelationGetRelationName(rel),
+                                                               constrname),
+                                                errdetail("MATCH FULL does not allow 
mixing of null and nonnull key values.")));
+               }
+ 
+               /*
+                * Although we didn't cache the query, we need to set up a fake
+                * query key to pass to ri_ReportViolation.
+                */
+               MemSet(&qkey, 0, sizeof(qkey));
+               qkey.constr_queryno = RI_PLAN_CHECK_LOOKUPPK;
+               qkey.nkeypairs = nkeys;
+               for (i = 0; i < nkeys; i++)
+                       qkey.keypair[i][RI_KEYPAIR_FK_IDX] = i + 1;
+ 
+               ri_ReportViolation(&qkey, constrname,
+                                                  pkrel, rel,
+                                                  tuple, tupdesc,
+                                                  false);
+       }
+ 
+       if (SPI_finish() != SPI_OK_FINISH)
+               elog(ERROR, "SPI_finish failed");
+ 
+       return true;
+ }
  
  
  /* ----------
***************
*** 2905,2910 ****
--- 3102,3108 ----
                ri_ReportViolation(qkey, constrname ? constrname : "",
                                                   pk_rel, fk_rel,
                                                   new_tuple ? new_tuple : old_tuple,
+                                                  NULL,
                                                   true);
  
        /* XXX wouldn't it be clearer to do this part at the caller? */
***************
*** 2913,2918 ****
--- 3111,3117 ----
                ri_ReportViolation(qkey, constrname,
                                                   pk_rel, fk_rel,
                                                   new_tuple ? new_tuple : old_tuple,
+                                                  NULL,
                                                   false);
  
        return SPI_processed != 0;
***************
*** 2950,2956 ****
  static void
  ri_ReportViolation(RI_QueryKey *qkey, const char *constrname,
                                   Relation pk_rel, Relation fk_rel,
!                                  HeapTuple violator, bool spi_err)
  {
  #define BUFLENGTH     512
        char            key_names[BUFLENGTH];
--- 3149,3156 ----
  static void
  ri_ReportViolation(RI_QueryKey *qkey, const char *constrname,
                                   Relation pk_rel, Relation fk_rel,
!                                  HeapTuple violator, TupleDesc tupdesc,
!                                  bool spi_err)
  {
  #define BUFLENGTH     512
        char            key_names[BUFLENGTH];
***************
*** 2958,2964 ****
        char       *name_ptr = key_names;
        char       *val_ptr = key_values;
        bool            onfk;
-       Relation        rel;
        int                     idx,
                                key_idx;
  
--- 3158,3163 ----
***************
*** 2972,2989 ****
                                 errhint("This is most likely due to a rule having 
rewritten the query.")));
  
        /*
!        * rel is set to where the tuple description is coming from.
         */
        onfk = (qkey->constr_queryno == RI_PLAN_CHECK_LOOKUPPK);
        if (onfk)
        {
-               rel = fk_rel;
                key_idx = RI_KEYPAIR_FK_IDX;
        }
        else
        {
-               rel = pk_rel;
                key_idx = RI_KEYPAIR_PK_IDX;
        }
  
        /*
--- 3171,3191 ----
                                 errhint("This is most likely due to a rule having 
rewritten the query.")));
  
        /*
!        * Determine which relation to complain about.  If tupdesc wasn't
!        * passed by caller, assume the violator tuple came from there.
         */
        onfk = (qkey->constr_queryno == RI_PLAN_CHECK_LOOKUPPK);
        if (onfk)
        {
                key_idx = RI_KEYPAIR_FK_IDX;
+               if (tupdesc == NULL)
+                       tupdesc = fk_rel->rd_att;
        }
        else
        {
                key_idx = RI_KEYPAIR_PK_IDX;
+               if (tupdesc == NULL)
+                       tupdesc = pk_rel->rd_att;
        }
  
        /*
***************
*** 3008,3015 ****
                char       *name,
                                   *val;
  
!               name = SPI_fname(rel->rd_att, fnum);
!               val = SPI_getvalue(violator, rel->rd_att, fnum);
                if (!val)
                        val = "null";
  
--- 3210,3217 ----
                char       *name,
                                   *val;
  
!               name = SPI_fname(tupdesc, fnum);
!               val = SPI_getvalue(violator, tupdesc, fnum);
                if (!val)
                        val = "null";
  
*** src/include/commands/trigger.h.orig Sun Aug  3 23:01:31 2003
--- src/include/commands/trigger.h      Sun Oct  5 16:11:44 2003
***************
*** 197,201 ****
--- 197,204 ----
   * in utils/adt/ri_triggers.c
   */
  extern bool RI_FKey_keyequal_upd(TriggerData *trigdata);
+ extern bool RI_Initial_Check(FkConstraint *fkconstraint, 
+                                                        Relation rel, 
+                                                        Relation pkrel);
  
  #endif   /* TRIGGER_H */
---------------------------(end of broadcast)---------------------------
TIP 9: the planner will ignore your desire to choose an index scan if your
      joining column's datatypes do not match

Reply via email to