2011/9/28, Merlin Moncure <mmonc...@gmail.com>:
>
> I disagree.  unnest() and array_agg() (or, even better, array()
> constructor syntax) are an absolute joy to work with and thinking in a
> more functional way, which is usually the key to making things run
> quickly.  Also both functions are trivial to emulate in userland for
> compatibility.  Arrays of composites IIRC only go back to 8.3 so  that
> would be a true stopper for any solution in that vein.

Ok, tastes are tastes: I hate making two or three more levels of
subqueries. Regarding arrays of composites, that would be perfectly
solved if we used no composites at all! Instead of a field with an array
of a composite of three intrinsics, use three fields, each of an
intrinsic type. See your proposal:

>> create type audit_field_t as (field text, old_value text, new_value text);

Instead, in the audit table you may use:

..., field smallint[], before text[], after text[],...

Note the smallint in field: that means I really want to keep the
reference to the "field" table. That is for the two reasons I mentioned
earlier (to reduce space: 2 bytes of type "smallint" against the
variable size of type "text"; and to keep track of the names being used
too). You can also set up something like this if you like dimensions:

..., field smallint[], values text[][],...

Implying that the first dimension is the "before" value and the second
one is the "after" value. Any of these prevents us from using
composites and makes the box a little wider and simpler. Furthermore,
I would like to keep the logging "on demand":

..., field smallint[], is_pk boolean[], { before text[], after text[]
| values text[][] },...

You know what the braces and pipe are for...
So, at the end, we have the entire "audet" table inside the "audit"
table, as a series of arrays. We get a really compact table with only
enough data to fully log the changes which triggered the event. No
less, no more.
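
Just so we are looking at the same thing, here is a minimal sketch of
that compact layout (the name "audit_compact" and the exact column set
are only illustrative, not a final proposal):

create table audit_compact (
  id          bigint,
  type        character(1),
  tstmp       timestamp with time zone default now(),
  schema      smallint,
  "table"     smallint,
  "user"      smallint,
  client_inet smallint,
  field       smallint[],  -- positionally aligned with the arrays below; references field.id
  is_pk       boolean[],
  before      text[],
  after       text[]
);
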
At this point we know querying this table will be much slower and
rotation will have to be done more frequently. If we dump>restore the
table somewhere else we will still be able to split the table back into
the original two, create indexes, cluster them, and query as desired.
But this can get so complicated that maybe I should implement a
function doing all this. In any event, we lose some responsiveness
because of this. But a couple of extra minutes may not be a problem for
most cases. I'm just trying to summarize.
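
For what it's worth, splitting it back into something shaped like the
original "audit"/"audet" pair could be done mostly with unnest()
(available from 8.4 on). This assumes the illustrative audit_compact
above and that all the arrays of a given row have the same length, so
that the set-returning calls stay aligned:

insert into audet (id, field, is_pk, before, after)
  select c.id, unnest(c.field), unnest(c.is_pk), unnest(c.before), unnest(c.after)
  from audit_compact c;

The scalar header columns would go into "audit" with a plain
insert/select.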

As a rule of thumb, you may need to run a cron job every night or so
to check whether 'select count(*) from audit' is bigger than X and, if
so, rotate the table (or maybe rotate every X days/weeks/etc.). The
smaller the X, the better the responsiveness in some cases: if we know
an interval in time we will just have to dump>restore those logs. In
other cases this would not be of much help: if you need to track a
tuple back to the very beginning of time, you'll have a lot of work to
do dumping>restoring (and so forth... remember to split the table,
indexing...). Still, rotation seems to be a good practice, and you can
have the cron job dump/restore into another server and then delete the
old table. That would save a lot of space in your production
environment.
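
To make that rule of thumb concrete, the cron job could simply call a
small helper like this (the name rotate_if_needed and the threshold are
made up; rotate() is the function in the attached code):

create or replace function rotate_if_needed(threshold bigint) returns void as $$
  begin
    -- Rotate both tables when the current audit data outgrows the threshold.
    if (select count(*) from audit) > threshold then
      perform rotate('audit');
      perform rotate('audet');
    end if;
  end;
$$ language plpgsql;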

> As for the rest of it, I'd be looking to try and come up with an all
> sql implementation.  Also you should give an honest comparison between
> what you've come up with vs. this:
> http://pgfoundry.org/projects/tablelog/.
>
> merlin
>

"All SQL implementation"? Didn't we agree that's not possible in
pg<=8.4? Then what do you mean by that?

About the "tablelog" project, I didn't really try it, but I read its
documentation and it seems not appropriate at all for my case. First of
all, its purpose seems to be to log everything in a table so that the
table can later be restored as of any time in the past. My purpose is
to log in order to run analysis. Also, it needs to create one table per
logged table, consisting of the same structure as the logged table
(without constraints) plus three, four or five columns for control
(depending on usage, four or five recommended). I have a lot of tables
to log (hundreds!) with small changes to each of them; that means
doubling the number of tables for a few changes. Speaking of
compactness... It also logs everything, not only changed values.
It is written in C, so I assume it runs much, much faster (especially
needed for highly transactional DBs). But it's not proven to be binary
safe (I don't remember exactly what that means). Bugs: nothing known.

So, if you need to be able to restore your table as of any time, use
tablelog. If you need to run analysis on who did what, use my option.


Finally attaching the code!

Cheers.

-- 
Diego Augusto Molina
diegoaugustomol...@gmail.com

ES: Por favor, evite adjuntar documentos de Microsoft Office. Serán
desestimados.
EN: Please, avoid attaching Microsoft Office documents. They shall be discarded.
LINK: http://www.gnu.org/philosophy/no-word-attachments.html
		/* Created by Diego Augusto Molina in 2011 for Tucuman Government, Argentina. */

/*
		Needs:
  You must execute the following according to your needs:
    
  CREATE TRUSTED PROCEDURAL LANGUAGE 'plpgsql';
  CREATE TRUSTED PROCEDURAL LANGUAGE 'plperl';


		Description:
  The "audit" trigger is the trigger which should be attached to any table we want to audit. If it
	receives arguments, they will be interpreted as the primary key of the table. If any of the
	arguments is not a column of the table, or no arguments are received at all, a probing process
	takes place which ends up determining the pk of the table. Thus, it is better to create the trigger
	calling this function with no arguments. See the TODO list.


  	Usage of the "audit" trigger:
  CREATE TRIGGER <tg_name> AFTER {INSERT | UPDATE | DELETE} [ OR ...] ON <table_name>
    FOR EACH ROW EXECUTE PROCEDURE audit.audit( { <column_list> | <nothing_at_all> } );


		Known issues:
	1) You don't want to use this trigger on a table which has a column of some type that doesn't have
		a cast to string. That would cause a runtime error, killing your transaction! See TODO #2. Don't
		worry, most 'common' types have a cast to string by default.


		TODO:
	1) In phase 3 of the "audit" trigger ('P3'), instead of asking '( scalar keys %pk == 0 )' each
		time, put an 'else'.
	1') If the pk was not passed as argument, at the end of the probing execute an 'alter trigger' so
		that next time there's no probing at all. This would be unfriendly to modifications of the
		table definition, which would then require updating the trigger (putting no arguments at all
		would imply probing again the next time).
	2) Manage the logging of columns of types which don't have a cast to string. Option 1 is to
		prevent the logging of such columns at all. Option 2 is more complex: Search for a way to save
		the binary contents of the columns instead of the formatted content. The table 'field' would
		have an extra column of type 'type' and that would help describing the field audited. That would
		solve the problem with strange fields (assuming that _any_ value can be converted to its
		binary/internal representation). This may carry some extra complexity, maybe needing extra
		tables holding information about types.
	3) Make this function receive only two parameters: two arrays of type name[], the first holding
		the set of columns which are the primary key, the second holding the set of columns which in
		addition to the pk ones are to be registered. Note that pk columns will always be registered
		because that will identify the tuple modified.
	4) Support for TRUNCATE event.

*/

CREATE ROLE auditor NOSUPERUSER NOINHERIT NOCREATEDB NOCREATEROLE NOLOGIN;
CREATE ROLE audit   NOSUPERUSER NOINHERIT NOCREATEDB NOCREATEROLE   LOGIN ENCRYPTED PASSWORD 'test.1234';
CREATE SCHEMA audit AUTHORIZATION audit;
ALTER ROLE auditor SET search_path=audit;
ALTER ROLE audit SET search_path=audit;
SET search_path to audit;
SET SESSION AUTHORIZATION audit;

CREATE SEQUENCE seq_audit INCREMENT 1 START 0 CACHE 1 CYCLE MINVALUE -9223372036854775808 MAXVALUE 9223372036854775807;

CREATE SEQUENCE seq_elems INCREMENT 1 START 0 CACHE 1 CYCLE MINVALUE -32768 MAXVALUE 32767;

CREATE TABLE field (
  id smallint NOT NULL DEFAULT nextval('seq_elems'::regclass),
  value name NOT NULL,
  CONSTRAINT field_pk PRIMARY KEY (id) WITH (FILLFACTOR=100),
  CONSTRAINT field_uq_value UNIQUE (value) WITH (FILLFACTOR=100)
) WITH (OIDS=FALSE);
GRANT SELECT ON TABLE field TO auditor;

CREATE TABLE client_inet (
  id smallint NOT NULL DEFAULT nextval('seq_elems'::regclass),
  value inet NOT NULL DEFAULT inet_client_addr(),
  CONSTRAINT dir_inet_pk PRIMARY KEY (id) WITH (FILLFACTOR=100),
  CONSTRAINT dir_inet_uq_value UNIQUE (value) WITH (FILLFACTOR=95)
) WITH (OIDS=FALSE);
GRANT SELECT ON TABLE client_inet TO auditor;

CREATE TABLE schema (
  id smallint NOT NULL DEFAULT nextval('seq_elems'::regclass),
  value name NOT NULL,
  CONSTRAINT schema_pk PRIMARY KEY (id) WITH (FILLFACTOR=100),
  CONSTRAINT schema_uq_value UNIQUE (value) WITH (FILLFACTOR=100)
) WITH (OIDS=FALSE);
GRANT SELECT ON TABLE schema TO auditor;

CREATE TABLE "table" (
  id smallint NOT NULL DEFAULT nextval('seq_elems'::regclass),
  value name NOT NULL,
  CONSTRAINT table_pk PRIMARY KEY (id) WITH (FILLFACTOR=100),
  CONSTRAINT table_uq_value UNIQUE (value) WITH (FILLFACTOR=100)
) WITH (OIDS=FALSE);
GRANT SELECT ON TABLE "table" TO auditor;

CREATE TABLE "user" (
  id smallint NOT NULL DEFAULT nextval('seq_elems'::regclass),
  value name NOT NULL DEFAULT "current_user"(),
  CONSTRAINT user_pk PRIMARY KEY (id) WITH (FILLFACTOR=100),
  CONSTRAINT user_uq_value UNIQUE (value) WITH (FILLFACTOR=95)
) WITH (OIDS=FALSE);
GRANT SELECT ON TABLE "user" TO auditor;

CREATE TABLE audit (
  id bigint,
  type character(1),
  tstmp timestamp with time zone DEFAULT now(),
  schema smallint,
  "table" smallint,
  "user" smallint,
  client_inet smallint,
  client_port integer DEFAULT inet_client_port(),
  pid integer DEFAULT pg_backend_pid()
) WITH (OIDS=FALSE);
GRANT SELECT ON TABLE audit TO auditor;

CREATE TABLE audet (
  id bigint,
  field smallint,
  is_pk boolean,
  before text,
  after text
) WITH (OIDS=FALSE);
GRANT SELECT ON TABLE audet TO auditor;

CREATE OR REPLACE FUNCTION tgf_ins_audet() RETURNS trigger AS
$BODY$
    begin
      execute E'insert into audet_' || tg_argv[0] || E'
        (
          id,
          field,
          is_pk,
          before,
          after
        ) values
        (
          '||coalesce(new.id::text,'NULL')||E',
          '||coalesce(new.field::text,'NULL')||E',
          '||coalesce(new.is_pk::text,'NULL')||E',
          '||coalesce(quote_literal(new.before),'NULL')||E',
          '||coalesce(quote_literal(new.after),'NULL')||E'
        )';
      return null;
  end;$BODY$ LANGUAGE plpgsql VOLATILE;
ALTER FUNCTION tgf_ins_audet() SET search_path=audit;

CREATE OR REPLACE FUNCTION tgf_ins_audit() RETURNS trigger AS
$BODY$
    begin
      execute E'insert into audit_' || tg_argv[0] || E'
        (
          id,
          type,
          tstmp,
          schema,
          "table",
          "user",
          client_inet,
          client_port,
          pid
        ) values
        (
          '||coalesce(new.id::text,'NULL')||E',
          '||coalesce(quote_literal(new.type),'NULL')||E',
          '||coalesce(quote_literal(new.tstmp),'NULL')||E',
          '||coalesce(new.schema::text,'NULL')||E',
          '||coalesce(new."table"::text,'NULL')||E',
          '||coalesce(new."user"::text,'NULL')||E',
          '||coalesce(new.client_inet::text,'NULL')||E',
          '||coalesce(new.client_port::text,'NULL')||E',
          '||coalesce(new.pid::text,'NULL')||E'
        )';
      return null;
  end;$BODY$ LANGUAGE plpgsql VOLATILE;
ALTER FUNCTION tgf_ins_audit() SET search_path=audit;

CREATE OR REPLACE FUNCTION rotate(character) RETURNS void AS
$BODY$
    declare
      first_execution boolean := false;
      cur_start char(8) := null;
      cur_tstmp_min timestamp with time zone;
      cur_tstmp_max timestamp with time zone;
      cur_id_min bigint;
      cur_id_max bigint;
      new_start  char(8);
    begin
        /* Determine the creation tstmp of the tables ========================================================================= */
      select substring(max(c.relname::text) from $1 || E'_(........)') into cur_start
        from
          pg_namespace n inner join
          pg_class c on (n.oid = c.relnamespace)
        where
          n.nspname = 'audit'::name and
          c.relname::text like $1 || '_%';
      if cur_start is null then
        first_execution := true;
        cur_start := '';
      end if;
      new_start := cast(to_char(current_timestamp,'YYYYMMDD') as name);
      
      case $1
        when 'audit' then /* if I'm rotating the table audit ================================================================== */
            /* current table */
          if not first_execution then
            execute 'select min(tstmp), max(tstmp) from audit_' || cur_start
              into cur_tstmp_min, cur_tstmp_max;
            execute $$
              alter index idx_audit_$$|| cur_start ||$$_id            set (fillfactor = 100);
              alter index idx_audit_$$|| cur_start ||$$_tstmp         set (fillfactor = 100);
              alter index idx_audit_$$|| cur_start ||$$_schema__table set (fillfactor = 100);
              alter index idx_audit_$$|| cur_start ||$$_user          set (fillfactor = 100);
              
              cluster audit_$$|| cur_start ||$$;
              analyze audit_$$|| cur_start ||$$;
              alter table audit_$$|| cur_start ||$$ add
                constraint audit_$$|| cur_start ||$$_ck_exclusion
                check (
                  tstmp >= '$$|| cur_tstmp_min ||$$'
                    and
                  tstmp <= '$$|| cur_tstmp_max ||$$'
                )
              $$;
          end if;
          
          execute $$
              /* new table */
            create table audit_$$|| new_start ||$$ () inherits (audit);
            create index idx_audit_$$|| new_start ||$$_id            on audit_$$|| new_start ||$$ using btree (id)            with (fillfactor = 99);
            create index idx_audit_$$|| new_start ||$$_tstmp         on audit_$$|| new_start ||$$ using btree (tstmp)         with (fillfactor = 99);
            create index idx_audit_$$|| new_start ||$$_schema__table on audit_$$|| new_start ||$$ using btree (schema, "table") with (fillfactor = 95);
            create index idx_audit_$$|| new_start ||$$_user          on audit_$$|| new_start ||$$ using btree ("user")          with (fillfactor = 95);
            cluster audit_$$|| new_start ||$$ using idx_audit_$$|| new_start ||$$_tstmp;
            
              /* Parent table */
            drop trigger if exists tg_audit_$$|| cur_start ||$$ on audit;
            create trigger tg_audit_$$|| new_start ||$$
              before insert
              on audit
              for each row
              execute procedure tgf_ins_audit('$$|| new_start ||$$');
          $$;
        when 'audet' then /* if I'm rotating the table audet ================================================================== */
            /* current table */
          if not first_execution then
            execute 'select min(id), max(id) from audet_' || cur_start
              into cur_id_min, cur_id_max;
            execute $$
              alter index idx_audet_$$|| cur_start ||$$_id   set (fillfactor = 100);
              alter index idx_audet_$$|| cur_start ||$$_fieldpk set (fillfactor = 100);
              
              cluster audet_$$|| cur_start ||$$;
              analyze audet_$$|| cur_start ||$$;
              alter table audet_$$|| cur_start ||$$ add
                constraint audet_$$|| cur_start ||$$_ck_exclusion
                check (
                  id >= '$$|| cur_id_min ||$$'
                    and
                  id <= '$$|| cur_id_max ||$$'
                );
              
                /* Parent table */
              drop trigger tg_audet_$$|| cur_start ||$$ on audet;
            $$;
          end if;
          
          execute $$
              /* new table */
            create table audet_$$|| new_start ||$$ () inherits (audet);
            create index idx_audet_$$|| new_start ||$$_id      on audet_$$|| new_start ||$$ using btree (id) with (fillfactor = 99);
            create index idx_audet_$$|| new_start ||$$_fieldpk on audet_$$|| new_start ||$$ using btree (field) with (fillfactor = 99) where is_pk;
            cluster audet_$$|| new_start ||$$ using idx_audet_$$|| new_start ||$$_id;
            
              /* Parent table */
            create trigger tg_audet_$$|| new_start ||$$
              before insert
              on audet
              for each row
              execute procedure tgf_ins_audet('$$|| new_start ||$$');
          $$;
        else /* if I got a wrong argument ===================================================================================== */
          raise notice E'Error: expected \'audit\' or \'audet\'. Got \'%\'.', $1;
          return;
      end case;
  end;$BODY$ LANGUAGE plpgsql VOLATILE SECURITY DEFINER;
ALTER FUNCTION rotate(character) SET search_path=audit;

CREATE OR REPLACE FUNCTION audit() RETURNS trigger AS $BODY$ #/*

      # P0. Declarations and general definitions ###################################################
      ##############################################################################################

    my $elog_pref = "(schm:$_TD->{table_schema};tab:$_TD->{table_name};trg:$_TD->{name};evt:$_TD->{event}):";
    my $rv = "";                            # Query execution
    my $val = "";                           # Iterating value
    my %tables = (                          # Value of the respective tables inserted in "audit"
      "user" => 'pg_catalog."session_user"()',
      "table" => "'$_TD->{table_name}'",
      "schema" => "'$_TD->{table_schema}'",
      "client_inet" => "pg_catalog.inet_client_addr()"
    );
    my $id = "";                            # Id of the tuple inserted in "audit"
    my $field = "";                         # Field id
    my $is_pk = 0;                          # Determines if a field is part of the PK
    my $before = "";                        # Value of a field in OLD
    my $after = "";                         # Value of a field in NEW
    my %cols = ();                          # Columns of the table
    my %pk = ();                            # Primary key

      # Copy columns from some available transitional variable -------------------------------------
    if (exists $_TD->{new}){
      %cols = %{$_TD->{new}};
    } else {
      %cols = %{$_TD->{old}};
    }

      # P1. Create necessary tuples in user, table, schema and client_inet #########################
      ##############################################################################################

    foreach $val (keys %tables){
      $rv = spi_exec_query("select id from \"$val\" where value = $tables{$val}");
      if ( $rv->{status} != SPI_OK_SELECT ){
        elog(ERROR, "$elog_pref Error querying table '$val'.");
      }
      if ( $rv->{processed} == 1 ){
        $tables{$val} = $rv->{rows}[0]->{id};
      } else {
        $rv = spi_exec_query("insert into \"$val\" (value) values ($tables{$val}) returning id");
        if ( $rv->{status} != SPI_OK_INSERT_RETURNING ){
          elog(ERROR, "$elog_pref Error inserting in table '$val'.");
        }
        $tables{$val} = $rv->{rows}[0]->{id};
      }
    }

      # P2. Insert in audit ########################################################################
      ##############################################################################################

    $rv = spi_exec_query("select nextval('seq_audit'::regclass) as id");
    if ( $rv->{status} != SPI_OK_SELECT ){
      elog(ERROR, "$elog_pref Error querying next value of sequence 'seq_audit'.");
    }
    $id = $rv->{rows}[0]->{id};
    $rv = spi_exec_query("insert into audit (id, type, schema, \"table\", \"user\", client_inet)
        values (
          $id,
          substring('$_TD->{event}', 1, 1),
          $tables{'schema'},
          $tables{'table'},
          $tables{'user'},
          $tables{'client_inet'}
          )
      ");
    if ($rv->{status} != SPI_OK_INSERT){
      elog(ERROR,"$elog_pref Error inserting tuple in table 'audit'.");
    }

      # P3. Determine PK of the table ##############################################################
      ##############################################################################################

    if ( scalar keys %pk == 0){
        #  Criterion 1: if got params, each of them is a column of the table, and all of them make
        # up the pk of the table -------------------------------------------------------------------
        elog(DEBUG, "$elog_pref Searching pk in the trigger's params.");
      if ($_TD->{argc} > 0){
        ARGS: foreach $val ( @{$_TD->{args}} ){
          if (exists $cols{$val}){
            $pk{$val} = "-";
          } else {
            %pk = ();
            elog(DEBUG, "$elog_pref The column '$val' given as argument does not exist. Skipping to next criterion.");
            last ARGS;
          }
        }
      }
    }
    if ( scalar keys %pk == 0 ) {
        #  Criterion 2: search the pk in the system catalogs ---------------------------------------
      elog(DEBUG, "$elog_pref Searching pk in system catalogs.");
      $rv = spi_exec_query("
        select a.attname from
          ( select cl.oid, unnest(c.conkey) as att
            from
              pg_catalog.pg_constraint c inner join
              pg_catalog.pg_class cl on (c.conrelid = cl.oid)
            where
              c.contype = 'p'
              and cl.oid = $_TD->{relid}
          ) as c inner join
          pg_catalog.pg_attribute a on (c.att = a.attnum and c.oid = a.attrelid)
      ");
      if ( $rv->{status} == SPI_OK_SELECT ){
        if ( $rv->{processed} > 0 ){
          foreach $val (@{$rv->{rows}}){
            $pk{$val->{attname}} = "-";
          }
        }
      } else {
        elog(DEBUG, "$elog_pref Error querying the system catalogs. Skipping to next criterion.");
      }
    }
    if ( scalar keys %pk == 0) {
        #  Criterion 3: if the table has OIDs, use that as pk and emit a warning -------------------
        elog(DEBUG, "$elog_pref Searching OIDs in the table.");
        $rv = spi_exec_query("select * from pg_catalog.pg_class where oid = $_TD->{relid} and relhasoids = true");
        if( $rv->{status} == SPI_OK_SELECT ){
          if ( $rv->{processed} > 0 ){
            %pk = ("oid","-");
            elog(DEBUG, "$elog_pref Using OIDs as table pk for '$_TD->{table_name}' because no previous criterion could find one.");
          }
        } else {
          elog(DEBUG, "$elog_pref Error querying the system catalogs. Skipping to next criterion.");
        }
      }
    if ( scalar keys %pk == 0){
        #  Default criterion: all tuples -----------------------------------------------------------
      elog(DEBUG, "$elog_pref Could not find a suitable pk. Logging every column.");
      %pk = %cols;
    }

      # P4. Insert in audet ########################################################################
      ##############################################################################################

    foreach $val (keys %cols){
      $is_pk = 0 + exists($pk{$val});
      if ( $_TD->{event} ne "UPDATE" || $is_pk || $_TD->{new}{$val} ne $_TD->{old}{$val} ){
        $before = (exists $_TD->{old}) ? "'".$_TD->{old}{$val}."'" : "NULL";
        $after  = (exists $_TD->{new}) ? "'".$_TD->{new}{$val}."'" : "NULL";
        if ( $_TD->{event} eq "UPDATE" && $_TD->{new}{$val} eq $_TD->{old}{$val}){
            #   We don't save the previous state of the column which is part of the pk while updating
            # if it hasn't changed.
          $before = "NULL";
        }
        $rv = spi_exec_query("select id from field where value = '$val'");
        if ( $rv->{status} != SPI_OK_SELECT ){
          elog(ERROR, "$elog_pref Error querying table 'field'.");
        }
        if ( $rv->{processed} > 0 ){
          $field = $rv->{rows}[0]->{id};
        } else {
          $rv = spi_exec_query("insert into field (value) values ('$val') returning id");
          if ( $rv->{status} != SPI_OK_INSERT_RETURNING ){
            elog(ERROR, "$elog_pref Error executing insert returning in table 'field'.");
          }
          $field = $rv->{rows}[0]->{id};
        }
        $rv = spi_exec_query("insert into audet (id, field, is_pk, before, after)
          values ($id, $field, cast($is_pk as boolean), cast($before as text), cast($after as text))");
        if ( $rv->{status} ne SPI_OK_INSERT ){
          elog(ERROR, "$elog_pref Error inserting tuples in table 'audet'.");
        }
      }
    }

      # P5. Finishing ##############################################################################
      ##############################################################################################

    return;

#*/$BODY$ LANGUAGE plperl VOLATILE SECURITY DEFINER;
ALTER FUNCTION audit() SET search_path=audit;

	-- Let's create the first tables
SELECT rotate('audit');
SELECT rotate('audet');
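
	-- Example only (hypothetical table and column names): how the trigger would be attached
	-- to a table, letting it probe the primary key by itself (no arguments passed).
	-- Kept commented out so the script stays runnable as-is.
-- CREATE TABLE public.employee (id integer PRIMARY KEY, name text, salary numeric);
-- CREATE TRIGGER tg_audit_employee AFTER INSERT OR UPDATE OR DELETE ON public.employee
--   FOR EACH ROW EXECUTE PROCEDURE audit.audit();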
