On Sat, Jan 14, 2012 at 08:18:48PM +0100, Marco Nenciarini wrote:
> This is our latest version of the patch. Gabriele, Gianni and I have
> discussed a lot and decided to send an initial patch which uses EACH
> REFERENCES instead of ARRAY REFERENCES. The reason behind this is that
> ARRAY REFERENCES generates an ambiguous grammar, and we all agreed that
> EACH REFERENCES makes sense (and the same time does not introduce any
> new keyword). This is however open for discussion.

I greatly like that name; it would still make sense for other aggregate types,
should we ever expand its use.  Please complete the name change: the
documentation, catalog entries, etc. should all call them something like "each
foreign key constraints" (I don't particularly like that exact wording).

You currently forbid multi-column EACH FKs.  I agree that we should allow only
one array column per FK; with more, the set of required PK rows would be
something like the Cartesian product of the elements of array columns.
However, there are no definitional problems, at least for NO ACTION, around a
FK constraint having one array column and N scalar columns.  Whether or not
you implement that now, let's choose a table_constraint syntax leaving that
opportunity open.  How about:
        FOREIGN KEY(col_a, EACH col_b, col_c) REFERENCES pktable (a, b, c)
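
To make the Cartesian-product concern concrete, here is a rough standalone
sketch in plain C.  It is not backend code: the KeyPair struct, the
required_pk_rows() name and the flat int arrays are all made up for
illustration.  It enumerates the PK rows that a hypothetical FK with two array
columns would require for a single referencing row.

```c
#include <assert.h>
#include <stddef.h>

/* One (a, b) key that would have to exist in the referenced table. */
typedef struct
{
    int a;
    int b;
} KeyPair;

/*
 * Hypothetical helper: write every (a[i], b[j]) combination into out[] and
 * return the number of pairs written.  out must have room for na * nb pairs.
 */
static size_t
required_pk_rows(const int *a, size_t na,
                 const int *b, size_t nb,
                 KeyPair *out)
{
    size_t n = 0;

    assert(out != NULL);
    for (size_t i = 0; i < na; i++)
    {
        for (size_t j = 0; j < nb; j++)
        {
            out[n].a = a[i];
            out[n].b = b[j];
            n++;
        }
    }
    return n;
}
```

A row holding arrays of 2 and 3 elements would need 6 PK rows under this
reading.  With one array column and N scalar columns, the count is just the
array length.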

You've identified that we cannot generally implement the ON DELETE ARRAY
CASCADE action on multidimensional arrays.  This patch chooses to downgrade to
ON DELETE ARRAY SET NULL in that case.  My initial reaction is that it would
be better to forbid multidimensional arrays in the column when the delete
action is ON DELETE ARRAY CASCADE.  That's not satisfying, either, because it
makes the definition of conforming data depend on the ON DELETE action.  Do we
have other options?
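
For reference, the obstacle as I understand it is that arrays must stay
rectangular.  A standalone sketch in plain C (not backend code;
can_drop_elements() is a made-up name) of the necessary condition for removing
elements from a two-dimensional array while keeping its row length:

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/*
 * Hypothetical check: a rows x cols array can keep its row length after
 * losing nremoved elements only if the survivors form a whole number of
 * rows.  This is a necessary condition, not a sufficient one: even when it
 * holds, surviving elements would have to shift from one row to another.
 */
static bool
can_drop_elements(size_t rows, size_t cols, size_t nremoved)
{
    size_t nitems = rows * cols;

    assert(cols > 0 && nremoved <= nitems);
    return (nitems - nremoved) % cols == 0;
}
```

Removing a single match from '{{1,2},{3,4}}' leaves three elements, which fit
no two-column shape; that is the case where the patch falls back to NULLs.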

> --------------- --------- ---------
>                |   ON    |   ON    |
> Action         | DELETE  | UPDATE  |
> --------------- --------- ---------
> CASCADE        |   Row   |  Error  |
> SET NULL       |   Row   |   Row   |
> SET DEFAULT    |   Row   |   Row   |
> ARRAY CASCADE  | Element | Element |
> ARRAY SET NULL | Element | Element |
> NO ACTION      |    -    |    -    |
> RESTRICT       |    -    |    -    |
> --------------- --------- --------- 

To complete the ARRAY -> EACH transition, I would suggest names like CASCADE
EACH/SET EACH NULL.

I like the extensive test cases you have included.  There's one more thing
they should do: leave tables having EACH REFERENCES relationships in the
regression database.  This way, pg_dump tests of the regression database will
exercise pg_dump with respect to this feature.

The patch emits several warnings:

heap.c: In function `StoreRelCheck':
heap.c:1947: warning: passing argument 17 of `CreateConstraintEntry' makes integer from pointer without a cast
index.c: In function `index_constraint_create':
index.c:1160: warning: passing argument 17 of `CreateConstraintEntry' makes integer from pointer without a cast
In file included from gram.y:13051:
scan.c: In function `yy_try_NUL_trans':
scan.c:16243: warning: unused variable `yyg'
trigger.c: In function `CreateTrigger':
trigger.c:454: warning: passing argument 17 of `CreateConstraintEntry' makes integer from pointer without a cast
typecmds.c: In function `domainAddConstraint':
typecmds.c:2960: warning: passing argument 17 of `CreateConstraintEntry' makes integer from pointer without a cast
arrayfuncs.c: In function `array_remove':
arrayfuncs.c:5197: warning: unused variable `dimresult'
ri_triggers.c: In function `RI_FKey_check':
ri_triggers.c:484: warning: too many arguments for format

This test case, copied from my previous review except for updating the syntax,
still fails:

BEGIN;
CREATE TABLE parent (c int PRIMARY KEY);
CREATE TABLE child  (c int[]);
INSERT INTO parent VALUES (1);
INSERT INTO child VALUES ('{3,1,2}');
ALTER TABLE child ADD FOREIGN KEY (c) EACH REFERENCES parent; -- should error
INSERT INTO child VALUES ('{3,1,2}'); -- does error, as expected
ROLLBACK;
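
What the validation at ALTER TABLE time has to establish is the same per-row
condition the INSERT path already enforces.  A minimal model in plain C,
outside the server (the function names and the int-only keys are assumptions
for illustration, and NULL elements are not modeled):

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Return true if key occurs in the (unsorted) list of parent keys. */
static bool
parent_has_key(const int *parent, size_t nparent, int key)
{
    for (size_t i = 0; i < nparent; i++)
    {
        if (parent[i] == key)
            return true;
    }
    return false;
}

/*
 * Hypothetical per-row check: every element of the child's array must match
 * some parent key.  Returns the index of the first offending element, or -1
 * if the row conforms.
 */
static int
first_unmatched_element(const int *parent, size_t nparent,
                        const int *elems, size_t nelems)
{
    assert(nelems == 0 || elems != NULL);
    for (size_t i = 0; i < nelems; i++)
    {
        if (!parent_has_key(parent, nparent, elems[i]))
            return (int) i;
    }
    return -1;
}
```

With parent keys {1} and a child array {3,1,2}, the first element already
fails, so adding the constraint over that existing row ought to be rejected
just as the later INSERT is.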

Most of my code comments concern minor matters:

> *** a/doc/src/sgml/ddl.sgml
> --- b/doc/src/sgml/ddl.sgml

> *************** CREATE TABLE order_items (
> *** 852,857 ****
> --- 882,931 ----
>      </para>
>   
>      <para>
> +     When working with foreign key arrays, you have two more
> +     options that can be used with your
> +     <literal>EACH REFERENCES</literal> definition:
> +     <literal>ARRAY CASCADE</literal> and
> +     <literal>ARRAY SET NULL</literal>. Depending on
> +     the triggering action (<command>DELETE</command> or
> +     <command>UPDATE</command>) on the referenced table,
> +     every element in the referencing array will be either
> +     deleted/updated or set to NULL.
> +     For more detailed information on foreign key arrays
> +     options and special cases, please refer to the
> +     documentation for <xref linkend="sql-createtable">.
> +    </para>
> + 
> + <para>
> +     For instance, in the example below, a <command>DELETE</command>
> +     from the <literal>countries</literal> table will remove
> +     the referencing elements in the <literal>citizenship_ids</literal>
> +     array.
> + 
> + <programlisting>
> + CREATE TABLE countries (
> +     country_id integer PRIMARY KEY,
> +     name text,
> +     ...
> + );
> + 
> + CREATE TABLE people (
> +     person_id integer PRIMARY KEY,
> +     first_name text,
> +     last_name text,
> +     ...
> +     citizenship_ids integer[] <emphasis>EACH REFERENCES countries
> +             ON DELETE ARRAY CASCADE ON UPDATE ARRAY CASCADE</emphasis>
> + );
> + </programlisting>    
> + 
> +     Consequently, an <command>UPDATE</command> of
> +     the <literal>country_id</literal> column will be propagated
> +     to any element of the <literal>citizenship_ids</literal>
> +     field in the <literal>people</literal> table.
> +    </para>
> + 
> +    <para>

I would leave off this second example.

> *************** SELECT NULLIF(value, '(none)') ...
> *** 10452,10457 ****
> --- 10480,10490 ----
>      </note>
>   
>      <para>
> +     When using <function>array_remove</function> with multi-dimensional
> +     arrays, elements will be set to NULL as fallback measure.
> +    </para>

If we do keep this behavior, I would give this function a less-natural name
and not document it.

> *** a/doc/src/sgml/ref/create_table.sgml
> --- b/doc/src/sgml/ref/create_table.sgml
> *************** CREATE [ [ GLOBAL | LOCAL ] { TEMPORARY
> *** 51,57 ****
>     DEFAULT <replaceable>default_expr</replaceable> |
>     UNIQUE <replaceable class="PARAMETER">index_parameters</replaceable> |
>     PRIMARY KEY <replaceable class="PARAMETER">index_parameters</replaceable> 
> |
> !   REFERENCES <replaceable class="PARAMETER">reftable</replaceable> [ ( 
> <replaceable class="PARAMETER">refcolumn</replaceable> ) ] [ MATCH FULL | 
> MATCH PARTIAL | MATCH SIMPLE ]
>       [ ON DELETE <replaceable class="parameter">action</replaceable> ] [ ON 
> UPDATE <replaceable class="parameter">action</replaceable> ] }
>   [ DEFERRABLE | NOT DEFERRABLE ] [ INITIALLY DEFERRED | INITIALLY IMMEDIATE ]
>   
> --- 51,57 ----
>     DEFAULT <replaceable>default_expr</replaceable> |
>     UNIQUE <replaceable class="PARAMETER">index_parameters</replaceable> |
>     PRIMARY KEY <replaceable class="PARAMETER">index_parameters</replaceable> 
> |
> !   {EACH} REFERENCES <replaceable class="PARAMETER">reftable</replaceable> [ 
> ( <replaceable class="PARAMETER">refcolumn</replaceable> ) ] [ MATCH FULL | 
> MATCH PARTIAL | MATCH SIMPLE ]
>       [ ON DELETE <replaceable class="parameter">action</replaceable> ] [ ON 
> UPDATE <replaceable class="parameter">action</replaceable> ] }
>   [ DEFERRABLE | NOT DEFERRABLE ] [ INITIALLY DEFERRED | INITIALLY IMMEDIATE ]

Use square brackets, not curly brackets, around optional terms.

>   
> *************** CREATE [ [ GLOBAL | LOCAL ] { TEMPORARY
> *** 62,68 ****
>     UNIQUE ( <replaceable class="PARAMETER">column_name</replaceable> [, ... 
> ] ) <replaceable class="PARAMETER">index_parameters</replaceable> |
>     PRIMARY KEY ( <replaceable class="PARAMETER">column_name</replaceable> [, 
> ... ] ) <replaceable class="PARAMETER">index_parameters</replaceable> |
>     EXCLUDE [ USING <replaceable class="parameter">index_method</replaceable> 
> ] ( <replaceable class="parameter">exclude_element</replaceable> WITH 
> <replaceable class="parameter">operator</replaceable> [, ... ] ) <replaceable 
> class="parameter">index_parameters</replaceable> [ WHERE ( <replaceable 
> class="parameter">predicate</replaceable> ) ] |
> !   FOREIGN KEY ( <replaceable class="PARAMETER">column_name</replaceable> [, 
> ... ] ) REFERENCES <replaceable class="PARAMETER">reftable</replaceable> [ ( 
> <replaceable class="PARAMETER">refcolumn</replaceable> [, ... ] ) ]
>       [ MATCH FULL | MATCH PARTIAL | MATCH SIMPLE ] [ ON DELETE <replaceable 
> class="parameter">action</replaceable> ] [ ON UPDATE <replaceable 
> class="parameter">action</replaceable> ] }
>   [ DEFERRABLE | NOT DEFERRABLE ] [ INITIALLY DEFERRED | INITIALLY IMMEDIATE ]
>   
> --- 62,68 ----
>     UNIQUE ( <replaceable class="PARAMETER">column_name</replaceable> [, ... 
> ] ) <replaceable class="PARAMETER">index_parameters</replaceable> |
>     PRIMARY KEY ( <replaceable class="PARAMETER">column_name</replaceable> [, 
> ... ] ) <replaceable class="PARAMETER">index_parameters</replaceable> |
>     EXCLUDE [ USING <replaceable class="parameter">index_method</replaceable> 
> ] ( <replaceable class="parameter">exclude_element</replaceable> WITH 
> <replaceable class="parameter">operator</replaceable> [, ... ] ) <replaceable 
> class="parameter">index_parameters</replaceable> [ WHERE ( <replaceable 
> class="parameter">predicate</replaceable> ) ] |
> !   FOREIGN KEY ( <replaceable class="PARAMETER">column_name</replaceable> [, 
> ... ] ) {EACH} REFERENCES <replaceable 
> class="PARAMETER">reftable</replaceable> [ ( <replaceable 
> class="PARAMETER">refcolumn</replaceable> [, ... ] ) ]

Likewise.

> *** a/src/backend/catalog/information_schema.sql
> --- b/src/backend/catalog/information_schema.sql

> *************** CREATE VIEW referential_constraints AS
> *** 1173,1183 ****
>   
>              CAST(
>                CASE con.confdeltype WHEN 'c' THEN 'CASCADE'
>                                     WHEN 'n' THEN 'SET NULL'
>                                     WHEN 'd' THEN 'SET DEFAULT'
>                                     WHEN 'r' THEN 'RESTRICT'
>                                     WHEN 'a' THEN 'NO ACTION' END
> !              AS character_data) AS delete_rule
>   
>       FROM (pg_namespace ncon
>             INNER JOIN pg_constraint con ON ncon.oid = con.connamespace
> --- 1175,1189 ----
>   
>              CAST(
>                CASE con.confdeltype WHEN 'c' THEN 'CASCADE'
> +                                   WHEN 'C' THEN 'ARRAY CASCADE'
>                                     WHEN 'n' THEN 'SET NULL'
> +                                   WHEN 'N' THEN 'ARRAY SET NULL'
>                                     WHEN 'd' THEN 'SET DEFAULT'
>                                     WHEN 'r' THEN 'RESTRICT'
>                                     WHEN 'a' THEN 'NO ACTION' END
> !              AS character_data) AS delete_rule,
> ! 
> !            CAST(con.confisarray AS boolean) AS is_array

No need for that cast.

> *** a/src/backend/commands/tablecmds.c
> --- b/src/backend/commands/tablecmds.c

> *************** ATAddForeignKeyConstraint(AlteredTableIn
> *** 5688,5693 ****
> --- 5689,5728 ----
>                               (errcode(ERRCODE_INVALID_FOREIGN_KEY),
>                                errmsg("number of referencing and referenced 
> columns for foreign key disagree")));
>   
> +     /* Enforce foreign key array restrictions */
> +     if (fkconstraint->fk_array)
> +     {
> +             /*
> +              * Foreign key array must not be part of a multi-column foreign 
> key
> +              */
> +             if (numpks > 1)
> +                     ereport(ERROR,
> +                             (errcode(ERRCODE_INVALID_FOREIGN_KEY),
> +                             errmsg("foreign key arrays must not be part of 
> a multi-column foreign key")));
> + 
> +             /*
> +              * ON UPDATE CASCADE action is not supported on FKA
> +              */
> +             if (fkconstraint->fk_upd_action == FKCONSTR_ACTION_CASCADE)
> +                     ereport(ERROR,
> +                             (errcode(ERRCODE_INVALID_FOREIGN_KEY),
> +                             errmsg("ON UPDATE CASCADE action is not 
> supported on foreign key arrays")));

Add a HINT about using ARRAY CASCADE.

> *************** ATAddForeignKeyConstraint(AlteredTableIn
> *** 5736,5775 ****
>                                eqstrategy, opcintype, opcintype, opfamily);
>   
>               /*
> !              * Are there equality operators that take exactly the FK type? 
> Assume
> !              * we should look through any domain here.
>                */
> !             fktyped = getBaseType(fktype);
>   
> !             pfeqop = get_opfamily_member(opfamily, opcintype, fktyped,
> !                                                                      
> eqstrategy);
> !             if (OidIsValid(pfeqop))
> !                     ffeqop = get_opfamily_member(opfamily, fktyped, fktyped,
>                                                                               
>  eqstrategy);
> -             else
> -                     ffeqop = InvalidOid;    /* keep compiler quiet */
>   
> !             if (!(OidIsValid(pfeqop) && OidIsValid(ffeqop)))
>               {
>                       /*
> !                      * Otherwise, look for an implicit cast from the FK 
> type to the
> !                      * opcintype, and if found, use the primary equality 
> operator.
> !                      * This is a bit tricky because opcintype might be a 
> polymorphic
> !                      * type such as ANYARRAY or ANYENUM; so what we have to 
> test is
> !                      * whether the two actual column types can be 
> concurrently cast to
> !                      * that type.  (Otherwise, we'd fail to reject 
> combinations such
> !                      * as int[] and point[].)
>                        */
> !                     Oid                     input_typeids[2];
> !                     Oid                     target_typeids[2];
>   
> !                     input_typeids[0] = pktype;
> !                     input_typeids[1] = fktype;
> !                     target_typeids[0] = opcintype;
> !                     target_typeids[1] = opcintype;
> !                     if (can_coerce_type(2, input_typeids, target_typeids,
> !                                                             
> COERCION_IMPLICIT))
> !                             pfeqop = ffeqop = ppeqop;
>               }
>   
>               if (!(OidIsValid(pfeqop) && OidIsValid(ffeqop)))
> --- 5771,5861 ----
>                                eqstrategy, opcintype, opcintype, opfamily);
>   
>               /*
> !              *  Discover the equality operators
>                */
> !             if (fkconstraint->fk_array)
> !             {
> !                     /*
> !                      * Are there equality operators that take exactly the 
> FK element type?
> !                      * Assume we should look through any domain here.
> !                      */
> !                     Oid fk_element_type=get_base_element_type(fktype);
> !                     if (!OidIsValid(fk_element_type))
> !                             ereport(ERROR,
> !                                     (errcode(ERRCODE_DATATYPE_MISMATCH),
> !                                      errmsg("foreign key constraint \"%s\" "
> !                                                     "cannot be implemented",
> !                                                     fkconstraint->conname),
> !                                      errdetail("Key columns \"%s\" is not 
> an array type: %s",
> !                                                        
> strVal(list_nth(fkconstraint->fk_attrs, i)),
> !                                                        
> format_type_be(fktype))));

Use a detail message like "Type of key column "%s" is not an array type: %s".

>   
> !                     ffeqop = ARRAY_EQ_OP;
> ! 
> !                     pfeqop = get_opfamily_member(opfamily, opcintype, 
> fk_element_type,
>                                                                               
>  eqstrategy);
>   
> !                     if (!(OidIsValid(pfeqop)))
> !                     {
> !                             /*
> !                              * Otherwise, look for an implicit cast from 
> the FK type to the
> !                              * opcintype, and if found, use the primary 
> equality operator.
> !                              * This is a bit tricky because opcintype might 
> be a polymorphic
> !                              * type such as ANYARRAY or ANYENUM; so what we 
> have to test is
> !                              * whether the two actual column types can be 
> concurrently cast to
> !                              * that type.  (Otherwise, we'd fail to reject 
> combinations such
> !                              * as int[] and point[].)
> !                              */
> !                             Oid                     input_typeids[2];
> !                             Oid                     target_typeids[2];
> ! 
> !                             input_typeids[0] = pktype;
> !                             input_typeids[1] = fk_element_type;
> !                             target_typeids[0] = opcintype;
> !                             target_typeids[1] = opcintype;
> !                             if (can_coerce_type(2, input_typeids, 
> target_typeids,
> !                                                                     
> COERCION_IMPLICIT))
> !                                     pfeqop = ppeqop;
> !                     }
> !             }
> !             else
>               {
>                       /*
> !                      * Are there equality operators that take exactly the 
> FK type? Assume
> !                      * we should look through any domain here.
>                        */
> !                     fktyped = getBaseType(fktype);
>   
> !                     pfeqop = get_opfamily_member(opfamily, opcintype, 
> fktyped,
> !                                                                             
>  eqstrategy);
> !                     if (OidIsValid(pfeqop))
> !                             ffeqop = get_opfamily_member(opfamily, fktyped, 
> fktyped,
> !                                                                             
>          eqstrategy);
> !                     else
> !                             ffeqop = InvalidOid;    /* keep compiler quiet 
> */
> ! 
> !                     if (!(OidIsValid(pfeqop) && OidIsValid(ffeqop)))
> !                     {
> !                             /*
> !                              * Otherwise, look for an implicit cast from 
> the FK type to the
> !                              * opcintype, and if found, use the primary 
> equality operator.
> !                              * This is a bit tricky because opcintype might 
> be a polymorphic
> !                              * type such as ANYARRAY or ANYENUM; so what we 
> have to test is
> !                              * whether the two actual column types can be 
> concurrently cast to
> !                              * that type.  (Otherwise, we'd fail to reject 
> combinations such
> !                              * as int[] and point[].)
> !                              */
> !                             Oid                     input_typeids[2];
> !                             Oid                     target_typeids[2];
> ! 
> !                             input_typeids[0] = pktype;
> !                             input_typeids[1] = fktype;
> !                             target_typeids[0] = opcintype;
> !                             target_typeids[1] = opcintype;
> !                             if (can_coerce_type(2, input_typeids, 
> target_typeids,
> !                                                                     
> COERCION_IMPLICIT))
> !                                     pfeqop = ffeqop = ppeqop;
> !                     }
>               }

Please reduce the level of textual code duplication here.

> *** a/src/backend/commands/trigger.c
> --- b/src/backend/commands/trigger.c

> *************** ConvertTriggerToFK(CreateTrigStmt *stmt,
> *** 861,876 ****
> --- 862,881 ----
>       switch (funcoid)
>       {
>               case F_RI_FKEY_CASCADE_UPD:
> +             case F_RI_FKEY_ARRCASCADE_UPD:
>               case F_RI_FKEY_RESTRICT_UPD:
>               case F_RI_FKEY_SETNULL_UPD:
> +             case F_RI_FKEY_ARRSETNULL_UPD:
>               case F_RI_FKEY_SETDEFAULT_UPD:
>               case F_RI_FKEY_NOACTION_UPD:
>                       funcnum = 0;
>                       break;
>   
>               case F_RI_FKEY_CASCADE_DEL:
> +             case F_RI_FKEY_ARRCASCADE_DEL:
>               case F_RI_FKEY_RESTRICT_DEL:
>               case F_RI_FKEY_SETNULL_DEL:
> +             case F_RI_FKEY_ARRSETNULL_DEL:
>               case F_RI_FKEY_SETDEFAULT_DEL:
>               case F_RI_FKEY_NOACTION_DEL:
>                       funcnum = 1;

We don't need to support these clauses in ConvertTriggerToFK(); no ancient
dumps will bear them.  A comment would be enough.  On the other hand, your
changes here are simple enough.  Maybe it's better to add the support as you
have than to explain why it's absent.

> *** a/src/backend/utils/adt/arrayfuncs.c
> --- b/src/backend/utils/adt/arrayfuncs.c
> *************** array_unnest(PG_FUNCTION_ARGS)
> *** 5174,5176 ****
> --- 5174,5599 ----
>               SRF_RETURN_DONE(funcctx);
>       }
>   }
> + 
> + /*
> +  * Remove any occurrence of an element from an array
> +  *
> +  * If used on a multi-dimensional array the matching elements will be 
> replaced with NULLs as fallback.
> +  *
> +  */
> + Datum
> + array_remove(PG_FUNCTION_ARGS)
> + {
> +     ArrayType  *v;
> +     Datum           old_value = PG_GETARG_DATUM(1);
> +     Oid                     element_type;
> +     ArrayType  *result;
> +     Datum      *values;
> +     bool       *nulls;
> +     Datum           elt;
> +     int                     ndim;
> +     int                *dim;
> +     int                     nitems;
> +     int                *dimresult;
> +     int                     nresult;
> +     int                     i;
> +     int32           nbytes = 0;
> +     int32           dataoffset;
> +     bool            hasnulls;
> +     int                     typlen;
> +     bool            typbyval;
> +     char            typalign;
> +     char       *s;
> +     bits8      *bitmap;
> +     int                     bitmask;
> +     Oid                     collation = PG_GET_COLLATION();
> +     TypeCacheEntry *typentry;
> +     FunctionCallInfoData locfcinfo;
> + 
> +     /*
> +      * If the first argument is null
> +      * return NULL
> +      */
> +     if (PG_ARGISNULL(0))
> +             PG_RETURN_NULL();
> + 
> +     v = PG_GETARG_ARRAYTYPE_P(0);
> + 
> +     /*
> +      * If second argument is NULL, no match is possible
> +      * so return the first argument unchanged
> +      */
> +     if (PG_ARGISNULL(1))
> +             PG_RETURN_ARRAYTYPE_P(v);
> + 
> +     ndim = ARR_NDIM(v);
> + 
> +     /*
> +      * If used on a multi-dimensional array the matching elements
> +      * will be replaced with NULLs as fallback.
> +      */
> +     if (ndim > 1) {
> +             fcinfo->nargs = 3;
> +             fcinfo->argnull[2]=true;
> +             return array_replace(fcinfo);
> +     }
> + 
> +     dim = ARR_DIMS(v);
> +     element_type = ARR_ELEMTYPE(v);
> +     nitems = ArrayGetNItems(ndim, dim);
> + 
> +     /* Check for empty array */
> +     if (nitems <= 0)
> +     {
> +             /* Return empty array */
> +             PG_RETURN_ARRAYTYPE_P(construct_empty_array(element_type));
> +     }
> + 
> +     /*
> +      * We arrange to look up the equality function only once per series of
> +      * calls, assuming the element type doesn't change underneath us.  The
> +      * typcache is used so that we have no memory leakage when being used
> +      * as an index support function.
> +      */

The second sentence of the comment does not apply in this function.

> +     typentry = (TypeCacheEntry *) fcinfo->flinfo->fn_extra;
> +     if (typentry == NULL ||
> +             typentry->type_id != element_type)
> +     {
> +             typentry = lookup_type_cache(element_type,
> +                                                                      
> TYPECACHE_EQ_OPR_FINFO);
> +             if (!OidIsValid(typentry->eq_opr_finfo.fn_oid))
> +                     ereport(ERROR,
> +                                     (errcode(ERRCODE_UNDEFINED_FUNCTION),
> +                     errmsg("could not identify an equality operator for 
> type %s",
> +                                format_type_be(element_type))));
> +             fcinfo->flinfo->fn_extra = (void *) typentry;
> +     }
> +     typlen = typentry->typlen;
> +     typbyval = typentry->typbyval;
> +     typalign = typentry->typalign;
> + 
> +     /*
> +      * apply the operator to each pair of array elements.
> +      */
> +     InitFunctionCallInfoData(locfcinfo, &typentry->eq_opr_finfo, 2,
> +                                                      collation, NULL, NULL);
> + 
> +     /* Allocate temporary arrays for new values */
> +     values = (Datum *) palloc(nitems * sizeof(Datum));
> +     nulls = (bool *) palloc(nitems * sizeof(bool));
> + 
> +     /* Loop over source data */
> +     s = ARR_DATA_PTR(v);
> +     bitmap = ARR_NULLBITMAP(v);
> +     bitmask = 1;
> +     hasnulls = false;
> +     nresult=0;
> + 
> +     for (i = 0; i < nitems; i++)
> +     {
> +             bool            isNull;
> +             bool            oprresult;
> +             bool            skip;
> + 
> +             /* Get source element, checking for NULL */
> +             if (bitmap && (*bitmap & bitmask) == 0)
> +             {
> +                     isNull = true;
> +                     skip = false;
> +             }
> +             else
> +             {
> +                     elt = fetch_att(s, typbyval, typlen);
> +                     s = att_addlength_datum(s, typlen, elt);
> +                     s = (char *) att_align_nominal(s, typalign);
> +                     isNull = false;
> + 
> +                     /*
> +                      * Apply the operator to the element pair
> +                      */
> +                     locfcinfo.arg[0] = elt;
> +                     locfcinfo.arg[1] = old_value;
> +                     locfcinfo.argnull[0] = false;
> +                     locfcinfo.argnull[1] = false;
> +                     locfcinfo.isnull = false;
> +                     oprresult = 
> DatumGetBool(FunctionCallInvoke(&locfcinfo));
> +                     if (!oprresult)
> +                     {
> +                             values[nresult] = elt;
> +                             skip = false;
> +                     }
> +                     else {
> +                             skip = true;
> +                     }

Remove braces.

> +             }
> + 
> +             if (!skip)
> +             {
> +                     nulls[nresult] = isNull;
> +                     if (isNull)
> +                             hasnulls = true;
> +                     else
> +                     {
> +                             /* Ensure data is not toasted */
> +                             if (typlen == -1)
> +                                     values[nresult] = 
> PointerGetDatum(PG_DETOAST_DATUM(values[nresult]));

Shouldn't be needed; we just pulled the values from an array.

> +                             /* Update total result size */
> +                             nbytes = att_addlength_datum(nbytes, typlen, 
> values[nresult]);
> +                             nbytes = att_align_nominal(nbytes, typalign);
> +                             /* check for overflow of total request */
> +                             if (!AllocSizeIsValid(nbytes))
> +                                     ereport(ERROR,
> +                                                     
> (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
> +                                                      errmsg("array size 
> exceeds the maximum allowed (%d)",
> +                                                                     (int) 
> MaxAllocSize)));

This test should never fail.  Either convert it to an Assert or just add a
comment to that effect.
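
My reasoning, assuming no element grows on the way through: the result holds a
subset of the input's elements, so its data size is bounded by that of the
input, which was already a valid allocation.  A standalone model of the size
accounting in plain C (align_up() and packed_size() are made-up names, not the
backend's macros):

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Round len up to a multiple of align, which must be a power of two. */
static size_t
align_up(size_t len, size_t align)
{
    assert(align != 0 && (align & (align - 1)) == 0);
    return (len + (align - 1)) & ~(align - 1);
}

/*
 * Hypothetical model of the loop's bookkeeping: add each kept element's
 * length, then pad to the alignment boundary.  A NULL keep[] means "keep
 * every element", i.e. the size of the input's data area.
 */
static size_t
packed_size(const size_t *lens, const bool *keep, size_t n, size_t align)
{
    size_t nbytes = 0;

    for (size_t i = 0; i < n; i++)
    {
        if (keep == NULL || keep[i])
            nbytes = align_up(nbytes + lens[i], align);
    }
    return nbytes;
}
```

Because the running total is aligned after every element, each element
contributes a fixed padded length, and dropping elements can only lower the
sum.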

> +                     }
> +                     nresult++;
> +             }
> + 
> +             /* advance bitmap pointer if any */
> +             if (bitmap)
> +             {
> +                     bitmask <<= 1;
> +                     if (bitmask == 0x100)
> +                     {
> +                             bitmap++;
> +                             bitmask = 1;
> +                     }
> +             }
> +     }
> + 
> +     /* Allocate and initialize the result array */

Is it worth tracking whether anything was actually removed and, if not, just
returning the old array?
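
A sketch of that shortcut on a plain int array, in standalone C rather than
backend code (remove_all() and its malloc-based ownership convention are
assumptions for illustration):

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

/*
 * Remove every occurrence of target from src[0..n-1].
 *
 * If nothing matched, return src itself so the caller can skip building a
 * new array.  Otherwise return a malloc'd buffer holding the survivors.
 * *nout receives the surviving element count.  Returns NULL (with
 * *nout = 0) if allocation fails.
 */
static int *
remove_all(int *src, size_t n, int target, size_t *nout)
{
    int    *tmp;
    size_t  kept = 0;

    assert(nout != NULL);

    /* malloc(0) may return NULL, so always request at least one element */
    tmp = malloc((n > 0 ? n : 1) * sizeof(int));
    if (tmp == NULL)
    {
        *nout = 0;
        return NULL;
    }

    for (size_t i = 0; i < n; i++)
    {
        if (src[i] != target)
            tmp[kept++] = src[i];
    }

    *nout = kept;
    if (kept == n)
    {
        /* Nothing was removed: hand back the original array */
        free(tmp);
        return src;
    }
    return tmp;
}
```

In array_remove() the equivalent test would be comparing nresult to nitems
after the loop, before allocating the result.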

> +     if (hasnulls)
> +     {
> +             dataoffset = ARR_OVERHEAD_WITHNULLS(ndim, nresult);
> +             nbytes += dataoffset;
> +     }
> +     else
> +     {
> +             dataoffset = 0;                 /* marker for no null bitmap */
> +             nbytes += ARR_OVERHEAD_NONULLS(ndim);
> +     }
> +     result = (ArrayType *) palloc0(nbytes);
> +     SET_VARSIZE(result, nbytes);
> +     result->ndim = ndim;
> +     result->dataoffset = dataoffset;
> +     result->elemtype = element_type;
> +     memcpy(ARR_DIMS(result), ARR_DIMS(v), 2 * ndim * sizeof(int));
> + 
> +     /* Adjust the final length */
> +     ARR_DIMS(result)[0] = nresult;
> + 
> +     /*
> +      * Note: do not risk trying to pfree the results of the called function
> +      */

The comment does not apply here; a comparison function has no result to free.

> +     CopyArrayEls(result,
> +                              values, nulls, nresult,
> +                              typlen, typbyval, typalign,
> +                              false);
> + 
> +     pfree(values);
> +     pfree(nulls);
> + 
> +     PG_RETURN_ARRAYTYPE_P(result);
> + }
> + 
> + /*
> +  * Replace any occurrence of an element in an array
> +  */
> + Datum
> + array_replace(PG_FUNCTION_ARGS)
> + {
> +     ArrayType  *v;
> +     Datum           old_value = PG_GETARG_DATUM(1);
> +     Datum           new_value = PG_GETARG_DATUM(2);
> +     bool            new_value_isnull = PG_ARGISNULL(2);
> +     Oid                     element_type;
> +     ArrayType  *result;
> +     Datum      *values;
> +     bool       *nulls;
> +     Datum           elt;
> +     int                *dim;
> +     int                     ndim;
> +     int                     nitems;
> +     int                     i;
> +     int32           nbytes = 0;
> +     int32           dataoffset;
> +     bool            hasnulls;
> +     int                     typlen;
> +     bool            typbyval;
> +     char            typalign;
> +     char       *s;
> +     bits8      *bitmap;
> +     int                     bitmask;
> +     Oid                     collation = PG_GET_COLLATION();
> +     TypeCacheEntry *typentry;
> +     FunctionCallInfoData locfcinfo;
> + 
> +     /*
> +      * If the first argument is null
> +      * return NULL
> +      */
> +     if (PG_ARGISNULL(0))
> +             PG_RETURN_NULL();
> + 
> +     v = PG_GETARG_ARRAYTYPE_P(0);
> + 
> +     /*
> +      * If second argument is NULL, no match is possible
> +      * so return the first argument unchanged
> +      */
> +     if (PG_ARGISNULL(1))
> +             PG_RETURN_ARRAYTYPE_P(v);
> + 
> +     ndim = ARR_NDIM(v);
> +     dim = ARR_DIMS(v);
> +     element_type = ARR_ELEMTYPE(v);
> +     nitems = ArrayGetNItems(ndim, dim);
> + 
> +     /* Check for empty array */
> +     if (nitems <= 0)
> +     {
> +             /* Return empty array */
> +             PG_RETURN_ARRAYTYPE_P(construct_empty_array(element_type));
> +     }
> + 
> +     /*
> +      * We arrange to look up the equality function only once per series of
> +      * calls, assuming the element type doesn't change underneath us.  The
> +      * typcache is used so that we have no memory leakage when being used
> +      * as an index support function.
> +      */

The second sentence does not apply here, either.

> +     typentry = (TypeCacheEntry *) fcinfo->flinfo->fn_extra;
> +     if (typentry == NULL ||
> +             typentry->type_id != element_type)
> +     {
> +             typentry = lookup_type_cache(element_type,
> +                                          TYPECACHE_EQ_OPR_FINFO);
> +             if (!OidIsValid(typentry->eq_opr_finfo.fn_oid))
> +                     ereport(ERROR,
> +                                     (errcode(ERRCODE_UNDEFINED_FUNCTION),
> +                     errmsg("could not identify an equality operator for type %s",
> +                                format_type_be(element_type))));
> +             fcinfo->flinfo->fn_extra = (void *) typentry;
> +     }
> +     typlen = typentry->typlen;
> +     typbyval = typentry->typbyval;
> +     typalign = typentry->typalign;
> + 
> +     /*
> +      * apply the operator to each pair of array elements.
> +      */
> +     InitFunctionCallInfoData(locfcinfo, &typentry->eq_opr_finfo, 2,
> +                                                      collation, NULL, NULL);
> + 
> +     /* Allocate temporary arrays for new values */
> +     values = (Datum *) palloc(nitems * sizeof(Datum));
> +     nulls = (bool *) palloc(nitems * sizeof(bool));
> + 
> +     /* Loop over source data */
> +     s = ARR_DATA_PTR(v);
> +     bitmap = ARR_NULLBITMAP(v);
> +     bitmask = 1;
> +     hasnulls = false;
> + 
> +     for (i = 0; i < nitems; i++)
> +     {
> +             bool            isNull;
> +             bool            oprresult;
> + 
> +             /* Get source element, checking for NULL */
> +             if (bitmap && (*bitmap & bitmask) == 0)
> +             {
> +                     isNull = true;
> +             }
> +             else
> +             {
> +                     elt = fetch_att(s, typbyval, typlen);
> +                     s = att_addlength_datum(s, typlen, elt);
> +                     s = (char *) att_align_nominal(s, typalign);
> +                     isNull = false;
> + 
> +                     /*
> +                      * Apply the operator to the element pair
> +                      */
> +                     locfcinfo.arg[0] = elt;
> +                     locfcinfo.arg[1] = old_value;
> +                     locfcinfo.argnull[0] = false;
> +                     locfcinfo.argnull[1] = false;
> +                     locfcinfo.isnull = false;
> +                     oprresult = DatumGetBool(FunctionCallInvoke(&locfcinfo));
> +                     if (!oprresult)
> +                     {
> +                             values[i] = elt;
> +                     }
> +                     else if (!new_value_isnull)
> +                     {
> +                             values[i] = new_value;
> +                     }
> +                     else {
> +                             isNull = true;
> +                     }

Remove the braces around single-statement blocks.

> +             }
> + 
> +             nulls[i] = isNull;
> +             if (isNull)
> +                     hasnulls = true;
> +             else
> +             {
> +                     /* Ensure data is not toasted */
> +                     if (typlen == -1)
> +                             values[i] = PointerGetDatum(PG_DETOAST_DATUM(values[i]));

The only value that might need a detoast is new_value.  Do so once at the top
of the function.

> +                     /* Update total result size */
> +                     nbytes = att_addlength_datum(nbytes, typlen, values[i]);
> +                     nbytes = att_align_nominal(nbytes, typalign);
> +                     /* check for overflow of total request */
> +                     if (!AllocSizeIsValid(nbytes))
> +                             ereport(ERROR,
> +                                     (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
> +                                      errmsg("array size exceeds the maximum allowed (%d)",
> +                                             (int) MaxAllocSize)));
> +             }
> + 
> +             /* advance bitmap pointer if any */
> +             if (bitmap)
> +             {
> +                     bitmask <<= 1;
> +                     if (bitmask == 0x100)
> +                     {
> +                             bitmap++;
> +                             bitmask = 1;
> +                     }
> +             }
> +     }
> + 
> +     /* Allocate and initialize the result array */
> +     if (hasnulls)
> +     {
> +             dataoffset = ARR_OVERHEAD_WITHNULLS(ndim, nitems);
> +             nbytes += dataoffset;
> +     }
> +     else
> +     {
> +             dataoffset = 0;                 /* marker for no null bitmap */
> +             nbytes += ARR_OVERHEAD_NONULLS(ndim);
> +     }
> +     result = (ArrayType *) palloc0(nbytes);
> +     SET_VARSIZE(result, nbytes);
> +     result->ndim = ndim;
> +     result->dataoffset = dataoffset;
> +     result->elemtype = element_type;
> +     memcpy(ARR_DIMS(result), ARR_DIMS(v), 2 * ndim * sizeof(int));
> + 
> +     /*
> +      * Note: do not risk trying to pfree the results of the called function
> +      */

The comment does not apply here; a comparison function has no result to free.

> +     CopyArrayEls(result,
> +                              values, nulls, nitems,
> +                              typlen, typbyval, typalign,
> +                              false);
> + 
> +     pfree(values);
> +     pfree(nulls);
> + 
> +     PG_RETURN_ARRAYTYPE_P(result);
> + }
> diff --git a/src/backend/utils/adt/ri_triggers.c b/src/backend/utils/adt/ri_triggers.c
> index 03a974a..58a92e1 100644
> *** a/src/backend/utils/adt/ri_triggers.c
> --- b/src/backend/utils/adt/ri_triggers.c

> *************** typedef struct RI_ConstraintInfo
> *** 106,111 ****
> --- 110,116 ----
>       NameData        conname;                /* name of the FK constraint */
>       Oid                     pk_relid;               /* referenced relation */
>       Oid                     fk_relid;               /* referencing relation */
> +     bool            confisarray;

Comment that member.

> + Datum
> + RI_FKey_arrcascade_upd(PG_FUNCTION_ARGS)
> + {

The new array trigger functions belong earlier in the file, adjacent to the
other PK trigger functions.

> +                             /* ----------
> +                              * The query string built is
> +                              *      UPDATE ONLY <fktable> SET fkatt1 = array_replace(fkatt1, $n, $1) [, ...]
> +                              *                      WHERE $n = fkatt1 [AND ...]
> +                              * The type id's for the $ parameters are those of the
> +                              * corresponding PK attributes.
> +                              * ----------
> +                              */

RI_FKey_cascade_upd() has this to say at the corresponding juncture:

                                /* ----------
                                 * The query string built is
                                 *      UPDATE ONLY <fktable> SET fkatt1 = $1 [, ...]
                                 *                      WHERE $n = fkatt1 [AND ...]
                                 * The type id's for the $ parameters are those of the
                                 * corresponding PK attributes.  Note that we are assuming
                                 * there is an assignment cast from the PK to the FK type;
                                 * else the parser will fail.
                                 * ----------
                                 */

Since we're matching a polymorphic function here, the types of the second and
third arguments must precisely match the element type of the first argument.
Even when an implicit cast exists, we must insert cast syntax.  Test case:

BEGIN;
CREATE TABLE parent (c smallint PRIMARY KEY);
CREATE TABLE child  (c int[] EACH REFERENCES parent
        ON UPDATE ARRAY CASCADE
        ON DELETE ARRAY CASCADE);
INSERT INTO parent VALUES (1), (2);
INSERT INTO child VALUES ('{1,2}');
UPDATE parent SET c = 3 WHERE c = 2;
DELETE FROM parent WHERE c = 1;
ROLLBACK;
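
To make the cast requirement concrete, here is a minimal standalone sketch of
how one SET item could be formatted.  format_set_item() and its elemtype
argument are hypothetical names of mine, not anything from the patch or the
backend; real code would append to querybuf and derive the type name from the
element type of the FK column.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/*
 * Hypothetical helper, not part of the patch: format one SET item of the
 * cascade query, casting both parameters to the array's element type so
 * that the polymorphic array_replace() call can be resolved.
 *
 * elemtype is assumed to be an already-formatted, trusted type name.
 * Returns the number of characters written (excluding the terminator),
 * or -1 if the buffer is too small.
 */
static int
format_set_item(char *buf, size_t buflen, const char *sep,
                const char *attname, const char *elemtype,
                int old_param, int new_param)
{
    int         n;

    n = snprintf(buf, buflen,
                 "%s %s = array_replace(%s, $%d::%s, $%d::%s)",
                 sep, attname, attname,
                 old_param, elemtype, new_param, elemtype);

    /* snprintf reports the length it wanted; detect truncation */
    if (n < 0 || (size_t) n >= buflen)
        return -1;
    return n;
}
```

Called with sep = "", attname = "c", elemtype = "integer", old_param = 2 and
new_param = 1, this produces " c = array_replace(c, $2::integer, $1::integer)".
Both parameters then carry the element type of the int[] column, even though
the PK column in the test case above is smallint.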

> +                             initStringInfo(&querybuf);
> +                             initStringInfo(&qualbuf);
> +                             quoteRelationName(fkrelname, fk_rel);
> +                             appendStringInfo(&querybuf, "UPDATE ONLY %s SET", fkrelname);
> +                             querysep = "";
> +                             qualsep = "WHERE";
> +                             for (i = 0, j = riinfo.nkeys; i < riinfo.nkeys; i++, j++)
> +                             {
> +                                     Oid                     pk_type = RIAttType(pk_rel, riinfo.pk_attnums[i]);
> +                                     Oid                     fk_type = RIAttType(fk_rel, riinfo.fk_attnums[i]);
> + 
> +                                     quoteOneName(attname,
> +                                                  RIAttName(fk_rel, riinfo.fk_attnums[i]));
> +                                     appendStringInfo(&querybuf,
> +                                                      "%s %s = array_replace(%s, $%d, $%d)",
> +                                                      querysep, attname, attname, j + 1, i + 1);

Break this line to keep it within 78 columns.  Otherwise, pgindent will
reverse-indent it.  There are some other examples in your patch.  Please run
your patch through this command, inspect the output, and fix any lines that
have no good reason to exceed that width:

        <patchfile expand -t4 | awk 'length > 78'

> *** a/src/include/catalog/pg_constraint.h
> --- b/src/include/catalog/pg_constraint.h
> *************** CATALOG(pg_constraint,2606)
> *** 78,83 ****
> --- 78,84 ----
>        * constraint.  Otherwise confrelid is 0 and the char fields are spaces.
>        */
>       Oid                     confrelid;              /* relation referenced by foreign key */
> +     bool            confisarray;    /* true if an EACH REFERENCE foreign key */
>       char            confupdtype;    /* foreign key's ON UPDATE action */
>       char            confdeltype;    /* foreign key's ON DELETE action */
>       char            confmatchtype;  /* foreign key's match type */

Putting the field at this location adds three bytes of padding.  Please locate
it elsewhere to avoid that.
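
A rough standalone illustration of the arithmetic, under an assumption the hunk
does not show: that the three char fields are followed by one more one-byte
field and then a 4-byte integer.  The struct names, the Oid stand-in, and the
conislocal and coninhcount members below are all assumptions made for the
sketch, not a copy of the catalog header.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

typedef uint32_t Oid;           /* stand-in for the backend's Oid typedef */

/* Assumed layout before the patch: four one-byte fields fill one word. */
struct fk_fields_before
{
    Oid         confrelid;
    char        confupdtype;
    char        confdeltype;
    char        confmatchtype;
    bool        conislocal;     /* assumed neighbour */
    int32_t     coninhcount;    /* assumed neighbour, 4-byte aligned */
};

/* Same fields with the new flag placed right after confrelid. */
struct fk_fields_after
{
    Oid         confrelid;
    bool        confisarray;    /* fifth one-byte field spills into a new word */
    char        confupdtype;
    char        confdeltype;
    char        confmatchtype;
    bool        conislocal;
    int32_t     coninhcount;
};

/* Bytes of pure padding introduced, beyond the new field itself. */
static size_t
padding_added(void)
{
    return sizeof(struct fk_fields_after)
        - sizeof(struct fk_fields_before)
        - sizeof(bool);
}
```

On common ABIs with one-byte bool and 4-byte alignment for int32_t,
padding_added() returns 3: five one-byte fields occupy bytes 4 through 8, and
the compiler pads up to offset 12 before the integer.  Placing the flag where
padding already exists would avoid that growth.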

> *** a/src/include/catalog/pg_proc.h
> --- b/src/include/catalog/pg_proc.h

> *************** DESCR("referential integrity ON DELETE N
> *** 1976,1981 ****
> --- 1981,1995 ----
>   DATA(insert OID = 1655 (  RI_FKey_noaction_upd PGNSP PGUID 12 1 0 0 0 f f f t f v 0 0 2279 "" _null_ _null_ _null_ _null_ RI_FKey_noaction_upd _null_ _null_ _null_ ));
>   DESCR("referential integrity ON UPDATE NO ACTION");
>   
> + DATA(insert OID = 3144 (  RI_FKey_arrcascade_del    PGNSP PGUID 12 1 0 0 0 f f f t f v 0 0 2279 "" _null_ _null_ _null_ _null_ RI_FKey_arrcascade_del _null_ _null_ _null_ ));
> + DESCR("referential integrity ON DELETE CASCADE");
> + DATA(insert OID = 3145 (  RI_FKey_arrcascade_upd    PGNSP PGUID 12 1 0 0 0 f f f t f v 0 0 2279 "" _null_ _null_ _null_ _null_ RI_FKey_arrcascade_upd _null_ _null_ _null_ ));
> + DESCR("referential integrity ON UPDATE CASCADE");
> + DATA(insert OID = 3146 (  RI_FKey_arrsetnull_del    PGNSP PGUID 12 1 0 0 0 f f f t f v 0 0 2279 "" _null_ _null_ _null_ _null_ RI_FKey_arrsetnull_del _null_ _null_ _null_ ));
> + DESCR("referential integrity ON DELETE SET NULL");
> + DATA(insert OID = 3147 (  RI_FKey_arrsetnull_upd    PGNSP PGUID 12 1 0 0 0 f f f t f v 0 0 2279 "" _null_ _null_ _null_ _null_ RI_FKey_arrsetnull_upd _null_ _null_ _null_ ));
> + DESCR("referential integrity ON UPDATE SET NULL");

Those descriptions need to reflect the actual clauses involved; for example,
RI_FKey_arrcascade_del should say "referential integrity ON DELETE ARRAY
CASCADE".

Thanks,
nm
