cvsuser     06/02/28 19:39:51

  Modified:    App-WorkQueue/lib/App WorkQueue.pm
               App-WorkQueue/lib/App/WorkQueue Repository.pm
  Log:
  get Repository queue to work
  
  Revision  Changes    Path
  1.4       +152 -103  p5ee/App-WorkQueue/lib/App/WorkQueue.pm
  
  Index: WorkQueue.pm
  ===================================================================
  RCS file: /cvs/public/p5ee/App-WorkQueue/lib/App/WorkQueue.pm,v
  retrieving revision 1.3
  retrieving revision 1.4
  diff -u -r1.3 -r1.4
  --- WorkQueue.pm      9 Feb 2006 17:57:15 -0000       1.3
  +++ WorkQueue.pm      1 Mar 2006 03:39:51 -0000       1.4
  @@ -19,6 +19,7 @@
   use strict;
   
   my $verbose = $App::options{verbose};
  +$verbose = 0 if (!$verbose || $verbose < 5);
   
   =head1 NAME
   
  @@ -71,6 +72,8 @@
   my $GCONSTR_LIMITS       = 1;
   my $GCONSTR_COUNT_ATTRIB = 2;
   my $GCONSTR_LIMIT_ATTRIB = 3;
  +my $GCONSTR_VALUE_ATTRIB = 4;
  +my $GCONSTR_HARD_LIMIT   = 5;
   
   my $CONSTR_COUNTS        = 0;
   my $CONSTR_LIMITS        = 1;
  @@ -114,15 +117,18 @@
   
       $self->SUPER::_init($args);
       $self->{data} = [];
  -    $self->{type} = ($self->{columns}) ? "ARRAY" : "HASH";
  +    if (!$self->{type}) {
  +        $self->{type} = $self->{queue_type} || ($self->{columns} ? "ARRAY" : 
"HASH");
  +    }
   
       die "status_attrib not set on queue"     if (!$self->{status_attrib});
       #die "STATUS_UNBUFFERED not set on queue" if 
(!$self->{STATUS_UNBUFFERED});
       die "STATUS_UNACQUIRED not set on queue" if 
(!$self->{STATUS_UNACQUIRED});
       die "STATUS_ACQUIRED not set on queue"   if (!$self->{STATUS_ACQUIRED});
       die "STATUS_RELEASED not set on queue"   if (!$self->{STATUS_RELEASED});
  +    die "id_attrib or auto_id_attrib not set on queue" if 
