Author: spadkins
Date: Thu Dec  4 15:16:56 2008
New Revision: 12137

Modified:
   p5ee/trunk/App-Repository/lib/App/Repository/Oracle.pm
   p5ee/trunk/App-Repository/t/DBI-import-ora.t

Log:
added support for sqlldr in import_rows() for Oracle

Modified: p5ee/trunk/App-Repository/lib/App/Repository/Oracle.pm
==============================================================================
--- p5ee/trunk/App-Repository/lib/App/Repository/Oracle.pm      (original)
+++ p5ee/trunk/App-Repository/lib/App/Repository/Oracle.pm      Thu Dec  4 
15:16:56 2008
@@ -12,6 +12,8 @@
 
 use strict;
 use Data::Dumper;
+use Digest::SHA qw(sha1_hex);
+use Date::Format;
 
 =head1 NAME
 
@@ -236,153 +238,371 @@
     &App::sub_exit() if ($App::trace);
 }
 
-#############################################################################
-# METHODS
-#############################################################################
+# $insert_sql = $rep->_mk_insert_rows_sql ($table, \@cols, \@rows, \%options);
+# i.e. $options->{replace}
+sub TBD_mk_insert_rows_sql {
+    &App::sub_entry if ($App::trace);
+    my ($self, $table, $cols, $rows, $options) = @_;
 
-=head1 Methods: Import/Export Data From File
+    $self->_load_table_metadata($table) if (!defined 
$self->{table}{$table}{loaded});
+    my $dbh = $self->{dbh};
 
-=cut
+    if ($#$cols == -1) {
+        if ($#$rows > -1 && ref($rows->[0]) eq "ARRAY") {
+            die "_mk_insert_rows_sql(): no columns specified";
+        }
+        else {
+            $cols = [ keys %{$rows->[0]} ];
+        }
+    }
+    my $column_defs = $self->{table}{$table}{column};
 
-#############################################################################
-# import_rows()
-#############################################################################
-
-=head2 import_rows()
-
-    * Signature: $rep->import_rows($table, $file);
-    * Signature: $rep->import_rows($table, $file, $options);
-    * Param:     $table        string
-    * Param:     $file         string
-    * Param:     $options      named
-    * Param:     columns       ARRAY     names of columns of the fields in the 
file
-    * Param:     import_method string    [basic=invokes generic superclass to 
do work,
-                                          insert=loads with multiple-row 
inserts,
-                                          <otherwise>=use "load data infile"]
-    * Param:     local         boolean   file is on client machine rather than 
database server
-    * Param:     replace       boolean   rows should replace existing rows 
based on unique indexes
-    * Param:     field_sep     char      character which separates the fields 
in the file (can by "\t")
-    * Param:     field_quote   char      character which optionally encloses 
the fields in the file (i.e. '"')
-    * Param:     field_escape  char      character which escapes the quote 
chars within quotes (i.e. "\")
-    * Return:    void
-    * Throws:    App::Exception::Repository
-    * Since:     0.01
-
-    Note: If you want to call this with $options->{local}, you will probably
-    need to make sure that mysql_local_infile=1 is in your DSN.  This might
-    require a line like the following in your "app.conf" file.
-
-      dbioptions = mysql_local_infile=1
-
-    Sample Usage: 
-
-    $rep->import_rows("usr","usr.dat");
-
-    # root:x:0:0:root:/root:/bin/bash
-    $rep->import_rows("usr", "/etc/passwd" ,{
-        field_sep => ":",
-        columns => [ "username", "password", "uid", "gid", "comment", 
"home_directory", "shell" ],
-    });
+    my $insert = $options->{replace} ? "replace" : "insert";
+    my $sql = "$insert into $table\n  (" . join(", ", @$cols) . ")\nvalues\n";
+    my ($value, $colnum, $quoted, $row, $col);
+    if ($rows && $#$rows > -1 && ref($rows->[0]) eq "ARRAY") {
+        for (my $rownum = 0; $rownum <= $#$rows; $rownum++) {
+            $row = $rows->[$rownum];
+            for ($colnum = 0; $colnum <= $#$cols; $colnum++) {
+                $col = $cols->[$colnum];
+                $value = $row->[$colnum];
+                if (!defined $value) {
+                    $value = "NULL";
+                }
+                else {
+                    $quoted = (defined $column_defs->{$col}{quoted}) ? 
($column_defs->{$col}{quoted}) : ($value !~ /^-?[0-9.]+$/);
+                    if ($quoted) {
+                        $value = $dbh->quote($value);
+                    }
+                }
+                if ($column_defs->{$col}{dbexpr_update}) {
+                    $value = sprintf($column_defs->{$col}{dbexpr_update}, 
$value);
+                }
+                $sql .= ($colnum == 0) ? "  ($value" : ", $value";
+            }
+            $sql .= ($rownum < $#$rows) ? "),\n" : ")\n";
+        }
+    }
+    else {  # if $row is a HASH or OBJECT ...
+        for (my $rownum = 0; $rownum <= $#$rows; $rownum++) {
+            $row = $rows->[$rownum];
+            for ($colnum = 0; $colnum <= $#$cols; $colnum++) {
+                $col = $cols->[$colnum];
+                $value = $row->{$col};
+                if (!defined $value) {
+                    $value = "NULL";
+                }
+                else {
+                    $quoted = (defined $column_defs->{$col}{quoted}) ? 
($column_defs->{$col}{quoted}) : ($value !~ /^-?[0-9.]+$/);
+                    if ($quoted) {
+                        $value = $dbh->quote($value);
+                    }
+                }
+                if ($column_defs->{$col}{dbexpr_update}) {
+                    $value = sprintf($column_defs->{$col}{dbexpr_update}, 
$value);
+                }
+                $sql .= ($colnum == 0) ? "  ($value" : ", $value";
+            }
+            $sql .= ($rownum < $#$rows) ? "),\n" : ")\n";
+        }
+    }
+    if (!$options->{replace} && $options->{update}) {
+        my $update = $options->{update};
+        $sql .= "on duplicate key update";
+        my $first_update_column = 1;
+        for ($colnum = 0; $colnum <= $#$cols; $colnum++) {
+            $col = $cols->[$colnum];
+            if (!ref($update) || $update->{$col}) {
+                $sql .= "," if (!$first_update_column);
+                $first_update_column = 0;
+                $sql .= "\n   $col = values($col)";
+            }
+        }
+        $sql .= "\n";
+    }
+    &App::sub_exit($sql) if ($App::trace);
+    $sql;
+}
 
-=cut
+# $nrows = $rep->_insert_rows ($table, \@cols, \@rows);
+sub TBD_insert_rows {
+    &App::sub_entry if ($App::trace);
+    my ($self, $table, $cols, $rows, $options) = @_;
+    $self->{error} = "";
+    my ($sql, $retval, $nrows_this_insert);
+   
+    my $dbh = $self->{dbh};
+    return 0 if (!defined $dbh);
 
+    my $nrows = 0;
+    my $ok = 1;
+    my $context_options = $self->{context}{options};
+    my $debug_sql = $context_options->{debug_sql};
+    my $explain_sql = $context_options->{explain_sql};
+    my ($timer, $elapsed_time);
+    if ($debug_sql) {
+        $timer = $self->_get_timer();
+    }
+    my $rows_ref = ref($rows);
+    if ($rows_ref eq "ARRAY") {
+        my $maxrows = $options->{maxrows} || 100;
+        my $rownum = 0;
+        my (@current_rows, $rownum2);
+        while ($rownum <= $#$rows) {
+            $rownum2 = $rownum + $maxrows - 1;
+            $rownum2 = $#$rows if ($rownum2 > $#$rows);
+            @current_rows = @{$rows}[($rownum .. $rownum2)];
+            $nrows_this_insert = $#current_rows + 1;
+
+            $sql = $self->_mk_insert_rows_sql($table, $cols, \@current_rows, $options);
+            if ($debug_sql) {
+                print $App::DEBUG_FILE "DEBUG_SQL: _insert_rows()\n";
+                print $App::DEBUG_FILE $sql;
+            }
+            ### TODO: make this work with regex for retry
+            $retval = $dbh->do($sql);
+            if ($debug_sql) {
+                print $App::DEBUG_FILE "DEBUG_SQL: retval [$retval] 
$DBI::errstr\n";
+                print $App::DEBUG_FILE "\n";
+            }
 
-#############################################################################
-# export_rows()
-#############################################################################
-
-=head2 export_rows()
-
-    * Signature: $rep->export_rows($table, $file);
-    * Signature: $rep->export_rows($table, $file, $options);
-    * Param:     $table        string
-    * Param:     $file         string
-    * Param:     $options      named
-    * Param:     columns       ARRAY     names of columns of the fields in the 
file
-    * Param:     export_method string    [basic=invokes generic superclass to 
do work]
-    * Param:     field_sep     char      character which separates the fields 
in the file (can by "\t")
-    * Param:     field_quote   char      character which optionally encloses 
the fields in the file (i.e. '"')
-    * Param:     field_escape  char      character which escapes the quote 
chars within quotes (i.e. "\")
-    * Return:    void
-    * Throws:    App::Exception::Repository
-    * Since:     0.01
-
-    Sample Usage: 
-
-    $rep->export_rows("usr","usr.dat");
-
-    # root:x:0:0:root:/root:/bin/bash
-    $rep->export_rows("usr", "passwd.dat" ,{
-        field_sep => ":",
-        columns => [ "username", "password", "uid", "gid", "comment", 
"home_directory", "shell" ],
-    });
+            # The MySQL "insert ... on duplicate key update ..." statement 
returns 2 rows affected
+            # when the insert gets a collision and causes an update.  So we 
have to make this
+            # adjustment.  I don't know if it affects the "replace ..." 
statement in a similar way,
+            # but I figure this can't hurt.
+            if ($options->{update} || $options->{replace}) {
+                if ($retval > $nrows_this_insert) {
+                    $retval = $nrows_this_insert;
+                }
+            }
 
-=cut
+            $nrows += $retval;
+            $rownum += $maxrows;
+        }
+        if ($nrows != $#$rows + 1) {
+            $ok = 0;
+        }
+        $self->{numrows} = $nrows;
+    }
+    else {
+        my ($fh);
+        if (!$rows_ref) {
+            my $file = $rows;          # assume it is a file name
+            open(App::Repository::MySQL::FILE, $file) || die "Unable to open 
$file for reading: $!";
+            $fh = \*App::Repository::MySQL::FILE;
+        }
+        else {
+            $fh = $rows;               # assume it is a file handle
+        }
+        $rows = [];                    # we will be refilling this buffer
+        my %options = ( %$options );   # make a copy so it can be modified
+        $options{maxrows} = 100;
+        $nrows = 0;
+        while (1) {
+            $rows = $self->_read_rows_from_file($fh, $cols, \%options);
+            last if ($#$rows == -1);
+            $sql = $self->_mk_insert_rows_sql($table, $cols, $rows, $options);
+            if ($debug_sql) {
+                print $App::DEBUG_FILE "DEBUG_SQL: _insert_rows()\n";
+                print $App::DEBUG_FILE $sql;
+            }
+            ### TODO: make this work with regex for retry
+            $retval = $dbh->do($sql);
+            if ($debug_sql) {
+                print $App::DEBUG_FILE "DEBUG_SQL: retval [$retval] 
$DBI::errstr\n";
+                print $App::DEBUG_FILE "\n";
+            }
 
-#SELECT ... INTO OUTFILE is the complement of LOAD DATA INFILE; the syntax for 
the
-#export_options part of the statement consists of the same FIELDS and LINES 
clauses
-#that are used with the LOAD DATA INFILE statement.
-#See Section 13.2.5, .LOAD DATA INFILE Syntax..
-
-#SELECT
-#    [ALL | DISTINCT | DISTINCTROW ]
-#      [HIGH_PRIORITY]
-#      [STRAIGHT_JOIN]
-#      [SQL_SMALL_RESULT] [SQL_BIG_RESULT] [SQL_BUFFER_RESULT]
-#      [SQL_CACHE | SQL_NO_CACHE] [SQL_CALC_FOUND_ROWS]
-#    select_expr, ...
-#    [INTO OUTFILE 'file_name' export_options
-#      | INTO DUMPFILE 'file_name']
-#    [FROM table_references
-#      [WHERE where_definition]
-#      [GROUP BY {col_name | expr | position}
-#        [ASC | DESC], ... [WITH ROLLUP]]
-#      [HAVING where_definition]
-#      [ORDER BY {col_name | expr | position}
-#        [ASC | DESC] , ...]
-#      [LIMIT {[offset,] row_count | row_count OFFSET offset}]
-#      [PROCEDURE procedure_name(argument_list)]
-#      [FOR UPDATE | LOCK IN SHARE MODE]]
+            $nrows += $retval;
+            if ($retval != $#$rows + 1) {
+                $ok = 0;
+                last;
+            }
+        }
+        $self->{numrows} = $nrows;
+        if (!$rows_ref) {
+            close(App::Repository::MySQL::FILE);
+        }
+    }
+    if ($debug_sql) {
+        $elapsed_time = $self->_read_timer($timer);
+        print $App::DEBUG_FILE "DEBUG_SQL: total rows [$nrows] ($elapsed_time 
sec)\n";
+    }
+    $self->{sql} = $sql;
+    $self->{numrows} = $nrows;
+    &App::sub_exit($nrows) if ($App::trace);
+    return($nrows);
+}
 
-sub export_rows {
+sub import_rows {
     &App::sub_entry if ($App::trace);
-    my ($self, $table, $params, $file, $options) = @_;
+    my ($self, $table, $columns, $file, $options) = @_;
+    $columns = $self->_get_default_columns($table) if (!$columns);
 
-    if ($options->{export_method} && $options->{export_method} eq "basic") {
-        $self->SUPER::export_rows($table, $file, $options);
-    }
+    my $nrows = 0;
+    my $import_method = $options->{import_method} || $self->{import_method} || 
"";
+    if ($import_method eq "basic") {
+        $nrows = $self->SUPER::import_rows($table, $columns, $file, $options);
+    }
+    # DOESN'T WORK YET
+    #elsif ($import_method eq "insert") {
+    #    $nrows = $self->insert_rows($table, $columns, $file, $options);
+    #}
     else {
-        my $columns = $options->{columns} || $self->{table}{$table}{columns};
-        my $where_clause = $self->_mk_where_clause($table, $params, $options);
-        my $sql = "select\n   " . join(",\n   ", @$columns);
-        $sql .= "\n$where_clause" if ($where_clause); 
-        $sql .= "\ninto outfile '$file'";
-        if ($options->{field_sep} || $options->{field_quote} || 
$options->{field_escape}) {
-            $sql .= "\nfields";
-            $sql .= "\n   terminated by '$options->{field_sep}'" if 
($options->{field_sep});
-            $sql .= "\n   optionally enclosed by '$options->{field_quote}'" if 
($options->{field_quote});
-            $sql .= "\n   escaped by '$options->{field_escape}'" if 
($options->{field_escape});
-        }
-        $sql .= "\n";
         my $context_options = $self->{context}{options};
+        my $prefix    = $context_options->{prefix};
         my $debug_sql = $context_options->{debug_sql};
         my ($timer, $elapsed_time);
         if ($debug_sql) {
             $timer = $self->_get_timer();
-            print $App::DEBUG_FILE "DEBUG_SQL: export_rows()\n";
-            print $App::DEBUG_FILE $sql;
+            print $App::DEBUG_FILE "DEBUG_SQL: import_rows()\n";
+            print $App::DEBUG_FILE "$table (", join(",", @$columns), ")\n";
         }
-        my ($retval);
-        eval {
-    print STDERR "\n".("HERE"x12).Dumper($sql);
-            $retval = $self->{dbh}->do($sql);
-        };
+
+        my $datfile = $file;
+
+        my $filebase = $datfile;
+        $filebase =~ s!.*/!!;
+        $filebase =~ s/\.dat$//;
+
+        my $dbname = $self->{dbname};
+        my $column_hash = sha1_hex(join(",", @$columns));
+
+        my $ctlfile = 
"$prefix/data/app/Repository/$dbname/$table.$column_hash.ctl";
+        if (! -f $ctlfile) {
+            mkdir("$prefix/data") if (! -d "$prefix/data");
+            mkdir("$prefix/data/app") if (! -d "$prefix/data/app");
+            mkdir("$prefix/data/app/Repository") if (! -d 
"$prefix/data/app/Repository");
+            mkdir("$prefix/data/app/Repository/$dbname") if (! -d 
"$prefix/data/app/Repository/$dbname");
+            $self->_write_import_control_file($ctlfile, $table, $columns, 
$options);
+        }
+
+        my $datetime = time2str("%Y%m%d-%H%M%S", time());
+
+        mkdir("$prefix/log/import") if (! -d "$prefix/log/import");
+        my $badfile = "$prefix/log/import/$table-$datetime-$$.bad";
+        my $logfile = "$prefix/log/import/$table-$datetime-$$.log";
+        my $outfile = "$prefix/log/import/$table-$datetime-$$.out";
+
+        #my $sqlldr_options = " direct=TRUE parallel=TRUE silent";
+        my $sqlldr_options = "";
+        if ($context_options->{"app.Repository.$dbname.$table.import_rows"}) {
+            $sqlldr_options .= " " if ($sqlldr_options);
+            $sqlldr_options .= "rows=" . 
$context_options->{"app.Repository.$dbname.$table.import_rows"};
+        }
+        if 
($context_options->{"app.Repository.$dbname.$table.import_bindsize"}) {
+            $sqlldr_options .= " " if ($sqlldr_options);
+            $sqlldr_options .= "bindsize=" . 
$context_options->{"app.Repository.$dbname.$table.import_bindsize"};
+        }
+
+        my $cmd = "sqlldr userid=$self->{dbuser}/$self->{dbpass}\@$self->{dbname} data=$datfile control=$ctlfile bad=$badfile log=$logfile errors=0 $sqlldr_options > $outfile 2>&1";
+        #print STDERR "sqlldr userid=$self->{dbuser}/$self->{dbpass}\@$self->{dbname} data=$datfile control=$ctlfile bad=$badfile log=$logfile errors=0 $sqlldr_options > $outfile 2>&1\n";
+        my $rc = system($cmd);
+        my $exit_value  = $rc >> 8;
+
         if ($debug_sql) {
             $elapsed_time = $self->_read_timer($timer);
-            print $App::DEBUG_FILE "DEBUG_SQL: export_rows=[$retval] ($elapsed_time sec) $DBI::errstr : $@\n";
+            print $App::DEBUG_FILE "DEBUG_SQL: import_rows=[$nrows] 
($elapsed_time sec) $DBI::errstr\n";
+        }
+
+        my $badfile_size = (-s $badfile || 0);
+        if ($rc || $badfile_size) {
+            # failed
+            die "ERROR: sqlldr [$datfile] ($filebase) failed. rc=[$rc] 
exit=[$exit_value] badsize=[$badfile_size] : $table-$datetime-$$\n";
+        }
+        else {
+            my $import_log_results = $self->_read_import_log_file($logfile);
+            $nrows = $import_log_results->{rows_loaded} || 0;
+            if (!$nrows ||
+                $import_log_results->{rows_error} ||
+                $import_log_results->{rows_rejected} ||
+                $import_log_results->{rows_discarded}) {
+                die "ERROR: sqlldr [$datfile] ($filebase) failed. 
rows_loaded=[$nrows] rows_error=[$import_log_results->{rows_error}] 
rows_rejected=[$import_log_results->{rows_rejected}] 
rows_discarded=[$import_log_results->{rows_discarded}] : $table-$datetime-$$\n";
+            }
+            unlink($badfile);
+            unlink($logfile);
+            unlink($outfile);
+        }
+    }
+
+    &App::sub_exit($nrows) if ($App::trace);
+    return($nrows);
+}
+
+#  120 Rows successfully loaded.
+#  0 Rows not loaded due to data errors.
+#  0 Rows not loaded because all WHEN clauses were failed.
+#  0 Rows not loaded because all fields were null.
+#
+#
+#Space allocated for bind array:                 255936 bytes(248 rows)
+#Read   buffer bytes: 1048576
+#
+#Total logical records skipped:          0
+#Total logical records read:           120
+#Total logical records rejected:         0
+#Total logical records discarded:        0
+
+sub _read_import_log_file {
+    &App::sub_entry if ($App::trace);
+    my ($self, $logfile) = @_;
+    open(FILE, "< $logfile") || die "Unable to open $logfile: $!";
+    my $import_log_results = {};
+    while (<FILE>) {
+        chomp;
+        if (/^ *(\d+) Rows successfully loaded/) {
+            $import_log_results->{rows_loaded} = $1;
+        }
+        elsif (/^ *(\d+) Rows not loaded due to data errors/) {
+            $import_log_results->{rows_error} = $1;
+        }
+        elsif (/^Total logical records skipped: *(\d+)/) {
+            $import_log_results->{rows_skipped} = $1;
+        }
+        elsif (/^Total logical records read: *(\d+)/) {
+            $import_log_results->{rows_read} = $1;
+        }
+        elsif (/^Total logical records rejected: *(\d+)/) {
+            $import_log_results->{rows_rejected} = $1;
+        }
+        elsif (/^Total logical records discarded: *(\d+)/) {
+            $import_log_results->{rows_discarded} = $1;
+        }
+    }
+    close(FILE);
+    &App::sub_exit($import_log_results) if ($App::trace);
+    return($import_log_results);
+}
+
+sub _write_import_control_file {
+    &App::sub_entry if ($App::trace);
+    my ($self, $ctlfile, $table, $columns, $options) = @_;
+    my $table_def = $self->get_table_def($table);
+    my $field_sep = $options->{field_sep} || "|";
+    open(FILE, "> $ctlfile") || die "Unable to open $ctlfile: $!";
+    print FILE <<EOF;
+Load Data
+Append
+Into Table $table
+Fields Terminated By '$field_sep'
+(
+EOF
+    my ($column, $type);
+    for (my $i = 0; $i <= $#$columns; $i++) {
+        $column = $columns->[$i];
+        print FILE "   $column";
+        $type = $table_def->{column}{$column}{type};
+        if ($type eq "date") {
+            print FILE ' date "YYYY-MM-DD"';
+        }
+        elsif ($type eq "datetime") {
+            print FILE ' date "YYYY-MM-DD HH24:MI:SS"';
         }
+        print FILE "," if ($i < $#$columns);
+        print FILE "\n";
     }
-    
+    print FILE ")\n";
+    close(FILE);
     &App::sub_exit() if ($App::trace);
 }
 
@@ -391,10 +611,10 @@
 
#+----+-------------+-------+-------+-------------------------------------+-------------------+---------+-------------+------+-------+
 #|  1 | SIMPLE      | t1    | const | hotel_prop_ds_ak1,hotel_prop_ds_ie1 | 
hotel_prop_ds_ak1 |       9 | const,const |    1 |       |
 
#+----+-------------+-------+-------+-------------------------------------+-------------------+---------+-------------+------+-------+
-sub explain_sql {
+sub TBDexplain_sql {
     my ($self, $sql) = @_;
     my $dbh = $self->{dbh};
-    # TODO: Make this work for Oracle
+    # NOTE: MySQL "explain" only works for "select".
     # We convert "update" and "delete" to "select" to explain them.
     if (defined $dbh) {
         if ($sql =~ s/^delete/select */is) {

Modified: p5ee/trunk/App-Repository/t/DBI-import-ora.t
==============================================================================
--- p5ee/trunk/App-Repository/t/DBI-import-ora.t        (original)
+++ p5ee/trunk/App-Repository/t/DBI-import-ora.t        Thu Dec  4 15:16:56 2008
@@ -74,7 +74,7 @@
     is($rep->get("test_person",2,"first_name"), "susan",   "2nd row got in 
[susan]");
 
     is($rep->import_rows("test_person", ["age","first_name","gender","state"],
-        "$t_dir/files/DBI-import.01.dat", {field_sep => "|", import_method => 
"insert"}),
+        "$t_dir/files/DBI-import.01.dat", {field_sep => "|"}),
         120,
         "import from file [files/DBI-import.01.dat]");
     is($rep->get("test_person",3,"first_name"), "mike",    "3rd row got in 
[mike]");

Reply via email to