Changeset: 49dcdaff8bf0 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/49dcdaff8bf0
Modified Files:
        sql/backends/monet5/rel_bin.c
        sql/include/sql_catalog.h
        sql/server/rel_schema.c
        sql/server/sql_mvc.c
        sql/server/sql_mvc.h
        sql/server/sql_parser.y
        sql/server/sql_partition.c
        sql/storage/sql_storage.h
        sql/storage/store.c
Branch: check
Log Message:

Implement CHECK constraints as a key type instead of as a column property.


diffs (truncated from 695 to 300 lines):

diff --git a/sql/backends/monet5/rel_bin.c b/sql/backends/monet5/rel_bin.c
--- a/sql/backends/monet5/rel_bin.c
+++ b/sql/backends/monet5/rel_bin.c
@@ -4923,7 +4923,7 @@ sql_insert_triggers(backend *be, sql_tab
 }
 
 static void
-sql_insert_check(backend *be, sql_table *t, sql_rel *inserts, list *refs)
+sql_insert_check(backend *be, sql_key *key, sql_rel *inserts, list *refs)
 {
        mvc *sql = be->mvc;
        node *m, *n;
@@ -4933,25 +4933,23 @@ sql_insert_check(backend *be, sql_table 
 
        sql_subtype *bt = sql_bind_localtype("bit");
 
-       for (n = ol_first_node(t->columns), m = exps->h; n && m;
+       for (n = key->columns->h, m = exps->h; n && m;
                n = n->next, m = m->next) {
                sql_exp *i = m->data;
-               sql_column *c = n->data;
-               if (c->check) {
-                       i->alias.rname= sa_strdup(sql->sa, t->base.name);
-                       i->alias.name= sa_strdup(sql->sa, c->base.name);
-
-                       int pos = 0;
-                       sql_rel* rel = rel_read(sql, sa_strdup(sql->sa, 
c->check), &pos, sa_list(sql->sa));
-                       rel->l = inserts;
-                       stmt* s = subrel_bin(be, rel, refs);
-                       s = stmt_uselect(be, column(be, s), stmt_atom(be, 
atom_zero_value(sql->sa, bt)), cmp_equal, NULL, 0, 1);
-                       sql_subfunc *cnt = sql_bind_func(sql, "sys", "count", 
sql_bind_localtype("void"), NULL, F_AGGR, true, true);
-                       s = stmt_aggr(be, s, NULL, NULL, cnt, 1, 0, 1);
-                       char *msg = sa_message(sql->sa, SQLSTATE(40002) "INSERT 
INTO: CHECK constraint violated for column %s.%s", c->t->base.name, 
c->base.name);
-                       (void)stmt_exception(be, s, msg, 00001);
-               }
-       }
+               sql_column *c = ((sql_kc*) n->data)->c;
+               i->alias.rname= sa_strdup(sql->sa, c->t->base.name);
+               i->alias.name= sa_strdup(sql->sa, c->base.name);
+       }
+
+       int pos = 0;
+       sql_rel* rel = rel_read(sql, sa_strdup(sql->sa, key->check), &pos, 
sa_list(sql->sa));
+       rel->l = inserts;
+       stmt* s = subrel_bin(be, rel, refs);
+       s = stmt_uselect(be, column(be, s), stmt_atom(be, 
atom_zero_value(sql->sa, bt)), cmp_equal, NULL, 0, 1);
+       sql_subfunc *cnt = sql_bind_func(sql, "sys", "count", 
sql_bind_localtype("void"), NULL, F_AGGR, true, true);
+       s = stmt_aggr(be, s, NULL, NULL, cnt, 1, 0, 1);
+       char *msg = sa_message(sql->sa, SQLSTATE(40002) "INSERT INTO: CHECK 
constraint violated: %s", key->base.name);
+       (void)stmt_exception(be, s, msg, 00001);
 }
 
 static sql_table *
@@ -5032,7 +5030,11 @@ rel2bin_insert(backend *be, sql_rel *rel
        if (idx_ins)
                pin = refs_find_rel(refs, prel);
 
-       sql_insert_check(be, t, rel->r, refs);
+       for (n = ol_first_node(t->keys); n; n = n->next) {
+               sql_key * key = n->data;
+               if (key->type == ckey)
+                       sql_insert_check(be, key, rel->r, refs);
+       }
 
        if (!sql_insert_check_null(be, t, inserts->op4.lval))
                return NULL;
@@ -5916,28 +5918,22 @@ sql_update_triggers(backend *be, sql_tab
 }
 
 static void
-sql_update_check(backend *be, sql_table *t, sql_rel *rupdates, stmt **updates, 
list *refs)
+sql_update_check(backend *be, sql_key * key, sql_rel *rupdates, list *refs)
 {
+       /*  TODO: this won't work for general table check constraints involving 
updates to a strict subset of check columns*/
        mvc *sql = be->mvc;
-       node *n;
        sql_subfunc *cnt = sql_bind_func(sql, "sys", "count", 
sql_bind_localtype("void"), NULL, F_AGGR, true, true);
        sql_subtype *bt = sql_bind_localtype("bit");
 
-       for (n = ol_first_node(t->columns); n; n = n->next) {
-               sql_column *c = n->data;
-
-               if (updates[c->colnr] && c->check) {
-
-                       int pos = 0;
-                       sql_rel* rel = rel_read(sql, sa_strdup(sql->sa, 
c->check), &pos, sa_list(sql->sa));
-                       rel->l = rupdates;
-                       stmt* s = subrel_bin(be, rel, refs);
-                       s = stmt_uselect(be, column(be, s), stmt_atom(be, 
atom_zero_value(sql->sa, bt)), cmp_equal, NULL, 0, 1);
-                       s = stmt_aggr(be, s, NULL, NULL, cnt, 1, 0, 1);
-                       char *msg = sa_message(sql->sa, SQLSTATE(40002) 
"UPDATE: CHECK constraint violated for column %s.%s", c->t->base.name, 
c->base.name);
-                       (void)stmt_exception(be, s, msg, 00001);
-               }
-       }
+
+       int pos = 0;
+       sql_rel* rel = rel_read(sql, sa_strdup(sql->sa, key->check), &pos, 
sa_list(sql->sa));
+       rel->l = rupdates;
+       stmt* s = subrel_bin(be, rel, refs);
+       s = stmt_uselect(be, column(be, s), stmt_atom(be, 
atom_zero_value(sql->sa, bt)), cmp_equal, NULL, 0, 1);
+       s = stmt_aggr(be, s, NULL, NULL, cnt, 1, 0, 1);
+       char *msg = sa_message(sql->sa, SQLSTATE(40002) "UPDATE: CHECK 
constraint violated: %s", key->base.name);
+       (void)stmt_exception(be, s, msg, 00001);
 }
 
 static void
@@ -6069,7 +6065,12 @@ rel2bin_update(backend *be, sql_rel *rel
                if (c)
                        updates[c->colnr] = bin_find_column(be, update, ce->l, 
ce->r);
        }
-       sql_update_check(be, t, rel->r, updates, refs);
+
+       for (m = ol_first_node(t->keys); m; m = m->next) {
+               sql_key * key = m->data;
+               if (key->type == ckey)
+                       sql_update_check(be, key, rel->r, refs);
+       }
        sql_update_check_null(be, t, updates);
 
        /* check keys + get idx */
diff --git a/sql/include/sql_catalog.h b/sql/include/sql_catalog.h
--- a/sql/include/sql_catalog.h
+++ b/sql/include/sql_catalog.h
@@ -534,7 +534,8 @@ typedef enum key_type {
        pkey,
        ukey, /* default behavior is that NULLS are distinct, e.g. there can be 
multiple null values in a column with regular UNIQUE constraint */
        fkey,
-       unndkey /* NULLS are not distinct, i.e. NULLS act as regular values for 
uniqueness checks */
+       unndkey, /* NULLS are not distinct, i.e. NULLS act as regular values 
for uniqueness checks */
+       ckey     /* CHECK constraint behaves as a key type*/
 } key_type;
 
 typedef struct sql_kc {
@@ -571,6 +572,7 @@ typedef struct sql_key {    /* pkey, ukey, 
        sql_base base;
        key_type type;          /* pkey, ukey, fkey */
        sql_idx *idx;           /* idx to accelerate key check */
+       char *check; /* check condition*/
 
        struct list *columns;   /* list of sql_kc */
        struct sql_table *t;
@@ -627,7 +629,6 @@ typedef struct sql_column {
        char unique;            /* 0 NOT UNIQUE, 1 SUB_UNIQUE, 2 UNIQUE */
        int drop_action;        /* only used for alter statements */
        char *storage_type;
-       char *check; /* check condition*/
        size_t dcount;
        void *min;
        void *max;
diff --git a/sql/server/rel_schema.c b/sql/server/rel_schema.c
--- a/sql/server/rel_schema.c
+++ b/sql/server/rel_schema.c
@@ -391,6 +391,30 @@ cleanup:
        return res;
 }
 
+static
+key_type token2key_type(int token) {
+               switch (token) {
+               case SQL_UNIQUE:                                        return 
ukey;
+               case SQL_UNIQUE_NULLS_NOT_DISTINCT:     return unndkey;
+               case SQL_PRIMARY_KEY:                           return pkey;
+               case SQL_CHECK:                                         return 
ckey;
+               }
+               assert(0);
+               return -1;
+}
+
+static
+str serialize_check_plan(sql_query *query, symbol *s, sql_table *t) {
+
+       mvc *sql = query->sql;
+       exp_kind ek = {type_value, card_value, FALSE};
+       sql_rel* rel = rel_basetable(sql, t, t->base.name);
+       sql_exp *e = rel_logical_value_exp(query, &rel, s->data.sym, sql_sel, 
ek);
+       rel = rel_project_exp(sql, e);
+       str check = rel2str(sql, rel);
+       return check;
+}
+
 static int
 column_constraint_type(sql_query *query, const char *name, symbol *s, 
sql_schema *ss, sql_table *t, sql_column *cs, bool isDeclared, int *used)
 {
@@ -404,8 +428,9 @@ column_constraint_type(sql_query *query,
        switch (s->token) {
        case SQL_UNIQUE:
        case SQL_UNIQUE_NULLS_NOT_DISTINCT:
-       case SQL_PRIMARY_KEY: {
-               key_type kt = (s->token == SQL_UNIQUE) ? ukey : (s->token == 
SQL_UNIQUE_NULLS_NOT_DISTINCT) ? unndkey : pkey;
+       case SQL_PRIMARY_KEY:
+       case SQL_CHECK: {
+               key_type kt = token2key_type(s->token);
                sql_key *k;
                const char *ns = name;
 
@@ -431,7 +456,13 @@ column_constraint_type(sql_query *query,
                        (void) sql_error(sql, 02, SQLSTATE(42000) "CONSTRAINT 
%s: an index named '%s' already exists, and it would conflict with the key", kt 
== pkey ? "PRIMARY KEY" : "UNIQUE", name);
                        return res;
                }
-               switch (mvc_create_ukey(&k, sql, t, name, kt)) {
+               char* check = NULL;
+               if (kt == ckey) {
+                       if ((check = serialize_check_plan(query, s, t)) == 
NULL) {
+                               /*TODO error*/
+                       }
+               }
+               switch (mvc_create_ukey(&k, sql, t, name, kt, check)) {
                        case -1:
                                (void) sql_error(sql, 02, SQLSTATE(HY013) 
MAL_MALLOC_FAIL);
                                return res;
@@ -602,27 +633,6 @@ column_constraint_type(sql_query *query,
                }
                res = SQL_OK;
        }       break;
-       case SQL_CHECK: {
-               
-               exp_kind ek = {type_value, card_value, FALSE};
-               sql_rel* rel3 = rel_basetable(sql, t, t->base.name);
-               sql_exp *e = rel_logical_value_exp(query, &rel3, s->data.sym, 
sql_sel, ek);
-               sql_rel *rel = rel_project_exp(sql, e);
-               char* check = rel2str(sql, rel);
-
-               switch (mvc_check(sql, cs, check)) {
-                       case -1:
-                               (void) sql_error(sql, 02, SQLSTATE(HY013) 
MAL_MALLOC_FAIL);
-                               return SQL_ERR;
-                       case -2:
-                       case -3:
-                               (void) sql_error(sql, 02, SQLSTATE(42000) 
"CHECK CONSTRAINT: transaction conflict detected");
-                               return SQL_ERR;
-                       default:
-                               break;
-               }
-               res = SQL_OK;
-       }       break;
        default:{
                res = SQL_ERR;
        }
@@ -879,15 +889,17 @@ table_foreign_key(mvc *sql, const char *
 }
 
 static int
-table_constraint_type(mvc *sql, const char *name, symbol *s, sql_schema *ss, 
sql_table *t)
+table_constraint_type(sql_query *query, const char *name, symbol *s, 
sql_schema *ss, sql_table *t)
 {
+       mvc *sql = query->sql;
        int res = SQL_OK;
 
        switch (s->token) {
        case SQL_UNIQUE:
        case SQL_UNIQUE_NULLS_NOT_DISTINCT:
-       case SQL_PRIMARY_KEY: {
-               key_type kt = (s->token == SQL_PRIMARY_KEY ? pkey : s->token == 
SQL_UNIQUE ? ukey : unndkey);
+       case SQL_PRIMARY_KEY:
+       case SQL_CHECK: {
+               key_type kt = token2key_type(s->token);
                dnode *nms = s->data.lval->h;
                sql_key *k;
                const char *ns = name;
@@ -914,8 +926,14 @@ table_constraint_type(mvc *sql, const ch
                        (void) sql_error(sql, 02, SQLSTATE(42000) "CONSTRAINT 
%s: an index named '%s' already exists, and it would conflict with the key", kt 
== pkey ? "PRIMARY KEY" : "UNIQUE", name);
                        return SQL_ERR;
                }
+               char* check = NULL;
+               if (kt == ckey) {
+                       if ((check = serialize_check_plan(query, s, t)) == 
NULL) {
+                               /*TODO error*/
+                       }
+               }
 
-               switch (mvc_create_ukey(&k, sql, t, name, kt)) {
+               switch (mvc_create_ukey(&k, sql, t, name, kt, check)) {
                        case -1:
                                (void) sql_error(sql, 02, SQLSTATE(HY013) 
MAL_MALLOC_FAIL);
                                return SQL_ERR;
@@ -926,6 +944,7 @@ table_constraint_type(mvc *sql, const ch
                        default:
                                break;
                }
+               /* TODO: iterate over all columns in case of CHECK constraint*/
                for (; nms; nms = nms->next) {
                        char *nm = nms->data.sval;
                        sql_column *c = mvc_bind_column(sql, t, nm);
@@ -963,10 +982,6 @@ table_constraint_type(mvc *sql, const ch
        case SQL_FOREIGN_KEY:
                res = table_foreign_key(sql, name, s, ss, t);
                break;
-       case SQL_CHECK: {
-               (void) sql_error(sql, 02, SQLSTATE(42000) "CONSTRAINT CHECK: 
check constraints not supported");
-               return SQL_ERR;
-       }       break;
        default:
                res = SQL_ERR;
        }
@@ -978,8 +993,9 @@ table_constraint_type(mvc *sql, const ch
 }
 
 static int
-table_constraint(mvc *sql, symbol *s, sql_schema *ss, sql_table *t)
+table_constraint(sql_query *query, symbol *s, sql_schema *ss, sql_table *t)
_______________________________________________
checkin-list mailing list -- checkin-list@monetdb.org
To unsubscribe send an email to checkin-list-le...@monetdb.org

Reply via email to