(!$self->{id_attrib} && !$self->{auto_id_attrib});
   
  -    my $id_attrib = $self->{id_attrib} || die "id_attrib not set on queue";
  +    my $id_attrib = $self->{id_attrib};
       if ($self->{type} eq "ARRAY") {
           my $colidx = $self->_colidx();
           my $id_attribs = [split(/,/,$id_attrib)];
  @@ -136,10 +142,36 @@
       else {
           $self->{id_attribs} = [split(/,/,$id_attrib)];
       }
  +
  +    if ($self->{auto_id_attrib}) {
  +        if ($self->{type} eq "ARRAY") {
  +            my $colidx  = $self->_colidx();
  +            my $attrib  = $self->{auto_id_attrib};
  +            my $columns = $self->{columns};
  +            if (defined $colidx->{$attrib}) {
  +                $self->{auto_id_idx} = $colidx->{$attrib};
  +            }
  +            else {
  +                push(@$columns, $attrib);
  +                $colidx->{$attrib} = $#$columns;
  +                $self->{auto_id_idx} = $#$columns;
  +            }
  +        }
  +    }
  +
  +    $self->{status_counts} = {};
  +
       if ($self->{sort_spec}) {
           $self->_analyze_sort_spec();
       }
  -    $self->{verbose} = $App::options{verbose};
  +
  +    $self->{verbose} = $verbose;
  +    &App::sub_exit() if ($App::trace);
  +}
  +
  +sub shutdown {
  +    &App::sub_entry if ($App::trace);
  +    my ($self) = @_;
       &App::sub_exit() if ($App::trace);
   }
   
  @@ -166,7 +198,7 @@
   primitive operations for this work queue are:
   
    * $q->push($entry)
  - * $entry = $q->acquire(\%params)
  + * $entry = $q->acquire()
    * $q->release($entry)
    * $q->unacquire($entry)
    * @entries = $q->locate(\%params, \%options)
  @@ -235,13 +267,14 @@
           my $inserted = 0;
           if ($release_lowest) {
               my $removed = 0;
  -            my ($ent);
  +            my ($ent, $cmp);
               my $resource_key = $self->_resource_key($entry);
               for (my $i = $#$entries; $i >= 0; $i--) {
                   $ent = $entries->[$i];
                   if ($self->_resource_key($ent) eq $resource_key) {
                       # IF not inserted yet AND new entry is lower or same 
priority THEN
  -                    if (!$inserted && $self->_compare_entries($entry, $ent) 
> -1) {
  +                    $cmp = $self->_compare_entries($entry, $ent);
  +                    if (!$inserted && $cmp > -1) {
                           if (!$removed) {   # "insert and remove self"
                               $inserted = 1;
                               $removed  = 1;
  @@ -369,7 +402,6 @@
   =head2 acquire()
   
       * Signature: $entry = $q->acquire();
  -    * Signature: $entry = $q->acquire($sort_spec);
       * Param:     $sort_spec         string
       * Return:    $entry             HASH/ARRAY
       * Throws:    App::Exception::WorkQueue
  @@ -383,7 +415,7 @@
   
   sub acquire {
       &App::sub_entry if ($App::trace);
  -    my ($self, $sort_spec) = @_;
  +    my ($self) = @_;
       my $entry = undef;
       die "acquire() not implemented";
       &App::sub_exit($entry) if ($App::trace);
  @@ -392,7 +424,7 @@
   
   sub _acquire_in_mem {
       &App::sub_entry if ($App::trace);
  -    my ($self, $sort_spec) = @_;
  +    my ($self) = @_;
   
       my ($entry);
       my $STATUS_UNACQUIRED = $self->{STATUS_UNACQUIRED};
  @@ -405,25 +437,39 @@
               my $colidx = $self->_colidx();
               my $status_idx = $colidx->{$status_attrib};
               die "status_attribute [$status_attrib] does not exist on 
elements of this queue" if (!defined $status_idx);
  +            my ($acquired);
               foreach my $e (@$entries) {
                   # print ">>> ENTRY: status=[$e->[$status_idx]] 
idx=[$status_idx] (should be $STATUS_UNACQUIRED)\n";
                   next if ($e->[$status_idx] ne $STATUS_UNACQUIRED);
                   if ($self->_acquire_resources($e)) {
                       $entry = $e;
  -                    
$self->update($entry,[$self->{status_attrib}],[$self->{STATUS_ACQUIRED}]);
  -                    print "ACQUIRED[M]: [", join("|",@$e), "]\n" if 
($verbose);
  -                    last;
  +                    $acquired = $self->_acquire_entry($entry);
  +                    if ($acquired) {
  +                        print "ACQUIRED[M]: [", join("|",@$e), "]\n" if 
($verbose);
  +                        last;
  +                    }
  +                    else {
  +                        $self->_release_resources($entry);
  +                        $self->_maintain_queue_buffers("release",$entry);
  +                    }
                   }
               }
           }
           else {
  +            my ($acquired);
               foreach my $e (@$entries) {
                   next if ($e->{$status_attrib} ne $STATUS_UNACQUIRED);
                   if ($self->_acquire_resources($e)) {
                       $entry = $e;
  -                    
$self->update($entry,[$self->{status_attrib}],[$self->{STATUS_ACQUIRED}]);
  -                    print "ACQUIRED[M]: {", join("|",%$e), "}\n" if 
($verbose);
  -                    last;
  +                    $acquired = $self->_acquire_entry($entry);
  +                    if ($acquired) {
  +                        print "ACQUIRED[M]: {", join("|",%$e), "}\n" if 
($verbose);
  +                        last;
  +                    }
  +                    else {
  +                        $self->_release_resources($entry);
  +                        $self->_maintain_queue_buffers("release",$entry);
  +                    }
                   }
               }
           }
  @@ -434,6 +480,46 @@
       return($entry);
   }
   
  +sub _acquire_entry {
  +    &App::sub_entry if ($App::trace);
  +    my ($self, $entry) = @_;
  +    my $acquired = 1;
  +    
$self->update($entry,[$self->{status_attrib}],[$self->{STATUS_ACQUIRED}]);
  +    &App::sub_exit($acquired) if ($App::trace);
  +    return($acquired);
  +}
  +
  +sub acquired_entries {
  +    &App::sub_entry if ($App::trace);
  +    my ($self) = @_;
  +
  +    my (@entries);
  +    my $STATUS_UNACQUIRED = $self->{STATUS_UNACQUIRED};
  +    my $STATUS_UNBUFFERED = $self->{STATUS_UNBUFFERED};
  +    my $status_attrib     = $self->{status_attrib};
  +    my $entries           = $self->{data};
  +
  +    if ($self->{type} eq "ARRAY") {
  +        my $colidx = $self->_colidx();
  +        my $status_idx = $colidx->{$status_attrib};
  +        foreach my $entry (@$entries) {
  +            if ($entry->[$status_idx] ne $STATUS_UNACQUIRED && 
$entry->[$status_idx] ne $STATUS_UNBUFFERED) {
  +                CORE::push(@entries, $entry);
  +            }
  +        }
  +    }
  +    else {
  +        foreach my $entry (@$entries) {
  +            if ($entry->{$status_attrib} ne $STATUS_UNACQUIRED && 
$entry->{$status_attrib} ne $STATUS_UNBUFFERED) {
  +                CORE::push(@entries, $entry);
  +            }
  +        }
  +    }
  +
  +    &App::sub_exit([EMAIL PROTECTED]) if ($App::trace);
  +    return([EMAIL PROTECTED]);
  +}
  +
   #############################################################################
   # release()
   #############################################################################
  @@ -520,26 +606,38 @@
   
   sub _array_to_key {
       my ($self, $entry) = @_;
  -    my $id_indexes = $self->{id_indexes};
       my ($key);
  -    if ($#$id_indexes == 0) {
  -        $key = $entry->[$id_indexes->[0]];
  +    my $auto_id_idx = $self->{auto_id_idx};
  +    if (defined $auto_id_idx) {
  +        $key = $entry->[$auto_id_idx];
       }
       else {
  -        $key = join(":", @[EMAIL PROTECTED]);
  +        my $id_indexes = $self->{id_indexes};
  +        if ($#$id_indexes == 0) {
  +            $key = $entry->[$id_indexes->[0]];
  +        }
  +        else {
  +            $key = join(":", @[EMAIL PROTECTED]);
  +        }
       }
       return($key);
   }
   
   sub _hash_to_key {
       my ($self, $entry) = @_;
  -    my $id_attribs = $self->{id_attribs};
       my ($key);
  -    if ($#$id_attribs == 0) {
  -        $key = $entry->{$id_attribs->[0]};
  +    my $auto_id_idx = $self->{auto_id_idx};
  +    if (defined $auto_id_idx) {
  +        $key = $entry->[$auto_id_idx];
       }
       else {
  -        $key = join(":", @[EMAIL PROTECTED]);
  +        my $id_attribs = $self->{id_attribs};
  +        if ($#$id_attribs == 0) {
  +            $key = $entry->{$id_attribs->[0]};
  +        }
  +        else {
  +            $key = join(":", @[EMAIL PROTECTED]);
  +        }
       }
       return($key);
   }
  @@ -580,7 +678,6 @@
       my $STATUS_ACQUIRED   = $self->{STATUS_ACQUIRED};
       my $STATUS_UNACQUIRED = $self->{STATUS_UNACQUIRED};
       my $status_attrib     = $self->{status_attrib};
  -    my $id_attrib         = $self->{id_attrib};
       my $data              = $self->{data};
       my ($e, $ent, $entry_key);
   
  @@ -595,7 +692,7 @@
                   if ($ent->[$status_idx] eq $STATUS_ACQUIRED) {
                       $self->_release_resources($ent);
                   }
  -                
$self->update($ent,[$self->{status_attrib}],[$self->{STATUS_UNACQUIRED}]);
  +                $self->_unacquire_entry($ent);
                   print "UNACQUIRED[M]: [", join("|",@$ent), "]\n" if 
($verbose);
                   last;
               }
  @@ -609,7 +706,7 @@
                   if ($ent->{$status_attrib} eq $STATUS_ACQUIRED) {
                       $self->_release_resources($ent);
                   }
  -                
$self->update($ent,[$self->{status_attrib}],[$self->{STATUS_UNACQUIRED}]);
  +                $self->_unacquire_entry($ent);
                   print "UNACQUIRED[M]: {", join("|",%$ent), "}\n" if 
($verbose);
                   last;
               }
  @@ -619,6 +716,13 @@
       &App::sub_exit() if ($App::trace);
   }
   
  +sub _unacquire_entry {
  +    &App::sub_entry if ($App::trace);
  +    my ($self, $entry) = @_;
  +    
$self->update($entry,[$self->{status_attrib}],[$self->{STATUS_UNACQUIRED}]);
  +    &App::sub_exit() if ($App::trace);
  +}
  +
   #############################################################################
   # locate()
   #############################################################################
  @@ -770,7 +874,7 @@
               $status = $entry->{$status_attrib};
           }
       }
  -    my $status_counts = $self->_status_counts();
  +    my $status_counts = $self->{status_counts};
       $status_counts->{$status} += $count;
       if ($from_status) {
           $status_counts->{$from_status} -= $count;
  @@ -781,6 +885,14 @@
       &App::sub_exit() if ($App::trace);
   }
   
  +sub schedule_entry_acquisition {
  +    &App::sub_entry if ($App::trace);
  +    my $self = shift;
  +    my %event = @_;
  +    $self->{acquisition_event} = \%event;
  +    &App::sub_exit() if ($App::trace);
  +}
  +
   #############################################################################
   # print()
   #############################################################################
  @@ -836,7 +948,7 @@
       $fh = \*STDOUT if (!$fh);
   
       foreach my $c (@{$self->{global_constraints}}) {
  -        printf $fh "     GLOBAL CONSTRAINT: %4d/%4d\n", $c->[0]{$c->[2]}, 
$c->[1]{$c->[3]};
  +        printf $fh "     GLOBAL CONSTRAINT: %4d/%4d [%s]\n", 
$c->[0]{$c->[2]}, $c->[1]{$c->[3]}, $c->[4];
       }
       foreach my $c (@{$self->{constraints}}) {
           my (%seen);
  @@ -924,7 +1036,7 @@
       &App::sub_entry if ($App::trace);
       my ($self, $fh) = @_;
       $fh = \*STDOUT if (!$fh);
  -    my $status_counts = $self->_status_counts();
  +    my $status_counts = $self->{status_counts};
       foreach my $status (sort keys %$status_counts) {
           printf $fh "   %7s => %8d\n", $status, $status_counts->{$status};
       }
  @@ -937,8 +1049,8 @@
       $fh = \*STDOUT if (!$fh);
       my $resource_counts = $self->_resource_counts();
       foreach my $resource_key (sort keys %{$resource_counts->{total}}) {
  -        printf $fh "   %7s => %8d=tot %8d=buf %8d=BUFSIZE\n",
  -            $resource_key,
  +        printf $fh "   %9s => %8d=tot %8d=buf %8d=BUFSIZE\n",
  +            "[$resource_key]",
               $resource_counts->{total}{$resource_key},
               $resource_counts->{buffer}{$resource_key},
               $self->{BUFFER_SIZE};
  @@ -1002,13 +1114,15 @@
   sub _num_entries_from_status_counts {
       &App::sub_entry if ($App::trace);
       my ($self, $status) = @_;
  -    my $status_counts = $self->_status_counts();
  +    my $status_counts = $self->{status_counts};
       my ($num);
       if ($status) {
           $num = $status_counts->{$status};
       }
       else {
  -        $num = $status_counts->{N} + $status_counts->{W};
  +        $num = $status_counts->{$self->{STATUS_UNBUFFERED}};
  +        $num += $status_counts->{$self->{STATUS_UNACQUIRED}}
  +            if ($self->{STATUS_UNBUFFERED} ne $self->{STATUS_UNACQUIRED});
       }
       $num = 0 if (!defined $num);
       &App::sub_exit($num) if ($App::trace);
  @@ -1047,7 +1161,7 @@
   
   sub set_global_constraint {
       &App::sub_entry if ($App::trace);
  -    my ($self, $counts, $limits, $count_attrib, $limit_attrib) = @_;
  +    my ($self, $counts, $limits, $count_attrib, $limit_attrib, 
$value_attrib, $hard_limit) = @_;
   
       my $global_constraints = $self->{global_constraints};
       if (!$global_constraints) {
  @@ -1055,7 +1169,7 @@
           $self->{global_constraints} = $global_constraints;
       }
   
  -    CORE::push(@$global_constraints, [ $counts, $limits, $count_attrib, 
$limit_attrib ]);
  +    CORE::push(@$global_constraints, [ $counts, $limits, $count_attrib, 
$limit_attrib, $value_attrib, $hard_limit ]);
   
       &App::sub_exit() if ($App::trace);
   }
  @@ -1491,16 +1605,6 @@
       &App::sub_exit() if ($App::trace);
   }
   
  -sub _status_counts {
  -    my ($self) = @_;
  -    my $status_counts = $self->{status_counts};
  -    if (!$status_counts) {
  -        $status_counts = {};
  -        $self->{status_counts} = $status_counts;
  -    }
  -    return($status_counts);
  -}
  -
   sub _resource_counts {
       my ($self) = @_;
       my $resource_counts = $self->{resource_counts};
  @@ -1559,7 +1663,7 @@
   
   sub _maintain_queue_buffers {
       &App::sub_entry if ($App::trace);
  -    my ($self, $op, $entry) = @_;
  +    my ($self, $op, $entry, $columns, $values) = @_;
   
       $op ||= "";
       my $BUFFER_SIZE = $self->{BUFFER_SIZE};
  @@ -1582,7 +1686,7 @@
           $resource_counts->{buffer}{$resource_key}--;
       }
       elsif ($op eq "release") {
  -        $self->_release_in_mem($entry);
  +        $self->_release_in_mem($entry, $columns, $values);
       }
       elsif ($op eq "unacquire") {
           $resource_counts->{total}{$resource_key}++;
  @@ -1705,58 +1809,3 @@
   
   1;
   
  -__END__
  -
  -#############################################################################
  -# _check_low_queue_buffers()
  -#############################################################################
  -
  -=head2 _check_low_queue_buffers()
  -
  -    * Signature: $q->_check_low_queue_buffers($entry);
  -    * Param:     $entry             ARRAY/HASH
  -    * Return:    undef
  -    * Throws:    App::Exception::WorkQueue
  -    * Since:     0.01
  -
  -    Sample Usage: 
  -
  -    $context = App->context();
  -    $q = $context->service("WorkQueue");  # or ...
  -    $q = $context->work_queue();
  -    $q->push({ name => "Joe", degrees => 3 });
  -    $q->push({ name => "Mike", degrees => 1 });
  -
  -    $q->_check_low_queue_buffers($entry);
  -
  -Queues may be implemented with remote storage.  In that case, there are local
  -queue buffers which are maintained in memory to increase performance.
  -There is conceptually a queue buffer for each combination of constraint 
values
  -(the constraint key of an entry).
  -
  -The Queue Buffers are checked each time an entry is push()ed.
  -The count for the particular constraint key is incremented.
  -Only if the count is below a configured high-water mark is it necessary
  -to also push the entry onto the queue buffer.
  -
  -=cut
  -
  -sub _check_low_queue_buffers {
  -    &App::sub_entry if ($App::trace);
  -    my ($self, $entry) = @_;
  -
  -    my $BUFFER_SIZE = $self->{BUFFER_SIZE};
  -    my $buffer_low = 0;
  -
  -    my $resource_key = $self->_resource_key($entry);
  -    my $resource_counts = $self->_resource_counts();
  -    $resource_counts->{total}{$resource_key}++;
  -    if ($resource_counts->{buffer}{$resource_key} < $BUFFER_SIZE) {
  -        $resource_counts->{buffer}{$resource_key} ++;
  -        $buffer_low = 1;
  -    }
  -
  -    &App::sub_exit($buffer_low) if ($App::trace);
  -    return($buffer_low);
  -}
  -
  
  
  
  1.8       +283 -97   p5ee/App-WorkQueue/lib/App/WorkQueue/Repository.pm
  
  Index: Repository.pm
  ===================================================================
  RCS file: /cvs/public/p5ee/App-WorkQueue/lib/App/WorkQueue/Repository.pm,v
  retrieving revision 1.7
  retrieving revision 1.8
  diff -u -r1.7 -r1.8
  --- Repository.pm     10 Feb 2006 18:07:41 -0000      1.7
  +++ Repository.pm     1 Mar 2006 03:39:51 -0000       1.8
  @@ -13,6 +13,7 @@
   use App::Repository;
   
   my $verbose = $App::options{verbose};
  +$verbose = 0 if (!$verbose || $verbose < 5);
   
   =head1 NAME
   
  @@ -41,30 +42,128 @@
   
   sub _init {
       &App::sub_entry if ($App::trace);
  +    my ($self,$args) = @_;
  +    $self->_init_attribs($args);
  +    $self->_init_db($args);
  +    &App::sub_exit() if ($App::trace);
  +}
  +
  +sub _init_attribs {
  +    &App::sub_entry if ($App::trace);
       my ($self, $args) = @_;
  +
  +    $self->{type} = $self->{queue_type} || ($self->{columns} ? "ARRAY" : 
"HASH");
  +    my $db = $self->_db();
  +    if (!$self->{columns}) {
  +        $self->{columns} = $db->_get_default_columns($self->{table});
  +    }
       $self->SUPER::_init($args);
  +
       die "STATUS_UNBUFFERED not set on queue" if 
(!$self->{STATUS_UNBUFFERED});
  -    die "queue_id_attrib not set on queue"   if (!$self->{queue_id_attrib});
  -    die "queue_id not set on queue"          if (!$self->{queue_id});
  -    if ($self->{auto_id_attrib}) {
  -        if ($self->{type} eq "ARRAY") {
  -            my $colidx  = $self->_colidx();
  -            my $attrib  = $self->{auto_id_attrib};
  -            my $columns = $self->{columns};
  -            if (defined $colidx->{$attrib}) {
  -                $self->{auto_id_idx} = $colidx->{$attrib};
  -            }
  -            else {
  -                push(@$columns, $attrib);
  -                $colidx->{$attrib} = $#$columns;
  -                $self->{auto_id_idx} = $#$columns;
  -            }
  +    die "client_id_attrib not set on queue"  if (!$self->{client_id_attrib});
  +    die "client_id not set on queue"         if (!$self->{client_id});
  +
  +    my $context = $self->{context};
  +    my $options = $context->{options};
  +    $context->schedule_event(
  +        tag           => "$self->{name}-heartbeat",
  +        service_type  => "WorkQueue",
  +        name          => $self->{name},
  +        method        => "_heartbeat",
  +        time          => time(),
  +        interval      => $options->{sleep_time} || 60,
  +        scheduled     => 1,
  +    );
  +
  +    &App::sub_exit() if ($App::trace);
  +}
  +
  +sub _init_db {
  +    &App::sub_entry if ($App::trace);
  +    my ($self, $args) = @_;
  +    die "STATUS_UNBUFFERED [$self->{STATUS_UNBUFFERED}] != STATUS_UNACQUIRED 
[$self->{STATUS_UNACQUIRED}] on queue"
  +        if ($self->{STATUS_UNBUFFERED} ne $self->{STATUS_UNACQUIRED});
  +    $self->_refresh_queue();
  +    &App::sub_exit() if ($App::trace);
  +}
  +
  +sub _heartbeat {
  +    &App::sub_entry if ($App::trace);
  +    my ($self) = @_;
  +    my $context = $self->{context};
  +    $context->log("[$self->{name}] _heartbeat\n");
  +    $self->_refresh_queue();
  +    &App::sub_exit() if ($App::trace);
  +}
  +
  +sub _refresh_queue {
  +    &App::sub_entry if ($App::trace);
  +    my ($self) = @_;
  +    my $context = $self->{context};
  +    $context->log("[$self->{name}] _refresh_queue\n");
  +    $self->_refresh_status_counts();
  +    $self->_refresh_resource_counts();
  +    $self->_maintain_queue_buffers();
  +    if ($self->{acquisition_event}) {
  +        my ($entry, %event);
  +        while (1) {
  +            $entry = $self->acquire();
  +            last if (!$entry);
  +            %event = %{$self->{acquisition_event}};
  +            $event{args} = [ $entry ];
  +            $context->send_event(\%event);
           }
       }
  +    &App::sub_exit() if ($App::trace);
  +}
  +
  +sub _refresh_status_counts {
  +    &App::sub_entry if ($App::trace);
  +    my ($self) = @_;
  +    my $status_attrib = $self->{status_attrib};
  +    my (%counts, $count_all);
  +    $self->_count_entries_by_attrib_in_db($status_attrib, \%counts, undef,
  +        
"$self->{STATUS_UNBUFFERED},$self->{STATUS_UNACQUIRED},$self->{STATUS_ACQUIRED}");
  +    $count_all = 0;
  +    foreach my $key (keys %counts) {
  +        $count_all += $counts{$key};
  +    }
  +    $counts{ALL} = $count_all;
  +    $self->{status_counts} = \%counts;
  +    &App::sub_exit() if ($App::trace);
  +}
  +
  +sub _refresh_resource_counts {
  +    &App::sub_entry if ($App::trace);
  +    my ($self) = @_;
  +    my (%counts);
  +    # --------------
  +    my $STATUS_UNBUFFERED = $self->{STATUS_UNBUFFERED};
  +    my $STATUS_UNACQUIRED = $self->{STATUS_UNACQUIRED};
  +    my $STATUS_ACQUIRED   = $self->{STATUS_ACQUIRED};
  +    my $status_attrib     = $self->{status_attrib};
       my $db = $self->_db();
  -    if ($self->{queue_id_attrib}) {
  -        $db->set($self->{table}, { $self->{queue_id_attrib} => 
$self->{queue_id} }, $self->{queue_id_attrib}, undef);
  +    my $count_expr = "count(1)";
  +    my $status = "$STATUS_UNBUFFERED,$STATUS_UNACQUIRED";
  +    my $params = { $status_attrib => $status };
  +    if ($self->{multiple_queues} && $self->{client_id_attrib} && 
$self->{client_id}) {
  +        $params->{$self->{client_id_attrib}} = $self->{client_id};
  +    }
  +    if ($self->{params}) {
  +        my $base_params = $self->{params};
  +        foreach my $param (keys %$base_params) {
  +            $params->{$param} = $base_params->{$param};
  +        }
       }
  +    my (@resource_cols, $value, $resource_key);
  +    my $rows = $db->get_rows($self->{table}, $params, [ @resource_cols, 
$count_expr ], { group_by => [ @resource_cols ] });
  +    foreach my $row (@$rows) {
  +        $value = pop(@$row);
  +        $resource_key = join(":", @$row);
  +        $counts{$resource_key} = $value;
  +    }
  +    # --------------
  +    $self->{resource_counts}{total} = \%counts;
       &App::sub_exit() if ($App::trace);
   }
   
  @@ -78,7 +177,7 @@
   primitive operations for this work queue are:
   
    * $q->push($entry)
  - * $entry = $q->acquire(\%params)
  + * $entry = $q->acquire()
    * $q->release($entry)
    * $q->unacquire($entry)
    * @entries = $q->locate(\%params, \%options)
  @@ -88,16 +187,32 @@
   sub push {
       &App::sub_entry if ($App::trace);
       my ($self, $entry) = @_;
  -    #if ($self->{type} eq "ARRAY") {
  -    #    die "wrong number of columns" if ($#{$self->{columns}} != $#$entry);
  -    #}
  -    my $columns = [$self->{status_attrib}, $self->{queue_id_attrib}];
  -    my $values  = [$self->{STATUS_UNBUFFERED}, $self->{queue_id}];
  +    my $columns = [$self->{status_attrib}, $self->{client_id_attrib}];
  +    my $values  = [$self->{STATUS_UNBUFFERED}, undef];
       $self->_update_ref($entry, $columns, $values, 0, 1);
  -    my $alt_entry = $self->_push_in_db($entry);
  -    if ($alt_entry) {
  -        $self->_update_in_db($alt_entry, $columns, $values);
  -        $entry = $alt_entry;
  +    eval {
  +        $self->_push_in_db($entry);
  +    };
  +    if ($@) {
  +        my $db = $self->_db();
  +        if ($@ =~ /duplicate/i) {
  +            my ($params, $alt_entry);
  +            if ($self->{type} eq "ARRAY") {
  +                $params = $self->_array_to_key_params($entry);
  +                $alt_entry = $db->get_row($self->{table}, $params, $columns);
  +            }
  +            else {
  +                $params = $self->_hash_to_key_params($entry);
  +                $alt_entry = $db->get_hash($self->{table}, $params, 
$columns);
  +            }
  +            if ($alt_entry) {
  +                $self->_update_in_db($alt_entry, $columns, $values);
  +                $entry = $alt_entry;
  +            }
  +        }
  +        else {
  +            die $@;
  +        }
       }
       $self->_update_ref($entry, $columns, $values, 1);
       $self->_maintain_queue_buffers("push",$entry);
  @@ -111,26 +226,19 @@
       my $db = $self->_db();
       my $ref = ref($entry);
       my $type = "";
  -    my ($alt_entry);
       if ($ref eq "ARRAY") {
           $type = "ARRAY";
           die "tried to push entry of type [$type] onto queue of type 
[$self->{type}]" if ($type ne $self->{type});
           print "PUSHED[D]: [", join("|",@$entry), "]\n" if ($verbose);
           my $columns = $self->{columns};
           my $auto_id_idx = $self->{auto_id_idx};
  -        eval {
  -            if (defined $auto_id_idx) {
  -                $entry->[$auto_id_idx] = 0 if (!$entry->[$auto_id_idx]);
  -                $db->insert($self->{table}, $columns, $entry);
  -                $entry->[$auto_id_idx] = $db->_last_inserted_id();
  -            }
  -            else {
  -                $db->insert($self->{table}, $columns, $entry);
  -            }
  -        };
  -        if ($@) {
  -            my $params = $self->_array_to_key_params($entry);
  -            $alt_entry = $db->get_row($self->{table}, $params, $columns);
  +        if (defined $auto_id_idx) {
  +            $entry->[$auto_id_idx] = 0 if (!$entry->[$auto_id_idx]);
  +            $db->insert($self->{table}, $columns, $entry);
  +            $entry->[$auto_id_idx] = $db->_last_inserted_id() if 
(!$entry->[$auto_id_idx]);
  +        }
  +        else {
  +            $db->insert($self->{table}, $columns, $entry);
           }
       }
       elsif ($ref) {
  @@ -139,23 +247,16 @@
           print "PUSHED[D]: {", join("|",%$entry), "}\n" if ($verbose);
           my $columns = $self->{columns};
           my $auto_id_attrib = $self->{auto_id_attrib};
  -        eval {
  -            $entry->{$auto_id_attrib} = 0 if (defined $auto_id_attrib && 
!$entry->{$auto_id_attrib});
  -            if ($columns) {
  -                $db->insert($self->{table}, $columns, $entry);
  -            }
  -            else {
  -                $db->insert($self->{table}, $entry);
  -            }
  -            $entry->{$auto_id_attrib} = $db->_last_inserted_id() if (defined 
$auto_id_attrib);
  -        };
  -        if ($@) {
  -            my $params = $self->_hash_to_key_params($entry);
  -            $alt_entry = $db->get_hash($self->{table}, $params, $columns);
  +        $entry->{$auto_id_attrib} = 0 if (defined $auto_id_attrib && 
!$entry->{$auto_id_attrib});
  +        if ($columns) {
  +            $db->insert($self->{table}, $columns, $entry);
  +        }
  +        else {
  +            $db->insert($self->{table}, $entry);
           }
  +        $entry->{$auto_id_attrib} = $db->_last_inserted_id() if (defined 
$auto_id_attrib && !$entry->{$auto_id_attrib});
       }
  -    &App::sub_exit($alt_entry) if ($App::trace);
  -    return($alt_entry);
  +    &App::sub_exit() if ($App::trace);
   }
   
   sub _array_to_key_params {
  @@ -193,8 +294,8 @@
   
   sub acquire {
       &App::sub_entry if ($App::trace);
  -    my ($self, $sort_spec) = @_;
  -    my $entry = $self->_acquire_in_mem($sort_spec);
  +    my ($self) = @_;
  +    my $entry = $self->_acquire_in_mem();
       $self->_maintain_queue_buffers("acquire",$entry) if ($entry);  # load up 
buffer if it's low
       $self->print() if ($self->{verbose});
       &App::sub_exit($entry) if ($App::trace);
  @@ -204,36 +305,11 @@
   sub release {
       &App::sub_entry if ($App::trace);
       my ($self, $entry, $columns, $values) = @_;
  -    $self->_release_in_mem($entry, $columns, $values);
  -    $self->_release_in_db($entry, $columns, $values);
  -    $self->_maintain_queue_buffers("release",$entry);
  +    $self->_maintain_queue_buffers("release",$entry,$columns,$values);
       $self->print() if ($self->{verbose});
       &App::sub_exit() if ($App::trace);
   }
   
  -sub _release_in_db {
  -    &App::sub_entry if ($App::trace);
  -    my ($self, $entry, $columns, $values) = @_;
  -
  -    my $STATUS_ACQUIRED   = $self->{STATUS_ACQUIRED};
  -    my $STATUS_RELEASED   = $self->{STATUS_RELEASED};
  -    my $status_attrib     = $self->{status_attrib};
  -    my $data              = $self->{data};
  -    my ($e, $ent, $entry_key);
  -
  -    if ($self->{type} eq "ARRAY") {
  -        my $colidx = $self->_colidx();
  -        my $status_idx = $colidx->{$status_attrib};
  -        die "status_attribute [$status_attrib] does not exist on elements of 
this queue" if (!defined $status_idx);
  -        $entry_key = $self->_array_to_key($entry);
  -    }
  -    else {
  -        $entry_key = $self->_hash_to_key($entry);
  -    }
  -
  -    &App::sub_exit() if ($App::trace);
  -}
  -
   sub _refill_buffer {
       &App::sub_entry if ($App::trace);
       my ($self, $resource_key) = @_;
  @@ -248,12 +324,18 @@
   
       my $params = $self->_resource_key_to_params($resource_key);
       $params->{$status_attrib} = $STATUS_UNBUFFERED;
  -    if ($self->{queue_id_attrib} && $self->{queue_id}) {
  -        $params->{$self->{queue_id_attrib}} = $self->{queue_id};
  +    if ($self->{multiple_queues} && $self->{client_id_attrib} && 
$self->{client_id}) {
  +        $params->{$self->{client_id_attrib}} = $self->{client_id};
  +    }
  +    if ($self->{params}) {
  +        my $base_params = $self->{params};
  +        foreach my $param (keys %$base_params) {
  +            $params->{$param} = $base_params->{$param};
  +        }
       }
   
       my (%options, $new_entries);
  -    $options{endrow} = $BUFFER_SIZE;
  +    $options{endrow} = 2*$BUFFER_SIZE - 1;
       $self->_sort_spec_to_options(\%options) if ($self->{sort_spec});
   
       my $columns = $self->{columns};
  @@ -298,6 +380,13 @@
       &App::sub_exit() if ($App::trace);
   }
   
  +sub _unacquire_entry {
  +    &App::sub_entry if ($App::trace);
  +    my ($self, $entry) = @_;
  +    $self->update($entry,[$self->{status_attrib}, 
$self->{client_id_attrib}],[$self->{STATUS_UNACQUIRED}, undef]);
  +    &App::sub_exit() if ($App::trace);
  +}
  +
   sub locate {
       &App::sub_entry if ($App::trace);
       my ($self, $params, $options) = @_;
  @@ -317,8 +406,14 @@
       my $STATUS_ACQUIRED   = $self->{STATUS_ACQUIRED};
       my $status_attrib     = $self->{status_attrib};
       $params{$status_attrib} = 
"$STATUS_UNBUFFERED,$STATUS_UNACQUIRED,$STATUS_ACQUIRED";
  -    if ($self->{queue_id_attrib} && $self->{queue_id}) {
  -        $params{$self->{queue_id_attrib}} = $self->{queue_id};
  +    if ($self->{multiple_queues} && $self->{client_id_attrib} && 
$self->{client_id}) {
  +        $params{$self->{client_id_attrib}} = $self->{client_id};
  +    }
  +    if ($self->{params}) {
  +        my $base_params = $self->{params};
  +        foreach my $param (keys %$base_params) {
  +            $params->{$param} = $base_params->{$param};
  +        }
       }
       my ($entries);
       my $db = $self->_db();
  @@ -334,14 +429,86 @@
       return(@$entries);
   }
   
  +sub _acquire_entry {
  +    &App::sub_entry if ($App::trace);
  +    my ($self, $entry) = @_;
  +    my $acquired = 1;
  +    my $columns = [$self->{status_attrib}];
  +    my $values = [$self->{STATUS_ACQUIRED}];
  +    if ($self->_update_is_different($entry, $columns, $values)) {
  +        my $db = $self->_db();
  +        my ($params);
  +        if ($self->{type} eq "ARRAY") {
  +            $params = $self->_array_to_key_params($entry);
  +        }
  +        else {
  +            $params = $self->_hash_to_key_params($entry);
  +        }
  +        if ($self->{client_id_attrib} && $self->{client_id}) {
  +            $params->{$self->{client_id_attrib}} = undef;
  +            CORE::push(@$columns, $self->{client_id_attrib});
  +            CORE::push(@$values,  $self->{client_id});
  +        }
  +        $acquired = $db->update($self->{table}, $params, $columns, $values);
  +        $self->_update_ref($entry, $columns, $values) if ($acquired);
  +    }
  +    else {
  +        $acquired = 0;
  +    }
  +    &App::sub_exit($acquired) if ($App::trace);
  +    return($acquired);
  +}
  +
   sub update {
       &App::sub_entry if ($App::trace);
       my ($self, $entry, $columns, $values) = @_;
  -    $self->_update_ref($entry, $columns, $values);
  -    $self->_update_in_db($entry, $columns, $values);
  +    if ($self->_update_is_different($entry, $columns, $values)) {
  +        $self->_update_ref($entry, $columns, $values);
  +        $self->_update_in_db($entry, $columns, $values);
  +    }
       &App::sub_exit() if ($App::trace);
   }
   
  +sub _update_is_different {
  +    &App::sub_entry if ($App::trace);
  +    my ($self, $entry, $columns, $values) = @_;
  +    my $different = 0;
  +    my ($value);
  +    if ($self->{type} eq "ARRAY") {
  +        my $colidx = $self->_colidx();
  +        for (my $i = 0; $i <= $#$columns; $i++) {
  +            $value = $entry->[$colidx->{$columns->[$i]}];
  +            if (defined $value && defined $values->[$i]) {
  +                if ($value ne $values->[$i]) {
  +                    $different = 1;
  +                    last;
  +                }
  +            }
  +            elsif (defined $value || defined $values->[$i]) {
  +                $different = 1;
  +                last;
  +            }
  +        }
  +    }
  +    else {
  +        for (my $i = 0; $i <= $#$columns; $i++) {
  +            $value = $entry->{$columns->[$i]};
  +            if (defined $value && defined $values->[$i]) {
  +                if ($value ne $values->[$i]) {
  +                    $different = 1;
  +                    last;
  +                }
  +            }
  +            elsif (defined $value || defined $values->[$i]) {
  +                $different = 1;
  +                last;
  +            }
  +        }
  +    }
  +    &App::sub_exit($different) if ($App::trace);
  +    return($different);
  +}
  +
   sub _update_in_db {
       &App::sub_entry if ($App::trace);
       my ($self, $entry, $columns, $values) = @_;
  @@ -374,7 +541,7 @@
   
   sub _count_entries_by_attrib_in_db {
       &App::sub_entry if ($App::trace);
  -    my ($self, $key_attrib, $counts, $count_attrib) = @_;
  +    my ($self, $key_attrib, $counts, $count_attrib, $status) = @_;
       my $STATUS_UNBUFFERED = $self->{STATUS_UNBUFFERED};
       my $STATUS_UNACQUIRED = $self->{STATUS_UNACQUIRED};
       my $STATUS_ACQUIRED   = $self->{STATUS_ACQUIRED};
  @@ -382,9 +549,16 @@
       my $db = $self->_db();
       my $count_expr = "count(1)";
       $count_expr = "sum($count_attrib)" if ($count_attrib);
  -    my $params = { $status_attrib => "$STATUS_UNBUFFERED,$STATUS_UNACQUIRED" 
};
  -    if ($self->{queue_id_attrib} && $self->{queue_id}) {
  -        $params->{$self->{queue_id_attrib}} = $self->{queue_id};
  +    $status = "$STATUS_UNBUFFERED,$STATUS_UNACQUIRED" if (!$status);
  +    my $params = { $status_attrib => $status };
  +    if ($self->{multiple_queues} && $self->{client_id_attrib} && 
$self->{client_id}) {
  +        $params->{$self->{client_id_attrib}} = $self->{client_id};
  +    }
  +    if ($self->{params}) {
  +        my $base_params = $self->{params};
  +        foreach my $param (keys %$base_params) {
  +            $params->{$param} = $base_params->{$param};
  +        }
       }
       my $rows = $db->get_rows($self->{table}, $params, [ $key_attrib, 
$count_expr ], { group_by => [ $key_attrib ] });
       foreach my $row (@$rows) {
  @@ -429,7 +603,19 @@
       $columns = $self->{columns} if (!$columns);
       my (%options);
       $self->_sort_spec_to_options(\%options);
  -    my $rows = $db->get_rows($self->{table}, {}, $columns, \%options);
  +    my $params = {
  +        $self->{status_attrib} => 
"$self->{STATUS_UNBUFFERED},$self->{STATUS_UNACQUIRED},$self->{STATUS_ACQUIRED}",
  +    };
  +    if ($self->{multiple_queues} && $self->{client_id_attrib} && 
$self->{client_id}) {
  +        $params->{$self->{client_id_attrib}} = $self->{client_id};
  +    }
  +    if ($self->{params}) {
  +        my $base_params = $self->{params};
  +        foreach my $param (keys %$base_params) {
  +            $params->{$param} = $base_params->{$param};
  +        }
  +    }
  +    my $rows = $db->get_rows($self->{table}, $params, $columns, \%options);
       foreach my $row (@$rows) {
           if ($format) {
               printf $fh $format, @$row;
  
  
  

Reply via email to