[When I first sent this it bounced with the message:

<[EMAIL PROTECTED]>:
ezmlm-send: fatal: Sorry, after removing unacceptable MIME parts from your message I 
was left with nothing (#5.7.0)

Not sure what that means, but I'll try again with the patch inline
instead of as an attachment.]

Tim Bunce <[EMAIL PROTECTED]> writes:

> How about this... Don't implement execute_array, just use the DBI default.

Ok, I have merged my DBD::Oracle execute_array() patch into a subversion
checkout and simplified it to implement execute_for_fetch() instead of
execute_array() (included below).

I get segfaults in make test t/30long.t, but I get that both with and
without the patch.

I will

 - Add docs for the problem with Oracle < 8.1.5.
 - Add some tests in t/20select.t.
 - Check up on the ArrayTupleStatus elements; it seems a third 'state'
   element has been added to every error tuple.

Anything else you would like before including it?

And finally for some meaningless timings. With the patch I ran the
following in less than one second:

    $s = $dbh->prepare("INSERT INTO mytable VALUES(?)");
    $s->execute_array(undef, [0..9999]);

while this ran in 7 seconds:

    $s->execute($_) for (0..9999);

While real-life speedups may be more modest, this is still encouraging!

 - Kristian.

-- 
Kristian Nielsen   [EMAIL PROTECTED]
Development Manager, Sifira A/S

-----------------------------------------------------------------------
Index: oci8.c
===================================================================
--- oci8.c      (revision 174)
+++ oci8.c      (working copy)
@@ -137,7 +137,7 @@
     ) {
        if (debug >= 4 || recno>1/*XXX temp*/)
            PerlIO_printf(DBILOGFP, "    OCIErrorGet after %s (er%ld:%s): %d, %ld: 
%s\n",
-               what, (long)recno,
+               what ? what : "<NULL>", (long)recno,
                    (eg_status==OCI_SUCCESS) ? "ok" : oci_status_name(eg_status),
                    status, (long)eg_errcode, errbuf);
        errcode = eg_errcode;
@@ -360,6 +360,30 @@
 {
     phs_t *phs = octxp;
     STRLEN phs_len;
+    AV *tuples_av;
+    SV *sv;
+    AV *av;
+    SV **sv_p;
+
+    /* Check for bind values supplied by tuple array. */
+    tuples_av = phs->imp_sth->bind_tuples;
+    if(tuples_av) {
+      /* NOTE: we already checked the validity in ora_st_bind_for_array_exec(). */
+      sv_p = av_fetch(tuples_av, phs->imp_sth->rowwise ? iter : phs->idx, 0);
+      av = (AV*)SvRV(*sv_p);
+      sv_p = av_fetch(av, phs->imp_sth->rowwise ? phs->idx : iter, 0);
+      sv = *sv_p;
+      if(SvOK(sv)) {
+        *bufpp = SvPV(sv, phs_len);
+        phs->alen = (phs->alen_incnull) ? phs_len+1 : phs_len;
+        phs->indp = 0;
+      } else {
+        *bufpp = SvPVX(sv);
+        phs->alen = 0;
+        phs->indp = -1;
+      }
+    }
+    else
     if (phs->desc_h) {
        *bufpp  = phs->desc_h;
        phs->alen = 0;
@@ -383,7 +407,7 @@
        PerlIO_printf(DBILOGFP, "       in  '%s' [%lu,%lu]: len %2lu, ind %d%s\n",
                phs->name, ul_t(iter), ul_t(index), ul_t(phs->alen), phs->indp,
                (phs->desc_h) ? " via descriptor" : "");
-    if (index > 0 || iter > 0)
+    if (!tuples_av && (index > 0 || iter > 0))
        croak("Arrays and multiple iterations not currently supported by DBD::Oracle 
(in %d/%d)", index,iter);
     return OCI_CONTINUE;
 }
Index: Oracle.pm
===================================================================
--- Oracle.pm   (revision 174)
+++ Oracle.pm   (working copy)
@@ -841,7 +841,36 @@
 
 {   package DBD::Oracle::st; # ====== STATEMENT ======
 
-    # all done in XS
+    sub execute_for_fetch {
+        my ($sth, $fetch_tuple_sub, $tuple_status) = @_;
+        my $row_count = 0;
+        my $batch_size = ($sth->{ora_array_chunk_size} ||= 1000);
+        my $tuple_batch_status;
+
+        if(defined($tuple_status)) {
+            @$tuple_status = ();
+           $tuple_batch_status = [ ];
+       }
+        while (1) {
+            my @tuple_batch;
+            for (my $i = 0; $i < $batch_size; $i++) {
+                push @tuple_batch, [ @{$fetch_tuple_sub->() || last} ];
+            }
+            last unless @tuple_batch;
+            my $res = ora_execute_array($sth,
+                                        [EMAIL PROTECTED],
+                                        scalar(@tuple_batch),
+                                        $tuple_batch_status);
+            if(defined($res) && defined($row_count)) {
+                $row_count += $res;
+            } else {
+                $row_count = undef;
+            }
+            push @$tuple_status, @$tuple_batch_status
+                if defined($tuple_status);
+        }
+        return $row_count;
+    }
 }
 
 1;
@@ -1221,6 +1250,26 @@
 
 =back
 
+=head2 Statement Handle Attributes
+
+=over 4
+
+=item C<ora_array_chunk_size>
+
+Because of OCI limitations, DBD::Oracle needs to buffer up rows of
+bind values in its C<execute_for_fetch> implementation. This attribute
+sets the number of rows to buffer at a time (default value is 1000).
+
+The C<execute_for_fetch> function will collect (at most) this many
+rows in an array, send them of to the DB for execution, then go back
+to collect the next chunk of rows and so on. This attribute can be
+used to limit or extend the number of rows processed at a time.
+
+Note that this attribute also applies to C<execute_array>, since that
+method is implemented using C<execute_for_fetch>.
+
+=back
+
 =head2 Prepare Attributes
 
 These attributes may be used in the C<\%attr> parameter of the
Index: dbdimp.c
===================================================================
--- dbdimp.c    (revision 174)
+++ dbdimp.c    (working copy)
@@ -752,6 +752,7 @@
     int idx=0;
     char *style="", *laststyle=Nullch;
     STRLEN namelen;
+    phs_t *phs;
 
     /* allocate room for copy of statement with spare capacity */
     /* for editing '?' or ':1' into ':p1' so we can use obndrv.        */
@@ -853,8 +854,10 @@
        if (imp_sth->all_params_hv == NULL)
            imp_sth->all_params_hv = newHV();
        phs_sv = newSVpv((char*)&phs_tpl, sizeof(phs_tpl)+namelen+1);
+        phs = (phs_t*)(void*)SvPVX(phs_sv);
        hv_store(imp_sth->all_params_hv, start, namelen, phs_sv, 0);
-       strcpy( ((phs_t*)(void*)SvPVX(phs_sv))->name, start);
+        phs->idx = idx-1;       /* Will be 0 for :1, -1 for :foo. */
+       strcpy(phs->name, start);
     }
     *dest = '\0';
     if (imp_sth->all_params_hv) {
@@ -1636,9 +1639,358 @@
 }
 
 
+static int
+do_bind_array_exec(sth, imp_sth, phs)
+    SV *sth;
+    imp_sth_t *imp_sth;
+    phs_t *phs;
+{
+    sword status;
 
+    OCIBindByName_log_stat(imp_sth->stmhp, &phs->bndhp, imp_sth->errhp,
+            (text*)phs->name, (sb4)strlen(phs->name),
+            0,
+            phs->maxlen ? (sb4)phs->maxlen : 1, /* else bind "" fails  */
+            (ub2)phs->ftype, 0,
+            NULL,      /* ub2 *alen_ptr not needed with OCIBindDynamic */
+            0,
+            0,      /* max elements that can fit in allocated array    */
+            NULL,      /* (ptr to) current number of elements in array */
+            (ub4)OCI_DATA_AT_EXEC,
+            status);
+    if (status != OCI_SUCCESS) {
+        oci_error(sth, imp_sth->errhp, status, "OCIBindByName");
+        return 0;
+    }
+    OCIBindDynamic_log(phs->bndhp, imp_sth->errhp,
+                       (dvoid *)phs, dbd_phs_in,
+                       (dvoid *)phs, dbd_phs_out, status);
+    if (status != OCI_SUCCESS) {
+        oci_error(sth, imp_sth->errhp, status, "OCIBindDynamic");
+        return 0;
+    }
+    return 1;
+}
 
+static void
+init_bind_for_array_exec(phs)
+    phs_t *phs;
+{
+    if (phs->sv == &sv_undef) {        /* first bind for this placeholder  */
+        phs->is_inout = 0;
+        phs->maxlen = 1;
+        /* treat Oracle7 SQLT_CUR as SQLT_RSET for Oracle8     */          
+        if (phs->ftype==102)                                            
+            phs->ftype = 116;                                           
+        /* some types require the trailing null included in the length. */
+        /* SQLT_STR=5=STRING, SQLT_AVC=97=VARCHAR      */
+        phs->alen_incnull = (phs->ftype==SQLT_STR || phs->ftype==SQLT_AVC);
+    }
+}
+
+/* Re-bind placeholders for array execute, to get the correct
+   max-length values. */
+static int
+ora_st_bind_for_array_exec(sth, imp_sth, tuples_av, exe_count, param_count, 
columns_av)
+    SV *sth;
+    imp_sth_t *imp_sth;
+    AV *tuples_av;
+    ub4 exe_count;
+    int param_count;
+    AV *columns_av;
+{
+    int i, j;
+    char namebuf[30];
+    SV **sv_p;
+    SV *sv;
+    AV *av;
+    phs_t **phs;
+    STRLEN len;
+    sword status;
+
+    phs = safemalloc(param_count*sizeof(*phs));
+    memset(phs, 0, param_count*sizeof(*phs));
+
+    /* Loop over values, computing maximum lengths. */
+    if(columns_av) {
+        /* Column-wise operation; tuples_av holds a list of columns. */
+        for(i = 0; i < param_count; i++) {
+            char *name;
+
+            sv_p = av_fetch(columns_av, i, 0);
+            if(sv_p == NULL) {
+                Safefree(phs);
+                croak("Missing column entry %d", i);
+            }
+            sv = *sv_p;
+            if (DBIS->debug >= 9)
+                PerlIO_printf(DBILOGFP, "   arrbind %d->'%s'\n", i, neatsvpv(sv,0));
+            name = SvPV(sv, len);
+            sv_p = hv_fetch(imp_sth->all_params_hv, name, len, 0);
+            if (sv_p == NULL) {
+                Safefree(phs);
+                croak("Can't execute for non-existent placeholder %s",
+                      neatsvpv(sv,0));
+            }
+            phs[i] = (phs_t*)(void*)SvPVX(*sv_p); /* placeholder struct        */
+            phs[i]->idx = i;
+            init_bind_for_array_exec(phs[i]);
+
+            sv_p = av_fetch(tuples_av, i, 0);
+            if(sv_p == NULL) {
+                Safefree(phs);
+                croak("Cannot fetch column %d from tuple array", i);
+            }
+            sv = *sv_p;
+            if(!SvROK(sv) || SvTYPE(SvRV(sv)) != SVt_PVAV) {
+                Safefree(phs);
+                croak("Not an array ref in column %d", i);
+            }
+            av = (AV*)SvRV(sv);
+
+            for(j = 0; j < exe_count; j++) {
+                sv_p = av_fetch(av, j, 0);
+                if(sv_p == NULL) {
+                    Safefree(phs);
+                    croak("Cannot fetch value for param %d in element %d", i, j);
+                }
+                sv = *sv_p;
+
+                /* Find the value length, and increase maxlen if needed. */
+                if(SvROK(sv)) {
+                    Safefree(phs);
+                    croak("Can't bind a reference (%s) for param %d, entry %d",
+                          neatsvpv(sv,0), i, j);
+                }
+                SvPV(sv, len);
+                if(len > phs[i]->maxlen)
+                    phs[i]->maxlen = len;
+            }
+
+            if(!do_bind_array_exec(sth, imp_sth, phs[i])) {
+                Safefree(phs);
+                return 0;
+            }              
+        }
+    } else {
+        /* Row-wise operation; tuples_av holds a list of bind value tuples. */
+        for(j = 0; j < exe_count; j++) {
+            sv_p = av_fetch(tuples_av, j, 0);
+            if(sv_p == NULL) {
+                Safefree(phs);
+                croak("Cannot fetch tuple %d", j);
+            }
+            sv = *sv_p;
+            if(!SvROK(sv) || SvTYPE(SvRV(sv)) != SVt_PVAV) {
+                Safefree(phs);
+                croak("Not an array ref in element %d", j);
+            }
+            av = (AV*)SvRV(sv);
+            for(i = 0; i < param_count; i++) {
+                if(!phs[i]) {
+                    SV **phs_svp;
+
+                    sprintf(namebuf, ":p%d", i+1);
+                    phs_svp = hv_fetch(imp_sth->all_params_hv,
+                                       namebuf, strlen(namebuf), 0);
+                    if (phs_svp == NULL) {
+                        Safefree(phs);
+                        croak("Can't execute for non-existent placeholder :%d", i);
+                    }
+                    phs[i] = (phs_t*)(void*)SvPVX(*phs_svp); /* placeholder struct    
 */
+                    if(phs[i]->idx < 0) {
+                        Safefree(phs);
+                        croak("Placeholder %d not of ?/:1 type", i);
+                    }
+                    init_bind_for_array_exec(phs[i]);
+                }
+
+                sv_p = av_fetch(av, phs[i]->idx, 0);
+                if(sv_p == NULL) {
+                    Safefree(phs);
+                    croak("Cannot fetch value for param %d in entry %d", i, j);
+                }
+                sv = *sv_p;
+
+                /* Find the value length, and increase maxlen if needed. */
+                if(SvROK(sv)) {
+                    Safefree(phs);
+                    croak("Can't bind a reference (%s) for param %d, entry %d",
+                          neatsvpv(sv,0), i, j);
+                }
+                SvPV(sv, len);
+                if(len > phs[i]->maxlen)
+                    phs[i]->maxlen = len;
+
+                /* Do OCI bind calls on last iteration. */
+                if(j == exe_count - 1) {
+                  if(!do_bind_array_exec(sth, imp_sth, phs[i])) {
+                    Safefree(phs);
+                    return 0;
+                  }              
+                }
+            }
+            /* ToDo: Maybe extract common code from here and dbd_bind_ph() and put
+               it in some function(s) that can be called from both places. */
+        }
+    }
+
+    Safefree(phs);
+    return 1;                   /* Success. */
+}
+
 int
+ora_st_execute_array(sth, imp_sth, tuples, tuples_status, columns, exe_count)
+    SV *sth;
+    imp_sth_t *imp_sth;
+    SV *tuples;
+    SV *tuples_status;
+    SV *columns;
+    ub4 exe_count;
+{
+    dTHR;
+    ub4 row_count = 0;
+    int debug = DBIS->debug;
+    D_imp_dbh_from_sth;
+    sword status, exe_status;
+    int is_select = (imp_sth->stmt_type == OCI_STMT_SELECT);
+    AV *tuples_av, *tuples_status_av, *columns_av;
+    ub4 oci_mode;
+    ub4 num_errs;
+    int i;
+    int autocommit = DBIc_has(imp_dbh,DBIcf_AutoCommit);
+
+    if (debug >= 2)
+       PerlIO_printf(DBILOGFP, "    ora_st_execute_array %s count=%d (%s %s %s)...\n",
+                      oci_stmt_type_name(imp_sth->stmt_type), exe_count,
+                      neatsvpv(tuples,0), neatsvpv(tuples_status,0),
+                      neatsvpv(columns, 0));
+
+    if (is_select) {
+        croak("ora_st_execute_array(): SELECT statement not supported "
+              "for array operation.");
+    }
+
+    if (imp_sth->out_params_av || imp_sth->has_lobs) {
+        croak("ora_st_execute_array(): Output placeholders and LOBs not "
+              "supported for array operation.");
+    }
+
+    /* Check that the `tuples' parameter is an array ref, find the length,
+       and store it in the statement handle for the OCI callback. */
+    if(!SvROK(tuples) || SvTYPE(SvRV(tuples)) != SVt_PVAV) {
+        croak("ora_st_execute_array(): Not an array reference.");
+    }
+    tuples_av = (AV*)SvRV(tuples);
+
+    /* Check the `columns' parameter. */
+    if(SvTRUE(columns)) {
+        if(!SvROK(columns) || SvTYPE(SvRV(columns)) != SVt_PVAV) {
+          croak("ora_st_execute_array(): columns not an array reference.");
+        }
+        columns_av = (AV*)SvRV(columns);
+    } else {
+        columns_av = NULL;
+    }
+
+    /* Check the `tuples_status' parameter. */
+    if(SvTRUE(tuples_status)) {
+        if(!SvROK(tuples_status) || SvTYPE(SvRV(tuples_status)) != SVt_PVAV) {
+          croak("ora_st_execute_array(): tuples_status not an array reference.");
+        }
+        tuples_status_av = (AV*)SvRV(tuples_status);
+        av_fill(tuples_status_av, exe_count - 1);
+        /* Fill in 'unknown' exe count in every element (know not how to get
+           individual execute row counts from OCI). */
+        for(i = 0; i < exe_count; i++) {
+            av_store(tuples_status_av, i, newSViv((IV)-1));
+        }
+    } else {
+        tuples_status_av = NULL;
+    }
+
+    /* Nothing to do if no tuples. */
+    if(exe_count <= 0)
+      return 0;
+
+    /* Ensure proper OCIBindByName() calls for all placeholders. */
+    if(!ora_st_bind_for_array_exec(sth, imp_sth, tuples_av, exe_count,
+                                   DBIc_NUM_PARAMS(imp_sth), columns_av))
+        return -2;
+
+    /* Store array of bind typles, for use in OCIBindDynamic() callback. */
+    imp_sth->bind_tuples = tuples_av;
+    imp_sth->rowwise = (columns_av == NULL);
+
+    oci_mode = OCI_BATCH_ERRORS;
+    if(autocommit)
+        oci_mode |= OCI_COMMIT_ON_SUCCESS;
+    OCIStmtExecute_log_stat(imp_sth->svchp, imp_sth->stmhp, imp_sth->errhp,
+                            exe_count, 0, 0, 0, oci_mode, exe_status);
+    imp_sth->bind_tuples = NULL;
+
+    if (exe_status != OCI_SUCCESS) {
+       oci_error(sth, imp_sth->errhp, exe_status, 
ora_sql_error(imp_sth,"OCIStmtExecute"));
+        if(exe_status != OCI_SUCCESS_WITH_INFO)
+            return -2;
+    }
+
+    OCIAttrGet_stmhp_stat(imp_sth, &num_errs, 0, OCI_ATTR_NUM_DML_ERRORS, status);
+    if (debug >= 6)
+       PerlIO_printf(DBILOGFP, "    ora_st_execute_array %d errors in batch.\n",
+                      num_errs);
+    if(num_errs && tuples_status_av) {
+        OCIError *row_errhp, *tmp_errhp;
+        ub4 row_off;
+        SV *err_svs[2];
+        AV *err_av;
+        sb4 err_code;
+
+        err_svs[0] = newSViv((IV)0);
+        err_svs[1] = newSVpvn("", 0);
+        OCIHandleAlloc_ok(imp_sth->envhp, &row_errhp, OCI_HTYPE_ERROR, status);
+        OCIHandleAlloc_ok(imp_sth->envhp, &tmp_errhp, OCI_HTYPE_ERROR, status);
+        for(i = 0; i < num_errs; i++) {
+            OCIParamGet_log_stat(imp_sth->errhp, OCI_HTYPE_ERROR,
+                                 tmp_errhp, (dvoid *)&row_errhp,
+                                 (ub4)i, status);
+            OCIAttrGet_log_stat(row_errhp, OCI_HTYPE_ERROR, &row_off, 0, 
+                                OCI_ATTR_DML_ROW_OFFSET, imp_sth->errhp, status);
+            if (debug >= 6)
+                PerlIO_printf(DBILOGFP, "    ora_st_execute_array error in row %d.\n",
+                              row_off);
+            sv_setpv(err_svs[1], "");
+            err_code = oci_error_get(row_errhp, exe_status, NULL, err_svs[1], debug);
+            sv_setiv(err_svs[0], (IV)err_code);
+            av_store(tuples_status_av, row_off,
+                     newRV_noinc((SV *)(av_make(2, err_svs))));
+        }
+        OCIHandleFree_log_stat(tmp_errhp, OCI_HTYPE_ERROR,  status);
+        OCIHandleFree_log_stat(row_errhp, OCI_HTYPE_ERROR,  status);
+
+        /* Do a commit here if autocommit is set, since Oracle
+           doesn't do that for us when some rows are in error. */
+        if(autocommit) {
+            OCITransCommit_log_stat(imp_sth->svchp, imp_sth->errhp,
+                                    OCI_DEFAULT, status);
+            if (status != OCI_SUCCESS) {
+                oci_error(sth, imp_sth->errhp, status, "OCITransCommit");
+                return -2;
+            }
+        }
+    }
+
+    if(num_errs) {
+        return -2;
+    } else {
+        ub4 row_count = 0;
+       OCIAttrGet_stmhp_stat(imp_sth, &row_count, 0, OCI_ATTR_ROW_COUNT, status);
+        return row_count;
+    }
+}
+
+
+int
 dbd_st_blob_read(sth, imp_sth, field, offset, len, destrv, destoffset)
     SV *sth;
     imp_sth_t *imp_sth;
Index: dbdimp.h
===================================================================
--- dbdimp.h    (revision 174)
+++ dbdimp.h    (working copy)
@@ -111,6 +111,9 @@
     int        has_lobs;
     lob_refetch_t *lob_refetch;
     int        disable_finish; /* fetched cursors can core dump in finish */
+    AV          *bind_tuples;   /* Bind tuples in array execute, or NULL */
+    int         rowwise;        /* If true, bind_tuples is list of */
+                                /* tuples, otherwise list of columns. */
 
     /* Input Details   */
     char      *statement;      /* sql (see sth_scan)           */
@@ -199,6 +202,7 @@
     ub4   desc_t;      /* OCI type of desc_h                   */
     ub4   alen;
     ub2 arcode;
+    int   idx;          /* 0-based index for ?/:1 style, or -1  */
 
     sb2 indp;          /* null indicator                       */
     char *progv;
@@ -253,6 +257,8 @@
              dvoid **bufpp, ub4 **alenpp, ub1 *piecep,
              dvoid **indpp, ub2 **rcodepp));
 int dbd_rebind_ph_rset _((SV *sth, imp_sth_t *imp_sth, phs_t *phs));
