cvsuser     06/02/05 20:12:07

  Added:       App-WorkQueue/lib/App/WorkQueue.pm
  Log:
  new
  
  Revision  Changes    Path
  1.1                  p5ee/App-WorkQueue/lib/App/WorkQueue.pm
  
  Index: WorkQueue.pm
  ===================================================================
  
  #############################################################################
  ## $Id: WorkQueue.pm,v 1.1 2006/02/06 04:12:07 spadkins Exp $
  #############################################################################
  
  package App::Context;
  
  # Shortcut on the context object: asks service() for the "WorkQueue"
  # service called $service_name, forwarding any extra arguments unchanged.
  sub work_queue {
      my $self = shift;
      my ($service_name, @args) = @_;
      return $self->service("WorkQueue", $service_name, @args);
  }
  
  package App::WorkQueue;
  
  use App;
  use App::Service;
  # WorkQueue inherits from App::Service; _init() below calls SUPER::_init().
  @ISA = ( "App::Service" );
  
  use strict;
  
  # File-level copy of the global "verbose" option, read once when this file
  # is loaded.  The diagnostic print statements in the *_in_mem methods test
  # this variable (not $self->{verbose}, which _init() also sets).
  my $verbose = $App::options{verbose};
  
  =head1 NAME
  
  App::WorkQueue - Interface for a work queue
  
  =head1 SYNOPSIS
  
      use App;
  
      $context = App->context();
      $q = $context->service("WorkQueue"); 
  
      ...
  
  =head1 DESCRIPTION
  
  A WorkQueue is for channeling work from a variety of requesting sources to a 
  limited set of workers who work the queue.
  
  It implements a new service in the App-Context framework called a
  WorkQueue.  A normal queue supports push() and pop() operations
  where the entries push()ed onto the queue come out in a first-in-
  first-out (FIFO) basis.  A WorkQueue however is used to control the
  processing of units of work, where the order that entries are
  processed is prioritized subject to various constraints.
  
  Therefore, a WorkQueue supports the following operations.
  
    push()      - add a unit of work to the queue
    acquire()   - get the highest priority entry subject to constraints
    release()   - remove an (acquired) entry from the queue
    unacquire() - return an (acquired) entry to the unacquired state
  
  Internally, the WorkQueue can hold HASHREF's (the default) or ARRAYREF's. 
  However, if ARRAYREF's are to be stored, then a {columns} array and a
  {colidx} hash must also be stored on the object.
  
  =head1 INTERNAL ELEMENTS
  
   $self->{data} = [];
   $self->{columns} = [];
   $self->{colidx} = {};
  
  =cut
  
  #############################################################################
  # CONSTANTS
  #############################################################################
  # Slot numbers within one global-constraint record (an array ref).
  # print_constraints() reads the same slots by literal number: slots 0 and 1
  # hold hashes, and slots 2 and 3 hold the keys looked up in those hashes.
  my $GCONSTR_COUNTS       = 0;
  my $GCONSTR_LIMITS       = 1;
  my $GCONSTR_COUNT_ATTRIB = 2;
  my $GCONSTR_LIMIT_ATTRIB = 3;
  
  # Slot numbers within one per-key constraint record (an array ref).
  # print_constraints() treats slots 0 and 1 as hashes keyed by the same key
  # and prints slot 2 as a label.
  # NOTE(review): slots 3-5 are not used in the visible part of this file;
  # presumably they are used by the constraint code further down - confirm.
  my $CONSTR_COUNTS        = 0;
  my $CONSTR_LIMITS        = 1;
  my $CONSTR_KEY_ATTRIB    = 2;
  my $CONSTR_COUNT_ATTRIB  = 3;
  my $CONSTR_KEY_IDX       = 4;
  my $CONSTR_COUNT_IDX     = 5;
  
  #############################################################################
  # CONSTRUCTOR METHODS
  #############################################################################
  
  =head1 Constructor Methods:
  
  #############################################################################
  # Method: _init()
  #############################################################################
  
  =head2 _init()
  
  The _init() method is called from within the standard Service
  constructor.
  It allows a WorkQueue to customize the behavior of the
  constructor. 
  
      * Signature: _init($named)
      * Param:     $named      {}   [in]
      * Return:    void
      * Throws:    App::Exception
      * Since:     0.01
  
      Sample Usage: 
  
      $service->_init(\%args);
  
  =cut
  
  # Queue-specific initialisation, run after the parent class's _init().
  #
  #   $args - hash ref, passed straight through to SUPER::_init()
  #
  # Sets up:
  #   {data}        empty array ref that will hold the queue entries
  #   {type}        "ARRAY" when {columns} is configured, otherwise "HASH"
  #   {id_attribs}  array ref of the comma-separated names in {id_attrib}
  #   {id_indexes}  (ARRAY queues only) column positions of those names,
  #                 looked up in _colidx()
  #   {verbose}     copy of $App::options{verbose}
  # and analyses {sort_spec} when one is configured.
  #
  # Dies when status_attrib, STATUS_UNACQUIRED, STATUS_ACQUIRED,
  # STATUS_RELEASED or id_attrib is missing (or false).
  sub _init {
      &App::sub_entry if ($App::trace);
      my ($self, $args) = @_;
  
      $self->SUPER::_init($args);
      $self->{data} = [];
      $self->{type} = ($self->{columns}) ? "ARRAY" : "HASH";
  
      die "status_attrib not set on queue"     if (!$self->{status_attrib});
      # STATUS_UNBUFFERED is deliberately not required:
      #die "STATUS_UNBUFFERED not set on queue" if (!$self->{STATUS_UNBUFFERED});
      die "STATUS_UNACQUIRED not set on queue" if (!$self->{STATUS_UNACQUIRED});
      die "STATUS_ACQUIRED not set on queue"   if (!$self->{STATUS_ACQUIRED});
      die "STATUS_RELEASED not set on queue"   if (!$self->{STATUS_RELEASED});
  
      my $id_attrib  = $self->{id_attrib} || die "id_attrib not set on queue";
      my $id_attribs = [split(/,/,$id_attrib)];
      $self->{id_attribs} = $id_attribs;
      if ($self->{type} eq "ARRAY") {
          # Array entries are addressed by position, so translate each id
          # attribute name into its column index once, up front.
          my $colidx = $self->_colidx();
          my (@id_idx);
          foreach my $attrib (@$id_attribs) {
              CORE::push(@id_idx, $colidx->{$attrib});
          }
          $self->{id_indexes} = \@id_idx;
      }
      if ($self->{sort_spec}) {
          $self->_analyze_sort_spec();
      }
      $self->{verbose} = $App::options{verbose};
      &App::sub_exit() if ($App::trace);
  }
  
  =cut
  
  #############################################################################
  # new()
  #############################################################################
  
  =head2 new()
  
  The constructor is inherited from
  L<C<App::Service>|App::Service/"new()">.
  
  =cut
  
  #############################################################################
  # PUBLIC METHODS
  #############################################################################
  
  =head1 Public Methods:
  
  Unlike a regular queue where the primitive operations are push()/pop(), the
  primitive operations for this work queue are:
  
   * $q->push($entry)
   * $entry = $q->acquire(\%params)
   * $q->release($entry)
   * $q->unacquire($entry)
   * @entries = $q->locate(\%params, \%options)
  
  =cut
  
  #############################################################################
  # push()
  #############################################################################
  
  =head2 push()
  
      * Signature: $q->push($entry);
      * Param:     $entry             HASH/ARRAY
      * Return:    undef
      * Throws:    App::Exception::WorkQueue
      * Since:     0.01
  
      Sample Usage: 
  
      $context = App->context();
      $q = $context->service("WorkQueue");  # or ...
      $q = $context->work_queue();
  
      $q->push({ name => "Joe", degrees => 3 });
      $q->push({ name => "Mike", degrees => 1 });
  
  =cut
  
  # Base-class push(): marks $entry as unacquired and then dies, because this
  # class does not implement storage itself (presumably a subclass overrides
  # push() - confirm).
  #
  #   $entry - hash ref or array ref describing the unit of work
  #
  # Always dies with "push() not implemented".
  sub push {
      &App::sub_entry if ($App::trace);
      my ($self, $entry) = @_;
      # We explicitly _update_ref() rather than update() because the entry is
      # not in the remote store yet.
      $self->_update_ref($entry,[$self->{status_attrib}],[$self->{STATUS_UNACQUIRED}]);
      die "push() not implemented";
      &App::sub_exit() if ($App::trace);
  }
  
  # Adds $entry to the in-memory queue ($self->{data}), keeping the queue in
  # sort order when a {sort_spec} is configured.
  #
  #   $entry          - hash ref or array ref; must match the queue's {type}
  #   $release_lowest - when true, the push must not grow the set of entries
  #                     sharing $entry's resource key: the lowest-priority
  #                     such entry (possibly $entry itself) is dropped
  #
  # Every entry placed on the queue is marked STATUS_UNACQUIRED through
  # update(); an entry pushed off the queue is marked STATUS_UNBUFFERED.
  # Dies when the entry's reference type does not match the queue type.
  sub _push_in_mem {
      &App::sub_entry if ($App::trace);
      my ($self, $entry, $release_lowest) = @_;
      my $ref = ref($entry);
      my $type = "";
      if ($ref eq "ARRAY") {
          $type = "ARRAY";
          print "PUSHED[M]: [", join("|",@$entry), "]\n" if ($verbose);
      }
      elsif ($ref) {
          $type = "HASH";
          print "PUSHED[M]: {", join("|",%$entry), "}\n" if ($verbose);
      }
      die "tried to push entry of type [$type] onto queue of type [$self->{type}]"
          if ($type ne $self->{type});
      my $entries = $self->{data};
      my ($removed_entry);
      if (!$self->{sort_spec} || $#$entries == -1) {
          if ($release_lowest) {
              # do nothing. this entry is the "lowest", so we shouldn't even
              # push it on.
          }
          else {
              CORE::push(@$entries, $entry);
              $self->update($entry,[$self->{status_attrib}],[$self->{STATUS_UNACQUIRED}]);
          }
      }
      else {
          my $inserted = 0;
          if ($release_lowest) {
              my $removed = 0;
              my ($ent);
              my $resource_key = $self->_resource_key($entry);
              # Walk from the tail (lowest priority) towards the head.
              for (my $i = $#$entries; $i >= 0; $i--) {
                  $ent = $entries->[$i];
                  if ($self->_resource_key($ent) eq $resource_key) {
                      # IF not inserted yet AND new entry is lower or same
                      # priority THEN
                      if (!$inserted && $self->_compare_entries($entry, $ent) > -1) {
                          if (!$removed) {   # "insert and remove self"
                              # The new entry is itself the lowest for its
                              # resource key, so it never reaches the queue.
                              $inserted = 1;
                              $removed  = 1;
                          }
                          else {
                              $self->update($entry,[$self->{status_attrib}],[$self->{STATUS_UNACQUIRED}]);
                              splice(@$entries, $i+1, 0, $entry);
                              $inserted = 1;
                          }
                      }
                      if (!$removed) {
                          # First same-key entry seen from the tail: push it
                          # off the queue to make room for the new entry.
                          $removed_entry = $entries->[$i];
                          $self->update($removed_entry,[$self->{status_attrib}],[$self->{STATUS_UNBUFFERED}]);
                          splice(@$entries, $i, 1);
                          $removed = 1;
                      }
                  }
                  else {
                      if (!$inserted && $self->_compare_entries($entry, $ent) > -1) {
                          $self->update($entry,[$self->{status_attrib}],[$self->{STATUS_UNACQUIRED}]);
                          splice(@$entries, $i+1, 0, $entry);
                          $inserted = 1;
                      }
                  }
                  last if ($inserted && $removed);
              }
          }
          else {
              # Plain sorted insert: place the entry just after the last
              # existing entry that it does not sort ahead of.
              for (my $i = $#$entries; $i >= 0; $i--) {
                  if ($self->_compare_entries($entry, $entries->[$i]) > -1) {
                      $self->update($entry,[$self->{status_attrib}],[$self->{STATUS_UNACQUIRED}]);
                      splice(@$entries, $i+1, 0, $entry);
                      $inserted = 1;
                      last;
                  }
              }
          }
          if (!$inserted) {
              # Sorts ahead of everything already queued.
              $self->update($entry,[$self->{status_attrib}],[$self->{STATUS_UNACQUIRED}]);
              unshift(@$entries, $entry);
          }
      }
      &App::sub_exit() if ($App::trace);
  }
  
  # Parses $self->{sort_spec} (comma-separated terms of the form
  # [N|C][+|-]column) into four parallel array refs stored on the object:
  #   {sort_columns}  column names
  #   {sort_idx}      column positions from _colidx() (ARRAY queues only)
  #   {direction}     -1 for a "-" prefix, otherwise 1
  #   {numeric}       1 for an "N" prefix, otherwise 0
  # With no sort_spec all four are set to empty arrays.
  sub _analyze_sort_spec {
      my ($self) = @_;
      my (@names, @slots, @signs, @is_numeric);
  
      if (my $spec = $self->{sort_spec}) {
          my $colidx = $self->_colidx();
          foreach my $term (split(/,/, $spec)) {
              if ($term =~ /^([NC]?)([+-]?)(.*)$/) {
                  my ($kind, $sign, $name) = ($1, $2, $3);
                  CORE::push(@signs,      ($sign eq "-") ? -1 : 1);
                  CORE::push(@is_numeric, ($kind eq "N") ? 1 : 0);
                  CORE::push(@names,      $name);
                  CORE::push(@slots, $colidx->{$name}) if ($self->{type} eq "ARRAY");
              }
              else {
                  # Term did not parse: treat it as an ascending string column.
                  CORE::push(@names,      $term);
                  CORE::push(@signs,      1);
                  CORE::push(@is_numeric, 0);
              }
          }
      }
  
      $self->{sort_columns} = \@names;
      $self->{sort_idx}     = \@slots;
      $self->{direction}    = \@signs;
      $self->{numeric}      = \@is_numeric;
  }
  
  # Compares two queue entries according to the analysed sort spec
  # ({sort_columns}/{sort_idx}/{direction}/{numeric}).
  #
  #   $left, $right - two entries of the same kind (both array refs or both
  #                   hash refs); the kind is taken from $left
  #
  # Returns a negative, zero or positive number in the manner of <=>/cmp:
  # the first sort column on which the entries differ decides, with the sign
  # flipped for descending columns.  Returns 0 when there are no sort columns.
  sub _compare_entries {
      my ($self, $left, $right) = @_;
      my $sign = 0;
      my $columns   = $self->{sort_columns};
      my $sort_idx  = $self->{sort_idx};
      my $direction = $self->{direction};
      my $numeric   = $self->{numeric};
      if (ref($left) eq "ARRAY") {
          for (my $c = 0; $c <= $#$sort_idx; $c++) {
              my $idx = $sort_idx->[$c];
              if ($numeric && $numeric->[$c]) {
                  $sign = ($left->[$idx] <=> $right->[$idx]) * $direction->[$c];
              }
              else {
                  $sign = ($left->[$idx] cmp $right->[$idx]) * $direction->[$c];
              }
              last if ($sign);
          }
      }
      else {
          for (my $c = 0; $c <= $#$columns; $c++) {
              my $column = $columns->[$c];
              if ($numeric && $numeric->[$c]) {
                  $sign = ($left->{$column} <=> $right->{$column}) * $direction->[$c];
              }
              else {
                  $sign = ($left->{$column} cmp $right->{$column}) * $direction->[$c];
              }
              last if ($sign);
          }
      }
      return($sign);
  }
  
  #############################################################################
  # acquire()
  #############################################################################
  
  =head2 acquire()
  
      * Signature: $entry = $q->acquire();
      * Signature: $entry = $q->acquire($sort_spec);
      * Param:     $sort_spec         string
      * Return:    $entry             HASH/ARRAY
      * Throws:    App::Exception::WorkQueue
      * Since:     0.01
  
      Sample Usage: 
  
      my $entry = $q->acquire();
  
  =cut
  
  # Base-class acquire(): not implemented here, so it always dies with
  # "acquire() not implemented" (presumably a subclass overrides it).
  # The statements after the die are never reached.
  sub acquire {
      &App::sub_entry if ($App::trace);
      my $self      = shift;
      my $sort_spec = shift;
      my $entry;
      die "acquire() not implemented";
      &App::sub_exit($entry) if ($App::trace);
      return $entry;
  }
  
  # Scans the in-memory queue in order and acquires the first entry that is
  # in STATUS_UNACQUIRED and for which _acquire_resources() succeeds.
  #
  #   $sort_spec - accepted but not used by this implementation
  #
  # The chosen entry is marked STATUS_ACQUIRED through update() and returned;
  # returns undef when _global_resources_exist() is false or no entry
  # qualifies.  For ARRAY queues, dies if the status attribute is not one of
  # the columns known to _colidx().
  sub _acquire_in_mem {
      &App::sub_entry if ($App::trace);
      my ($self, $sort_spec) = @_;
  
      my ($entry);
      my $STATUS_UNACQUIRED = $self->{STATUS_UNACQUIRED};
      my $STATUS_ACQUIRED   = $self->{STATUS_ACQUIRED};
      my $status_attrib     = $self->{status_attrib};
      my $entries           = $self->{data};
  
      if ($self->_global_resources_exist()) {
          if ($self->{type} eq "ARRAY") {
              my $colidx = $self->_colidx();
              my $status_idx = $colidx->{$status_attrib};
              die "status_attribute [$status_attrib] does not exist on elements of this queue"
                  if (!defined $status_idx);
              foreach my $e (@$entries) {
                  next if ($e->[$status_idx] ne $STATUS_UNACQUIRED);
                  if ($self->_acquire_resources($e)) {
                      $entry = $e;
                      $self->update($entry,[$status_attrib],[$STATUS_ACQUIRED]);
                      print "ACQUIRED[M]: [", join("|",@$e), "]\n" if ($verbose);
                      last;
                  }
              }
          }
          else {
              foreach my $e (@$entries) {
                  next if ($e->{$status_attrib} ne $STATUS_UNACQUIRED);
                  if ($self->_acquire_resources($e)) {
                      $entry = $e;
                      $self->update($entry,[$status_attrib],[$STATUS_ACQUIRED]);
                      print "ACQUIRED[M]: {", join("|",%$e), "}\n" if ($verbose);
                      last;
                  }
              }
          }
      }
  
      print "ACQUIRED[M]: undef\n" if (!$entry && $verbose);
      &App::sub_exit($entry) if ($App::trace);
      return($entry);
  }
  
  #############################################################################
  # release()
  #############################################################################
  
  =head2 release()
  
      * Signature: $q->release($entry);
      * Signature: $q->release($entry, $columns, $values);
      * Param:     $entry             HASH/ARRAY
      * Return:    undef
      * Throws:    App::Exception::WorkQueue
      * Since:     0.01
  
      Sample Usage: 
  
      $context = App->context();
      $q = $context->service("WorkQueue");  # or ...
      $q = $context->work_queue();
      $q->release({ name => "Joe", degrees => 3 });
      $q->release({ name => "Mike", degrees => 1 });
  
  =cut
  
  # Base-class release(): not implemented here, so it always dies with
  # "release() not implemented" (presumably a subclass overrides it).
  sub release {
      &App::sub_entry if ($App::trace);
      my $self = shift;
      my ($entry, $columns, $values) = @_;
      die "release() not implemented";
      &App::sub_exit() if ($App::trace);
  }
  
  # Removes from the in-memory queue the stored entry whose id key matches
  # $entry's id key.
  #
  #   $entry   - hash ref or array ref identifying the entry (only its id
  #              attributes are compared)
  #   $columns - optional array ref of extra column names to set on release
  #   $values  - array ref of values matching $columns
  #
  # If the stored entry is currently STATUS_ACQUIRED its resources are given
  # back through _release_resources().  The entry is then updated with
  # STATUS_RELEASED plus any extra columns, and spliced out of {data}.
  # Only the first match is processed; no match is a silent no-op.
  # For ARRAY queues, dies if the status attribute is not a known column.
  sub _release_in_mem {
      &App::sub_entry if ($App::trace);
      my ($self, $entry, $columns, $values) = @_;
  
      my $STATUS_ACQUIRED   = $self->{STATUS_ACQUIRED};
      my $status_attrib     = $self->{status_attrib};
      my $data              = $self->{data};
      my ($e, $ent, $entry_key);
  
      # The status change always comes first; caller-supplied columns follow.
      my @columns = ( $self->{status_attrib} );
      my @values  = ( $self->{STATUS_RELEASED} );
      if ($columns) {
          CORE::push(@columns, @$columns);
          CORE::push(@values,  @$values);
      }
  
      if ($self->{type} eq "ARRAY") {
          my $colidx = $self->_colidx();
          my $status_idx = $colidx->{$status_attrib};
          die "status_attribute [$status_attrib] does not exist on elements of this queue"
              if (!defined $status_idx);
          $entry_key = $self->_array_to_key($entry);
          for ($e = 0; $e <= $#$data; $e++) {
              $ent = $data->[$e];
              if ($self->_array_to_key($ent) eq $entry_key) {
                  if ($ent->[$status_idx] eq $STATUS_ACQUIRED) {
                      $self->_release_resources($ent);
                  }
                  $self->update($ent,\@columns,\@values);
                  splice(@$data, $e, 1);
                  print "RELEASED[M]: [", join("|",@$entry), "]\n" if ($verbose);
                  last;
              }
          }
      }
      else {
          $entry_key = $self->_hash_to_key($entry);
          for ($e = 0; $e <= $#$data; $e++) {
              $ent = $data->[$e];
              if ($self->_hash_to_key($ent) eq $entry_key) {
                  if ($ent->{$status_attrib} eq $STATUS_ACQUIRED) {
                      $self->_release_resources($ent);
                  }
                  $self->update($ent,\@columns,\@values);
                  splice(@$data, $e, 1);
                  print "RELEASED[M]: {", join("|",%$entry), "}\n" if ($verbose);
                  last;
              }
          }
      }
  
      &App::sub_exit() if ($App::trace);
  }
  
  # Builds the identity key of an array-type entry from the column positions
  # in $self->{id_indexes}: the single id value when there is exactly one id
  # column, otherwise all id values joined with ":".
  sub _array_to_key {
      my ($self, $entry) = @_;
      my $id_indexes = $self->{id_indexes};
      my ($key);
      if ($#$id_indexes == 0) {
          $key = $entry->[$id_indexes->[0]];
      }
      else {
          # Array slice: the id columns, in id_indexes order.
          $key = join(":", @$entry[@$id_indexes]);
      }
      return($key);
  }
  
  # Builds the identity key of a hash-type entry from the attribute names in
  # $self->{id_attribs}: the single id value when there is exactly one id
  # attribute, otherwise all id values joined with ":".
  sub _hash_to_key {
      my ($self, $entry) = @_;
      my $id_attribs = $self->{id_attribs};
      my ($key);
      if ($#$id_attribs == 0) {
          $key = $entry->{$id_attribs->[0]};
      }
      else {
          # Hash slice: the id attributes, in id_attribs order.
          $key = join(":", @$entry{@$id_attribs});
      }
      return($key);
  }
  
  #############################################################################
  # unacquire()
  #############################################################################
  
  =head2 unacquire()
  
      * Signature: $q->unacquire($entry);
      * Param:     $entry             HASH/ARRAY
      * Return:    undef
      * Throws:    App::Exception::WorkQueue
      * Since:     0.01
  
      Sample Usage: 
  
      $context = App->context();
      $q = $context->service("WorkQueue");  # or ...
      $q = $context->work_queue();
      $q->unacquire({ name => "Joe", degrees => 3 });
      $q->unacquire({ name => "Mike", degrees => 1 });
  
  =cut
  
  # Base-class unacquire(): not implemented here, so it always dies with
  # "unacquire() not implemented" (presumably a subclass overrides it).
  sub unacquire {
      &App::sub_entry if ($App::trace);
      my $self  = shift;
      my $entry = shift;
      die "unacquire() not implemented";
      &App::sub_exit() if ($App::trace);
  }
  
  # Puts the stored entry whose id key matches $entry's id key back into the
  # STATUS_UNACQUIRED state; the entry stays on the queue.
  #
  #   $entry - hash ref or array ref identifying the entry (only its id
  #            attributes are compared)
  #
  # If the stored entry is currently STATUS_ACQUIRED its resources are given
  # back through _release_resources() first.  Only the first match is
  # processed; no match is a silent no-op.
  # For ARRAY queues, dies if the status attribute is not a known column.
  sub _unacquire_in_mem {
      &App::sub_entry if ($App::trace);
      my ($self, $entry) = @_;
  
      my $STATUS_ACQUIRED   = $self->{STATUS_ACQUIRED};
      my $STATUS_UNACQUIRED = $self->{STATUS_UNACQUIRED};
      my $status_attrib     = $self->{status_attrib};
      my $data              = $self->{data};
      my ($e, $ent, $entry_key);
  
      if ($self->{type} eq "ARRAY") {
          my $colidx = $self->_colidx();
          my $status_idx = $colidx->{$status_attrib};
          die "status_attribute [$status_attrib] does not exist on elements of this queue"
              if (!defined $status_idx);
          $entry_key = $self->_array_to_key($entry);
          for ($e = 0; $e <= $#$data; $e++) {
              $ent = $data->[$e];
              if ($self->_array_to_key($ent) eq $entry_key) {
                  if ($ent->[$status_idx] eq $STATUS_ACQUIRED) {
                      $self->_release_resources($ent);
                  }
                  $self->update($ent,[$status_attrib],[$STATUS_UNACQUIRED]);
                  print "UNACQUIRED[M]: [", join("|",@$ent), "]\n" if ($verbose);
                  last;
              }
          }
      }
      else {
          $entry_key = $self->_hash_to_key($entry);
          for ($e = 0; $e <= $#$data; $e++) {
              $ent = $data->[$e];
              if ($self->_hash_to_key($ent) eq $entry_key) {
                  if ($ent->{$status_attrib} eq $STATUS_ACQUIRED) {
                      $self->_release_resources($ent);
                  }
                  $self->update($ent,[$status_attrib],[$STATUS_UNACQUIRED]);
                  print "UNACQUIRED[M]: {", join("|",%$ent), "}\n" if ($verbose);
                  last;
              }
          }
      }
  
      &App::sub_exit() if ($App::trace);
  }
  
  #############################################################################
  # locate()
  #############################################################################
  
  =head2 locate()
  
      * Signature: @entries = $q->locate();
      * Signature: @entries = $q->locate($params);
      * Signature: @entries = $q->locate($params, $options);
      * Param:     $params         HASH
      * Param:     $options        HASH
      * Return:    @entries
      * Throws:    App::Exception::WorkQueue
      * Since:     0.01
  
      Sample Usage: 
  
      my @entries = $q->locate();
  
  =cut
  
  # Base-class locate(): not implemented here, so it always dies with
  # "locate() not implemented" (presumably a subclass overrides it).
  sub locate {
      &App::sub_entry if ($App::trace);
      my $self = shift;
      my ($params, $options) = @_;
      my @entries;
      die "locate() not implemented";
      &App::sub_exit(@entries) if ($App::trace);
      return @entries;
  }
  
  # Returns (as a list, in queue order) the in-memory entries whose values
  # string-equal every name => value pair in $params.
  #
  #   $params  - optional hash ref of attribute => required value; when it is
  #              missing or empty, every entry is returned
  #   $options - accepted but not used by this implementation
  sub _locate_in_mem {
      &App::sub_entry if ($App::trace);
      my ($self, $params, $options) = @_;
  
      my @names  = ($params && %$params) ? keys(%$params) : ();
      my @wanted = map { $params->{$_} } @names;
      my $queue  = $self->{data};
      my @found;
  
      if (!@names) {
          @found = @$queue;
      }
      elsif ($self->{type} eq "ARRAY") {
          # Resolve attribute names to column positions once, not per entry.
          my $colidx = $self->_colidx();
          my @slots  = map { $colidx->{$_} } @names;
          ROW: foreach my $row (@$queue) {
              foreach my $p (0 .. $#names) {
                  next ROW if ($row->[$slots[$p]] ne $wanted[$p]);
              }
              CORE::push(@found, $row);
          }
      }
      else {
          ROW: foreach my $row (@$queue) {
              foreach my $p (0 .. $#names) {
                  next ROW if ($row->{$names[$p]} ne $wanted[$p]);
              }
              CORE::push(@found, $row);
          }
      }
  
      &App::sub_exit(@found) if ($App::trace);
      return(@found);
  }
  
  #############################################################################
  # update()
  #############################################################################
  
  =head2 update()
  
      * Signature: $q->update($entry, $columns, $values);
      * Param:     $entry          HASH/ARRAY
      * Param:     $columns        ARRAY
      * Param:     $values         ARRAY
      * Return:    void
      * Throws:    App::Exception::WorkQueue
      * Since:     0.01
  
      Sample Usage: 
  
      $q->update($entry, \@columns, \@values);
  
  =cut
  
  # Base-class update(): not implemented here, so it always dies with
  # "update() not implemented" (presumably a subclass overrides it).
  sub update {
      &App::sub_entry if ($App::trace);
      my $self = shift;
      my ($entry, $columns, $values) = @_;
      die "update() not implemented";
      &App::sub_exit() if ($App::trace);
  }
  
  # Writes $values into $entry (in place) under the matching $columns.
  #
  #   $entry   - hash ref or array ref, according to the queue's {type}
  #   $columns - array ref of attribute names
  #   $values  - array ref of values, parallel to $columns
  #   $one_way - when true, a status change is counted without a "from"
  #              status (undef is passed to _maintain_status_counts())
  #   $raw     - when true, status counts are not maintained at all
  #
  # Whenever the status attribute is among $columns (and !$raw),
  # _maintain_status_counts() is called with the new status and the entry's
  # status as it was before the write.
  sub _update_ref {
      &App::sub_entry if ($App::trace);
      my ($self, $entry, $columns, $values, $one_way, $raw) = @_;
  
      my $status_attrib = $self->{status_attrib};
      my $is_array      = ($self->{type} eq "ARRAY");
      my $colidx        = $is_array ? $self->_colidx() : undef;
  
      foreach my $c (0 .. $#$columns) {
          my $column = $columns->[$c];
          my $value  = $values->[$c];
          if (!$raw && $column eq $status_attrib) {
              # Read the old status before it is overwritten below.
              my $from_status;
              if (!$one_way) {
                  $from_status = $is_array
                      ? $entry->[$colidx->{$status_attrib}]
                      : $entry->{$status_attrib};
              }
              $self->_maintain_status_counts(1, $value, $from_status);
          }
          if ($is_array) {
              $entry->[$colidx->{$column}] = $value;
          }
          else {
              $entry->{$column} = $value;
          }
      }
  
      &App::sub_exit() if ($App::trace);
  }
  
  # Adjusts the per-status tallies kept in the hash returned by
  # _status_counts().
  #
  #   $count       - amount to add
  #   $status      - status value to credit, or an entry (hash/array ref)
  #                  from which the current status is read
  #   $from_status - when true, this status is debited by $count (a move
  #                  between statuses); otherwise the "ALL" tally is credited
  sub _maintain_status_counts {
      &App::sub_entry if ($App::trace);
      my ($self, $count, $status, $from_status) = @_;
  
      if (ref($status)) {
          # An entry was supplied in place of a status: look its status up.
          my $attrib = $self->{status_attrib};
          $status = ($self->{type} eq "ARRAY")
              ? $status->[ $self->_colidx()->{$attrib} ]
              : $status->{$attrib};
      }
  
      my $tally = $self->_status_counts();
      $tally->{$status} += $count;
      if (!$from_status) {
          $tally->{ALL} += $count;
      }
      else {
          $tally->{$from_status} -= $count;
      }
      &App::sub_exit() if ($App::trace);
  }
  
  #############################################################################
  # print()
  #############################################################################
  
  =head2 print()
  
      * Signature: $q->print($fh);
      * Param:     $fh                FILEHANDLE
      * Return:    void
      * Throws:    App::Exception::WorkQueue
      * Since:     0.01
  
      Sample Usage: 
  
      $q->print(\*STDOUT);
  
  =cut
  
  # Dumps the queue to $fh (STDOUT when $fh is false): an "ENTRIES:" heading
  # followed by print_entries(), then a "CONSTRAINTS:" heading followed by
  # print_constraints().
  sub print {
      &App::sub_entry if ($App::trace);
      my ($self, $fh) = @_;
      $fh ||= \*STDOUT;
      print {$fh} "ENTRIES:\n";
      $self->print_entries($fh);
      print {$fh} "CONSTRAINTS:\n";
      $self->print_constraints($fh);
      &App::sub_exit() if ($App::trace);
  }
  
  #############################################################################
  # print_constraints()
  #############################################################################
  
  =head2 print_constraints()
  
      * Signature: $q->print_constraints();
      * Signature: $q->print_constraints($fh);
      * Param:     $fh                FILEHANDLE
      * Return:    void
      * Throws:    App::Exception::WorkQueue
      * Since:     0.01
  
      Sample Usage: 
  
      $q->print_constraints();
      $q->print_constraints(\*STDOUT);
  
  =cut
  
  # Writes a count/limit line for every constraint to $fh (STDOUT when $fh
  # is false).
  #
  # Each record in {global_constraints} yields one line built from
  # slot0{slot2} / slot1{slot3}.  Each record in {constraints} yields a
  # separator followed by one line per key: first the keys of slot 1 in
  # sorted order, then the keys of slot 0 (sorted) that were not already
  # shown.  Slot 2 is printed in brackets as a label.
  sub print_constraints {
      &App::sub_entry if ($App::trace);
      my ($self, $fh) = @_;
      $fh ||= \*STDOUT;
  
      foreach my $global (@{$self->{global_constraints}}) {
          printf {$fh} "     GLOBAL CONSTRAINT: %4d/%4d\n",
              $global->[0]{$global->[2]}, $global->[1]{$global->[3]};
      }
      foreach my $constraint (@{$self->{constraints}}) {
          print {$fh} "   -----------------------------\n";
          my @limited    = sort keys %{$constraint->[1]};
          my %is_limited = map { ($_ => 1) } @limited;
          my @unlimited  = grep { !$is_limited{$_} } sort keys %{$constraint->[0]};
          foreach my $key (@limited, @unlimited) {
              printf {$fh} "   %8s CONSTRAINT: %4d/%4d [%s]\n", $key,
                  $constraint->[0]{$key}, $constraint->[1]{$key}, $constraint->[2];
          }
      }
  
      &App::sub_exit() if ($App::trace);
  }
  
  #############################################################################
  # print_entries()
  #############################################################################
  
  =head2 print_entries()
  
      * Signature: $q->print_entries($fh, $format, $columns);
      * Param:     $fh                FILEHANDLE
      * Param:     $format            string
      * Param:     $columns              ARRAY
      * Return:    void
      * Throws:    App::Exception::WorkQueue
      * Since:     0.01
  
      Sample Usage: 
  
      $q->print_entries(\*STDOUT, "%s %s %s", ["ip","hostname","path"]);
  
  =cut
  
  # Base-class print_entries(): not implemented here, so it always dies with
  # "print_entries() not implemented" (presumably a subclass overrides it).
  sub print_entries {
      &App::sub_entry if ($App::trace);
      my $self = shift;
      my ($fh, $format, $columns) = @_;
      die "print_entries() not implemented";
      &App::sub_exit() if ($App::trace);
  }
  
  # Prints every in-memory entry to $fh (STDOUT when $fh is false).
  #
  #   $format  - optional printf format; when given, each entry is printed
  #              with it using the values of $columns, in that order
  #   $columns - array ref of attribute names to feed to $format
  #
  # Without a format, each entry is dumped whole: array entries as
  # "   [v1|v2|...]", hash entries as "   {k|v|k|v...}".
  sub _print_entries_in_mem {
      &App::sub_entry if ($App::trace);
      my ($self, $fh, $format, $columns) = @_;
      $fh = \*STDOUT if (!$fh);
  
      my $entries = $self->{data};
  
      if ($self->{type} eq "ARRAY") {
          # Translate the requested column names into positions once.
          my $colidx = $self->_colidx();
          my (@idx, $i);
          for ($i = 0; $i <= $#$columns; $i++) {
              $idx[$i] = $colidx->{$columns->[$i]};
          }
          foreach my $entry (@$entries) {
              if ($format) {
                  printf $fh $format, @$entry[@idx];
              }
              else {
                  print $fh "   [", join("|",@$entry), "]\n";
              }
          }
      }
      else {
          foreach my $entry (@$entries) {
              if ($format) {
                  printf $fh $format, @$entry{@$columns};
              }
              else {
                  print $fh "   {", join("|",%$entry), "}\n";
              }
          }
      }
  
      &App::sub_exit() if ($App::trace);
  }
  
  # Prints one "status => count" line per key of the _status_counts() hash,
  # in sorted key order, to $fh (STDOUT when $fh is false).
  sub print_status_counts {
      &App::sub_entry if ($App::trace);
      my ($self, $fh) = @_;
      $fh ||= \*STDOUT;
      my $tally = $self->_status_counts();
      my @statuses = sort keys %$tally;
      foreach my $name (@statuses) {
          printf {$fh} "   %7s => %8d\n", $name, $tally->{$name};
      }
      &App::sub_exit() if ($App::trace);
  }
  
  # Prints one line per resource key found in the {total} hash returned by
  # _resource_counts(), in sorted key order, showing that key's {total} and
  # {buffer} values.  Output goes to $fh (STDOUT when $fh is false).
  sub print_resource_counts {
      &App::sub_entry if ($App::trace);
      my ($self, $fh) = @_;
      $fh ||= \*STDOUT;
      my $resource_counts = $self->_resource_counts();
      my @resource_keys = sort keys %{$resource_counts->{total}};
      foreach my $name (@resource_keys) {
          my $total    = $resource_counts->{total}{$name};
          my $buffered = $resource_counts->{buffer}{$name};
          printf {$fh} "   %7s => %8d=tot %8d=buf\n", $name, $total, $buffered;
      }
      &App::sub_exit() if ($App::trace);
  }
  
  #############################################################################
  # num_entries()
  #############################################################################
  
  =head2 num_entries()
  
      * Signature: $q->num_entries();
      * Signature: $q->num_entries($status);
      * Param:     $status         string
      * Return:    $num            integer
      * Throws:    App::Exception::WorkQueue
      * Since:     0.01
  
      Sample Usage: 
  
      $num = $q->num_entries();
  
  =cut
  
  # Placeholder in this class: it unconditionally dies with
  # "num_entries() not implemented", so nothing is ever returned from here.
  # NOTE(review): presumably meant to be overridden by concrete queue
  # classes -- confirm against the subclasses.
  sub num_entries {
      &App::sub_entry if ($App::trace);
      my ($self, $status) = @_;
      die "num_entries() not implemented";
  }
  
  # Count the entries held in $self->{data}.
  #   $status  (optional) when true, only entries whose status attribute
  #            (named by $self->{status_attrib}) is string-equal to it
  #            are counted; otherwise every entry is counted.
  # Returns the count as a number.
  sub _num_entries_in_mem {
      &App::sub_entry if ($App::trace);
      my ($self, $status) = @_;
      my $entries = $self->{data};
      my $num = $#$entries + 1;     # unfiltered: every entry counts
      if ($status) {
          my $status_attrib = $self->{status_attrib};
          if ($self->{type} eq "ARRAY") {
              # ARRAY entries: translate the attribute name into a column index
              my $colidx = $self->_colidx();
              my $status_idx = $colidx->{$status_attrib};
              $num = grep { $_->[$status_idx] eq $status } @$entries;
          }
          else {
              $num = grep { $_->{$status_attrib} eq $status } @$entries;
          }
      }
      &App::sub_exit($num) if ($App::trace);
      return($num);
  }
  
  # Answer an entry count from the status-count hash rather than by
  # scanning entries.
  #   $status  (optional) when true, the count recorded for that status;
  #            otherwise the sum of the counts for statuses "N" and "W".
  # A missing count is reported as 0.
  sub _num_entries_from_status_counts {
      &App::sub_entry if ($App::trace);
      my ($self, $status) = @_;
      my $tally = $self->_status_counts();
      my $num = $status ? $tally->{$status}
                        : $tally->{N} + $tally->{W};
      $num = 0 if (!defined $num);
      &App::sub_exit($num) if ($App::trace);
      return($num);
  }
  
  #############################################################################
  # set_global_constraint()
  #############################################################################
  
  =head2 set_global_constraint()
  
      * Signature: $q->set_global_constraint($counts, $limits, $count_attrib, $limit_attrib);
      * Param:     $counts            HASH
      * Param:     $limits            HASH
      * Param:     $count_attrib      string
      * Param:     $limit_attrib      string
      * Return:    undef
      * Throws:    App::Exception::WorkQueue
      * Since:     0.01
  
      Sample Usage: 
  
      $context = App->context();
      $q = $context->service("WorkQueue");  # or ...
      $q = $context->work_queue();
      $q->push({ name => "Joe", degrees => 3 });
      $q->push({ name => "Mike", degrees => 1 });
      my %state = (
          count => 0,
          limit => 20,
      );
      $q->set_global_constraint(\%state, \%state, "count", "limit");
  
  =cut
  
  # Append a global constraint record
  #   [ $counts, $limits, $count_attrib, $limit_attrib ]
  # to $self->{global_constraints}, creating that list on first use.
  sub set_global_constraint {
      &App::sub_entry if ($App::trace);
      my ($self, $counts, $limits, $count_attrib, $limit_attrib) = @_;

      $self->{global_constraints} ||= [];
      my @record = ($counts, $limits, $count_attrib, $limit_attrib);
      # CORE::push is spelled out, as in the rest of this file.
      CORE::push(@{$self->{global_constraints}}, \@record);

      &App::sub_exit() if ($App::trace);
  }
  
  #############################################################################
  # set_constraint()
  #############################################################################
  
  =head2 set_constraint()
  
      * Signature: $q->set_constraint($counts, $limits, $key_attrib, $count_attrib);
      * Signature: $q->set_constraint($counts, $limits, $key_attrib);
      * Param:     $key_attrib        string
      * Param:     $count_attrib      string
      * Param:     $counts            HASH
      * Param:     $limits            HASH
      * Return:    undef
      * Throws:    App::Exception::WorkQueue
      * Since:     0.01
  
      Sample Usage: 
  
      $context = App->context();
      $q = $context->service("WorkQueue");  # or ...
      $q = $context->work_queue();
      $q->push({ name => "Joe", degrees => 3 });
      $q->push({ name => "Mike", degrees => 1 });
      my (%counts);
      my %limits = (
      );
      $q->set_constraint(\%counts, \%limits, "degrees");
  
  =cut
  
  # Register a per-key constraint on the queue.
  #   $counts        HASH    counts by key value
  #   $limits        HASH    limits by key value
  #   $key_attrib    string  entry attribute whose value selects the key
  #   $count_attrib  string  (optional) entry attribute holding the amount
  #                          to count
  # The record appended to $self->{constraints} is
  #   [ $counts, $limits, $key_attrib, $count_attrib ]
  # and, for ARRAY-type queues, additionally the resolved column indexes
  #   [ ..., $key_idx, $count_idx ].
  # Dies (ARRAY-type queues only) when $key_attrib, or a supplied
  # $count_attrib, is not one of the queue's columns.
  sub set_constraint {
      &App::sub_entry if ($App::trace);
      my ($self, $counts, $limits, $key_attrib, $count_attrib) = @_;

      my $constraints = $self->{constraints};
      if (!$constraints) {
          $constraints = [];
          $self->{constraints} = $constraints;
      }

      if ($self->{type} eq "ARRAY") {
          my $colidx = $self->_colidx();
          my $key_idx = $colidx->{$key_attrib};
          die "key_attribute [$key_attrib] does not exist on elements of this queue"
              if (!defined $key_idx);
          my ($count_idx);
          if ($count_attrib) {
              # The count column must be resolved from $count_attrib; looking
              # it up by $key_attrib would silently reuse the key column and
              # make the existence check below unable to fail.
              $count_idx = $colidx->{$count_attrib};
              die "count_attribute [$count_attrib] does not exist on elements of this queue"
                  if (!defined $count_idx);
          }
          CORE::push(@$constraints,
              [ $counts, $limits, $key_attrib, $count_attrib, $key_idx, $count_idx ]);
      }
      else {
          CORE::push(@$constraints, [ $counts, $limits, $key_attrib, $count_attrib ]);
      }

      &App::sub_exit() if ($App::trace);
  }
  
  #############################################################################
  # clear_constraints()
  #############################################################################
  
  =head2 clear_constraints()
  
      * Signature: $q->clear_constraints();
      * Param:     void
      * Return:    void
      * Throws:    App::Exception::WorkQueue
      * Since:     0.01
  
      Sample Usage: 
  
      $q->clear_constraints();
  
  =cut
  
  # Drop every constraint registered through set_constraint() by removing
  # $self->{constraints}.  $self->{global_constraints} is not touched here.
  sub clear_constraints {
      &App::sub_entry if ($App::trace);
      my $self = $_[0];

      delete $self->{constraints};

      &App::sub_exit() if ($App::trace);
  }
  
  #############################################################################
  # count_entries_by_attrib()
  #############################################################################
  
  =head2 count_entries_by_attrib()
  
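      * Signature: $q->count_entries_by_attrib($attrib, $counts);
      * Signature: $q->count_entries_by_attrib($attrib, $counts, $count_attrib);
      * Param:     $attrib            string
      * Param:     $counts            HASH
      * Param:     $count_attrib      string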
      * Return:    undef
      * Throws:    App::Exception::WorkQueue
      * Since:     0.01
  
      Sample Usage: 
  
      $context = App->context();
      $q = $context->service("WorkQueue");  # or ...
      $q = $context->work_queue();
      $q->push({ name => "Joe", degrees => 3 });
      $q->push({ name => "Mike", degrees => 1 });
      my (%counts);
      $q->count_entries_by_attrib("degrees", \%counts);
  
  =cut
  
  # Placeholder in this class: it unconditionally dies with
  # "count_entries_by_attrib() not implemented".
  # NOTE(review): presumably meant to be overridden by concrete queue
  # classes -- confirm against the subclasses.
  sub count_entries_by_attrib {
      &App::sub_entry if ($App::trace);
      my ($self, $key_attrib, $counts, $count_attrib) = @_;
      die "count_entries_by_attrib() not implemented";
  }
  
  # Tally the entries in $self->{data} by the value of one attribute.
  #   $key_attrib    string  attribute whose value is the tally key
  #   $counts        HASH    tallies are accumulated into this hash
  #   $count_attrib  string  (optional) when given, each entry adds the value
  #                          of this attribute instead of adding 1
  # Dies (ARRAY-type queues only) when $key_attrib or $count_attrib is not
  # one of the queue's columns.
  sub _count_entries_by_attrib_in_mem {
      &App::sub_entry if ($App::trace);
      my ($self, $key_attrib, $counts, $count_attrib) = @_;
      if ($self->{type} eq "ARRAY") {
          # ARRAY entries are positional: resolve attribute names to indexes
          my $colidx = $self->_colidx();
          die "key_attribute [$key_attrib] does not exist on elements of this queue"
              if (!exists $colidx->{$key_attrib});
          my $key_idx = $colidx->{$key_attrib};
          if ($count_attrib) {
              my $count_idx = $colidx->{$count_attrib};
              die "count_attribute [$count_attrib] does not exist on elements of this queue"
                  if (!defined $count_idx);
              foreach my $entry (@{$self->{data}}) {
                  $counts->{$entry->[$key_idx]} += $entry->[$count_idx];
              }
          }
          else {
              foreach my $entry (@{$self->{data}}) {
                  $counts->{$entry->[$key_idx]} ++;
              }
          }
      }
      else {
          if ($count_attrib) {
              foreach my $entry (@{$self->{data}}) {
                  $counts->{$entry->{$key_attrib}} += $entry->{$count_attrib};
              }
          }
          else {
              foreach my $entry (@{$self->{data}}) {
                  $counts->{$entry->{$key_attrib}} ++;
              }
          }
      }
      &App::sub_exit() if ($App::trace);
  }
  
  #############################################################################
  # PROTECTED METHODS
  #############################################################################
  
  =head1 Protected Methods:
  
  =cut
  
  #############################################################################
  # Method: _colidx()
  #############################################################################
  
  =head2 _colidx()
  
  Returns the column-to-index hashref.
  
      * Signature: $colidx = $q->_colidx();
      * Param:     void
      * Return:    $colidx      HASH
      * Since:     0.01
  
      $colidx = $q->_colidx();
      $idx = $colidx->{$column};   # get the column index for a named $column
  
  =cut
  
  # Return the hashref mapping column name => column index.
  # The map is built once from $self->{columns} and cached in
  # $self->{colidx}.  When a column name occurs more than once, the last
  # position wins.  Dies if no map is cached and $self->{columns} is not set.
  sub _colidx {
      &App::sub_entry if ($App::trace);
      my ($self) = @_;
      my $colidx = $self->{colidx};
      if (!$colidx) {
          my $columns = $self->{columns}
              || die "columns of the queue elements must be supplied in order to know the column indexes";
          $colidx = {};
          # hash slice: name at position $i maps to $i
          @{$colidx}{@$columns} = (0 .. $#$columns);
          $self->{colidx} = $colidx;
      }
      &App::sub_exit($colidx) if ($App::trace);
      return($colidx);
  }
  
  #############################################################################
  # _global_resources_exist()
  #############################################################################
  
  =head2 _global_resources_exist()
  
      * Signature: $q->_global_resources_exist();
      * Param:     void
      * Return:    $complies          boolean (1 or 0)
      * Throws:    App::Exception::WorkQueue
      * Since:     0.01
  
      Sample Usage: 
  
      $context = App->context();
      $q = $context->service("WorkQueue");  # or ...
      $q = $context->work_queue();
  
      $q->_global_resources_exist();
  
  =cut
  
  # Report whether every global constraint still has headroom.
  # Returns 1 when there are no global constraints or when each one's
  # current count is below its limit; returns 0 as soon as one count has
  # reached (>=) its limit.  Nothing is modified.
  sub _global_resources_exist {
      &App::sub_entry if ($App::trace);
      my ($self) = @_;
      my $complies = 1;
      my $global_constraints = $self->{global_constraints};
      if ($global_constraints && $#$global_constraints != -1) {
          foreach my $gc (@$global_constraints) {
              my $in_use  = $gc->[$GCONSTR_COUNTS]{$gc->[$GCONSTR_COUNT_ATTRIB]};
              my $ceiling = $gc->[$GCONSTR_LIMITS]{$gc->[$GCONSTR_LIMIT_ATTRIB]};
              if ($in_use >= $ceiling) {
                  $complies = 0;
                  last;
              }
          }
      }
      &App::sub_exit($complies) if ($App::trace);
      return($complies);
  }
  
  #############################################################################
  # _acquire_resources()
  #############################################################################
  
  =head2 _acquire_resources()
  
      * Signature: $q->_acquire_resources($entry, $constraints);
      * Param:     $entry             ARRAY/HASH
      * Param:     $constraints       ARRAY
      * Return:    $complies          boolean (1 if the resources were acquired, else 0)
      * Throws:    App::Exception::WorkQueue
      * Since:     0.01
  
      Sample Usage: 
  
      $context = App->context();
      $q = $context->service("WorkQueue");  # or ...
      $q = $context->work_queue();
      $q->push({ name => "Joe", degrees => 3 });
      $q->push({ name => "Mike", degrees => 1 });
  
      $q->_acquire_resources($entry);
      $q->_acquire_resources($entry, $constraints);
  
  =cut
  
  # Try to reserve the resources an entry needs.
  #   $entry        ARRAY/HASH  the queue entry
  #   $constraints  ARRAY       (optional) defaults to $self->{constraints}
  # Returns 1 and updates the counts when every applicable limit allows it;
  # returns 0 and leaves all counts untouched otherwise.
  #
  # Order of work:
  #   1. refuse if any global count has reached (>=) its global limit;
  #   2. refuse if, for any constraint with a limit, count + increment
  #      would exceed (>) that limit;
  #   3. otherwise add the increment to each limited constraint's count and
  #      add 1 to each global count.
  # A constraint's limit is the one stored under the entry's key value,
  # falling back to the "_DEFAULT" limit; a constraint with neither is
  # ignored.
  sub _acquire_resources {
      &App::sub_entry if ($App::trace);
      my ($self, $entry, $constraints) = @_;

      my $complies = 1;

      my $global_constraints = $self->{global_constraints};
      if ($global_constraints && $#$global_constraints != -1) {
          foreach my $gc (@$global_constraints) {
              my $in_use  = $gc->[$GCONSTR_COUNTS]{$gc->[$GCONSTR_COUNT_ATTRIB]};
              my $ceiling = $gc->[$GCONSTR_LIMITS]{$gc->[$GCONSTR_LIMIT_ATTRIB]};
              if ($in_use >= $ceiling) {
                  $complies = 0;
                  last;
              }
          }
      }

      $constraints ||= $self->{constraints};
      if ($complies && $constraints && $#$constraints != -1) {
          my $is_array = ($self->{type} eq "ARRAY");

          # For one constraint, work out which counter this entry touches,
          # the limit that applies, and (only when there is a limit) the
          # amount by which the counter would move.
          my $assess = sub {
              my ($c) = @_;
              my ($key, $limit, $count_incr);
              $key = $is_array ? $entry->[$c->[$CONSTR_KEY_IDX]]
                               : $entry->{$c->[$CONSTR_KEY_ATTRIB]};
              $limit = $c->[$CONSTR_LIMITS]{$key};
              $limit = $c->[$CONSTR_LIMITS]{_DEFAULT} if (!defined $limit);
              if (defined $limit) {
                  if ($is_array) {
                      $count_incr = (defined $c->[$CONSTR_COUNT_IDX])
                          ? $entry->[$c->[$CONSTR_COUNT_IDX]] : 1;
                  }
                  else {
                      $count_incr = (defined $c->[$CONSTR_COUNT_ATTRIB])
                          ? $entry->{$c->[$CONSTR_COUNT_ATTRIB]} : 1;
                  }
              }
              return ($key, $limit, $count_incr);
          };

          # First pass: check every constraint before changing anything.
          foreach my $c (@$constraints) {
              my ($key, $limit, $count_incr) = $assess->($c);
              next if (!defined $limit);
              if ($c->[$CONSTR_COUNTS]{$key} + $count_incr > $limit) {
                  $complies = 0;
                  last;
              }
          }

          # Second pass: all checks passed, so charge each limited constraint.
          if ($complies) {
              foreach my $c (@$constraints) {
                  my ($key, $limit, $count_incr) = $assess->($c);
                  next if (!defined $limit);
                  $c->[$CONSTR_COUNTS]{$key} += $count_incr;
              }
          }
      }

      if ($complies) {
          foreach my $gc (@$global_constraints) {
              $gc->[$GCONSTR_COUNTS]{$gc->[$GCONSTR_COUNT_ATTRIB]} ++;
          }
      }

      &App::sub_exit($complies) if ($App::trace);
      return($complies);
  }
  
  #############################################################################
  # _release_resources()
  #############################################################################
  
  =head2 _release_resources()
  
      * Signature: $q->_release_resources($entry, $constraints);
      * Param:     $entry             ARRAY/HASH
      * Param:     $constraints       ARRAY
      * Return:    undef
      * Throws:    App::Exception::WorkQueue
      * Since:     0.01
  
      Sample Usage: 
  
      $context = App->context();
      $q = $context->service("WorkQueue");  # or ...
      $q = $context->work_queue();
      $q->push({ name => "Joe", degrees => 3 });
      $q->push({ name => "Mike", degrees => 1 });
  
      $q->_release_resources($entry);
      $q->_release_resources($entry, $constraints);
  
  =cut
  
  # Give back the resources an entry was holding.
  #   $entry        ARRAY/HASH  the queue entry
  #   $constraints  ARRAY       (optional) defaults to $self->{constraints}
  # Every global count is decremented by 1.  For each constraint that has a
  # limit for the entry's key value (or a "_DEFAULT" limit), the count for
  # that key is reduced by the entry's count value, or by 1 when the
  # constraint has no count column/attribute.
  sub _release_resources {
      &App::sub_entry if ($App::trace);
      my ($self, $entry, $constraints) = @_;

      my $global_constraints = $self->{global_constraints};
      if ($global_constraints && $#$global_constraints != -1) {
          foreach my $gc (@$global_constraints) {
              $gc->[$GCONSTR_COUNTS]{$gc->[$GCONSTR_COUNT_ATTRIB]} --;
          }
      }

      $constraints ||= $self->{constraints};
      if ($constraints && $#$constraints != -1) {
          my $is_array = ($self->{type} eq "ARRAY");
          foreach my $c (@$constraints) {
              my $key = $is_array ? $entry->[$c->[$CONSTR_KEY_IDX]]
                                  : $entry->{$c->[$CONSTR_KEY_ATTRIB]};
              my $limit = $c->[$CONSTR_LIMITS]{$key};
              $limit = $c->[$CONSTR_LIMITS]{_DEFAULT} if (!defined $limit);
              next if (!defined $limit);    # unlimited constraints hold no count

              my $count_incr;
              if ($is_array) {
                  $count_incr = (defined $c->[$CONSTR_COUNT_IDX])
                      ? $entry->[$c->[$CONSTR_COUNT_IDX]] : 1;
              }
              else {
                  $count_incr = (defined $c->[$CONSTR_COUNT_ATTRIB])
                      ? $entry->{$c->[$CONSTR_COUNT_ATTRIB]} : 1;
              }
              $c->[$CONSTR_COUNTS]{$key} -= $count_incr;
          }
      }
      &App::sub_exit() if ($App::trace);
  }
  
  # Return the status-count hashref stored in $self->{status_counts},
  # creating (and storing) an empty one on first use.
  sub _status_counts {
      my ($self) = @_;
      $self->{status_counts} ||= {};
      return($self->{status_counts});
  }
  
  # Return the resource-count structure stored in $self->{resource_counts}.
  # On first use it is obtained from _initialize_resource_counts() and
  # stored for later calls.
  sub _resource_counts {
      my ($self) = @_;
      $self->{resource_counts} ||= $self->_initialize_resource_counts();
      return($self->{resource_counts});
  }
  
  # Build a fresh resource-count structure: a hashref with two empty
  # hashes, "total" and "buffer".  Nothing is stored on $self here.
  sub _initialize_resource_counts {
      my ($self) = @_;
      my %fresh = (
          total  => {},
          buffer => {},
      );
      return(\%fresh);
  }
  
  #############################################################################
  # _maintain_queue_buffers()
  #############################################################################
  
  =head2 _maintain_queue_buffers()
  
      * Signature: $q->_maintain_queue_buffers($op, $entry);
      * Param:     $op                string ("push", "acquire", "release", "unacquire")
      * Param:     $entry             ARRAY/HASH
      * Return:    undef
      * Throws:    App::Exception::WorkQueue
      * Since:     0.01
  
      Sample Usage: 
  
      $context = App->context();
      $q = $context->service("WorkQueue");  # or ...
      $q = $context->work_queue();
      $q->push({ name => "Joe", degrees => 3 });
      $q->push({ name => "Mike", degrees => 1 });
  
      $q->_maintain_queue_buffers("push",$entry);
      $q->_maintain_queue_buffers("release",$entry);
      $q->_maintain_queue_buffers();
  
  Queues may be implemented with remote storage.  In that case, there are local
  queue buffers which are maintained in memory to increase performance.
  There is conceptually a queue buffer for each combination of constraint values
  (the constraint key of an entry).
  
  The Queue Buffers are maintained each time an entry is push()ed or release()d.
  The count for the particular constraint key is modified.
  If the count falls below a configured low-water mark, a new set of entries 
  are read in from the remote storage.
  
  =cut
  
  # Keep the per-resource-key bookkeeping (total count and in-buffer count)
  # in step with a queue operation, then refill the buffer if it has room.
  #   $op     string      "push", "acquire", "release", "unacquire"; any
  #                       other (or missing) value only runs the refill check
  #   $entry  ARRAY/HASH  the entry the operation applies to
  #
  #   push      : total +1; if the buffer count is below BUFFER_SIZE the
  #               buffer count is +1 and the entry is pushed in memory,
  #               otherwise it is pushed with a second argument of 1
  #               (annotated "release lowest" in the original code)
  #   acquire   : total -1, buffer -1
  #   release   : the entry is released from memory; counts are unchanged
  #   unacquire : total +1, buffer +1
  # Afterwards, when more entries exist in total than are buffered and the
  # buffer is below BUFFER_SIZE, _refill_buffer() is called and the number
  # it returns is added to the buffer count.
  sub _maintain_queue_buffers {
      &App::sub_entry if ($App::trace);
      my ($self, $op, $entry) = @_;

      $op = "" if (!$op);
      my $capacity = $self->{BUFFER_SIZE};

      my $rkey  = $self->_resource_key($entry);
      my $tally = $self->_resource_counts();

      if ($op eq "push") {
          $tally->{total}{$rkey}++;
          my $buffer_full = !($tally->{buffer}{$rkey} < $capacity);
          if ($buffer_full) {
              $self->_push_in_mem($entry,1); # release lowest
          }
          else {
              $tally->{buffer}{$rkey}++;
              $self->_push_in_mem($entry);
          }
      }
      elsif ($op eq "acquire") {
          $tally->{total}{$rkey}--;
          $tally->{buffer}{$rkey}--;
      }
      elsif ($op eq "release") {
          $self->_release_in_mem($entry);
      }
      elsif ($op eq "unacquire") {
          $tally->{total}{$rkey}++;
          $tally->{buffer}{$rkey}++;
      }

      my $have_total  = $tally->{total}{$rkey};
      my $have_buffer = $tally->{buffer}{$rkey};
      my $needs_refill = ($have_total > $have_buffer && $have_buffer < $capacity);
      if ($needs_refill) {
          my $refilled = $self->_refill_buffer($rkey);
          $tally->{buffer}{$rkey} += $refilled;
      }

      &App::sub_exit() if ($App::trace);
  }
  
  #############################################################################
  # _resource_key()
  #############################################################################
  
  =head2 _resource_key()
  
      * Signature: $q->_resource_key($entry);
      * Param:     $entry             ARRAY/HASH
      * Return:    $resource_key      string
      * Throws:    App::Exception::WorkQueue
      * Since:     0.01
  
      Sample Usage: 
  
      $context = App->context();
      $q = $context->service("WorkQueue");  # or ...
      $q = $context->work_queue();
      $q->push({ name => "Joe", degrees => 3 });
      $q->push({ name => "Mike", degrees => 1 });
  
      $resource_key = $q->_resource_key($entry);
  
  =cut
  
  # Build the resource key of an entry: the entry's value for each
  # constraint's key column/attribute, in constraint order, joined with ":".
  # Undefined values contribute an empty string.  With no constraints set
  # on the queue the key is the empty string.
  sub _resource_key {
      &App::sub_entry if ($App::trace);
      my ($self, $entry) = @_;

      my $resource_key = "";
      my $constraints = $self->{constraints};
      if ($constraints && $#$constraints != -1) {
          my $is_array = ($self->{type} eq "ARRAY");
          my @parts = map {
              my $part = $is_array ? $entry->[$_->[$CONSTR_KEY_IDX]]
                                   : $entry->{$_->[$CONSTR_KEY_ATTRIB]};
              defined($part) ? $part : "";
          } @$constraints;
          $resource_key = join(":", @parts);
      }
      &App::sub_exit($resource_key) if ($App::trace);
      return($resource_key);
  }
  
  # Turn a resource key back into a hashref of
  #   { constraint key attribute => value }
  # by splitting the key on ":" and pairing the pieces with the constraints
  # in order.  With no constraints set the result is an empty hashref.
  # Because split() drops trailing empty fields, a constraint whose piece is
  # missing gets an undefined value.
  sub _resource_key_to_params {
      &App::sub_entry if ($App::trace);
      my ($self, $resource_key) = @_;

      my $params = {};
      my $constraints = $self->{constraints};
      if ($constraints && $#$constraints != -1) {
          my @pieces = split(/:/, $resource_key);
          my $pos = 0;
          foreach my $c (@$constraints) {
              $params->{$c->[$CONSTR_KEY_ATTRIB]} = $pieces[$pos];
              $pos++;
          }
      }
      &App::sub_exit($params) if ($App::trace);
      return($params);
  }
  
  #############################################################################
  # Method: service_type()
  #############################################################################
  
  =head2 service_type()
  
  Returns "WorkQueue";
  
      * Signature: $service_type = App::WorkQueue->service_type();
      * Param:     void
      * Return:    $service_type  string
      * Since:     0.01
  
      $service_type = $q->service_type();
  
  =cut
  
  # Returns the constant string "WorkQueue".  Takes no arguments (declared
  # with an empty prototype).
  sub service_type () { "WorkQueue"; }
  
  =head1 ACKNOWLEDGEMENTS
  
   * Author:  Stephen Adkins <[EMAIL PROTECTED]>
   * License: This is free software. It is licensed under the same terms as Perl itself.
  
  =head1 SEE ALSO
  
  L<C<App::Context>|App::Context>,
  L<C<App::Service>|App::Service>
  
  =cut
  
  1;
  
  __END__
  
  #############################################################################
  # _check_low_queue_buffers()
  #############################################################################
  
  =head2 _check_low_queue_buffers()
  
      * Signature: $q->_check_low_queue_buffers($entry);
      * Param:     $entry             ARRAY/HASH
      * Return:    undef
      * Throws:    App::Exception::WorkQueue
      * Since:     0.01
  
      Sample Usage: 
  
      $context = App->context();
      $q = $context->service("WorkQueue");  # or ...
      $q = $context->work_queue();
      $q->push({ name => "Joe", degrees => 3 });
      $q->push({ name => "Mike", degrees => 1 });
  
      $q->_check_low_queue_buffers($entry);
  
  Queues may be implemented with remote storage.  In that case, there are local
  queue buffers which are maintained in memory to increase performance.
  There is conceptually a queue buffer for each combination of constraint values
  (the constraint key of an entry).
  
  The Queue Buffers are checked each time an entry is push()ed.
  The count for the particular constraint key is incremented.
  Only if the count is below a configured high-water mark is it necessary
  to also push the entry onto the queue buffer.
  
  =cut
  
  # Record a newly pushed entry in the per-resource-key counts and report
  # whether its buffer still had room.
  # The total count for the entry's resource key is always incremented.
  # Returns 1 (and increments the buffer count) when the buffer count was
  # below BUFFER_SIZE, otherwise 0.
  # Note: this definition sits after the __END__ marker of the file.
  sub _check_low_queue_buffers {
      &App::sub_entry if ($App::trace);
      my ($self, $entry) = @_;

      my $capacity = $self->{BUFFER_SIZE};

      my $rkey  = $self->_resource_key($entry);
      my $tally = $self->_resource_counts();
      $tally->{total}{$rkey}++;

      my $buffer_low = 0;
      if ($tally->{buffer}{$rkey} < $capacity) {
          $tally->{buffer}{$rkey} ++;
          $buffer_low = 1;
      }

      &App::sub_exit($buffer_low) if ($App::trace);
      return($buffer_low);
  }
  
  
  
  

Reply via email to