+int ora_st_execute_array _((SV *sth, imp_sth_t *imp_sth, SV *tuples,
+                            SV *tuples_status, SV *columns, ub4 exe_count));
 
 void * oci_db_handle(imp_dbh_t *imp_dbh, int handle_type, int flags);
 void * oci_st_handle(imp_sth_t *imp_sth, int handle_type, int flags);
Index: Oracle.xs
===================================================================
--- Oracle.xs   (revision 174)
+++ Oracle.xs   (working copy)
@@ -90,6 +90,33 @@
 
 
 void
+ora_execute_array(sth, tuples, exe_count, tuples_status, cols=&sv_undef)
+    SV *        sth
+    SV *        tuples
+    int         exe_count
+    SV *        tuples_status
+    SV *        cols
+    PREINIT:
+    D_imp_sth(sth);
+    int retval;
+    CODE:
+    /* XXX Need default bindings if any phs are so far unbound(?) */
+    /* XXX this code is duplicated in selectrow_arrayref above  */
+    if (DBIc_ROW_COUNT(imp_sth) > 0) /* reset for re-execute */
+        DBIc_ROW_COUNT(imp_sth) = 0;
+    retval = ora_st_execute_array(sth, imp_sth, tuples, tuples_status,
+                                  cols, (ub4)exe_count);
+    /* XXX Handle return value ... like DBI::execute_array(). */
+    /* remember that dbd_st_execute must return <= -2 for error */
+    if (retval == 0)            /* ok with no rows affected     */
+        XST_mPV(0, "0E0");      /* (true but zero)              */
+    else if (retval < -1)       /* -1 == unknown number of rows */
+        XST_mUNDEF(0);          /* <= -2 means error            */
+    else
+        XST_mIV(0, retval);     /* typically 1, rowcount or -1  */
+
+
+void
 cancel(sth)
     SV *        sth
     CODE:

Reply via email to