cvsuser     06/02/05 20:11:44

  Added:       App-WorkQueue/lib/App/WorkQueue Memory.pm Repository.pm
  Log:
  new
  
  Revision  Changes    Path
  1.1                  p5ee/App-WorkQueue/lib/App/WorkQueue/Memory.pm
  
  Index: Memory.pm
  ===================================================================
  
  #############################################################################
  ## $Id: Memory.pm,v 1.1 2006/02/06 04:11:44 spadkins Exp $
  #############################################################################
  
  package App::WorkQueue::Memory;
  
  use App;
  use App::WorkQueue;
  @ISA = ( "App::WorkQueue" );
  
  use strict;
  
  =head1 NAME
  
  App::WorkQueue::Memory - Interface for a work queue stored in memory
  
  =head1 SYNOPSIS
  
      use App;
  
      $context = App->context();
      $q = $context->service("WorkQueue"); 
  
      ...
  
  =head1 DESCRIPTION
  
  A WorkQueue is for channeling work from a variety of requesting sources to a 
  limited set of workers who work the queue.
  
  =cut
  
  sub push {
      &App::sub_entry if ($App::trace);
      my ($self, $entry) = @_;
      if ($self->{type} eq "ARRAY") {
          die "wrong number of columns" if ($#{$self->{columns}} != $#$entry);
      }
      $self->_push_in_mem($entry);
      &App::sub_exit() if ($App::trace);
  }
  
  sub acquire {
      &App::sub_entry if ($App::trace);
      my ($self, $sort_spec) = @_;
      my $entry = $self->_acquire_in_mem($sort_spec);
      &App::sub_exit($entry) if ($App::trace);
      return($entry);
  }
  
  sub release {
      &App::sub_entry if ($App::trace);
      my ($self, $entry, $columns, $values) = @_;
      $self->_release_in_mem($entry, $columns, $values);
      &App::sub_exit() if ($App::trace);
  }
  
  sub unacquire {
      &App::sub_entry if ($App::trace);
      my ($self, $entry) = @_;
      $self->_unacquire_in_mem($entry);
      &App::sub_exit() if ($App::trace);
  }
  
  sub locate {
      &App::sub_entry if ($App::trace);
      my ($self, $params, $options) = @_;
      my @entries = $self->_locate_in_mem($params, $options);
      &App::sub_exit(@entries) if ($App::trace);
      return(@entries);
  }
  
  sub update {
      &App::sub_entry if ($App::trace);
      my ($self, $entry, $columns, $values) = @_;
      $self->_update_ref($entry, $columns, $values);
      &App::sub_exit() if ($App::trace);
  }
  
  sub num_entries {
      &App::sub_entry if ($App::trace);
      my ($self, $status) = @_;
      my $num = $self->_num_entries_in_mem($status);
      &App::sub_exit($num) if ($App::trace);
      return($num);
  }
  
  sub count_entries_by_attrib {
      &App::sub_entry if ($App::trace);
      my ($self, $key_attrib, $counts, $count_attrib) = @_;
      $self->_count_entries_by_attrib_in_mem($key_attrib, $counts, 
$count_attrib);
      &App::sub_exit() if ($App::trace);
  }
  
  sub print_entries {
      &App::sub_entry if ($App::trace);
      my ($self, $fh, $format, $columns) = @_;
      $self->_print_entries_in_mem($fh, $format, $columns);
      &App::sub_exit() if ($App::trace);
  }
  
  =head1 ACKNOWLEDGEMENTS
  
   * Author:  Stephen Adkins <[EMAIL PROTECTED]>
   * License: This is free software. It is licensed under the same terms as 
Perl itself.
  
  =head1 SEE ALSO
  
  L<C<App::Context>|App::Context>,
  L<C<App::Service>|App::Service>
  
  =cut
  
  1;
  
  
  
  
  1.1                  p5ee/App-WorkQueue/lib/App/WorkQueue/Repository.pm
  
  Index: Repository.pm
  ===================================================================
  
  #############################################################################
  ## $Id: Repository.pm,v 1.1 2006/02/06 04:11:44 spadkins Exp $
  #############################################################################
  
  package App::WorkQueue::Repository;
  
  use App;
  use App::WorkQueue;
  @ISA = ( "App::WorkQueue" );
  
  use strict;
  use App::Repository;
  
  my $verbose = $App::options{verbose};
  
  =head1 NAME
  
  App::WorkQueue::Repository - Interface for a work queue stored in a repository
  
  =head1 SYNOPSIS
  
      use App;
  
      $context = App->context();
      $q = $context->service("WorkQueue"); 
  
      ...
  
  =head1 DESCRIPTION
  
  A WorkQueue is for channeling work from a variety of requesting sources to a 
  limited set of workers who work the queue.
  
  =cut
  
  sub _db {
      my ($self) = @_;
      return($self->{context}->repository($self->{repository}));
  }
  
  sub _init {
      &App::sub_entry if ($App::trace);
      my ($self, $args) = @_;
      $self->SUPER::_init($args);
      die "STATUS_UNBUFFERED not set on queue" if (!$self->{STATUS_UNBUFFERED});
      die "queue_id_attrib not set on queue"   if (!$self->{queue_id_attrib});
      die "queue_id not set on queue"          if (!$self->{queue_id});
      if ($self->{auto_id_attrib}) {
          if ($self->{type} eq "ARRAY") {
              my $colidx  = $self->_colidx();
              my $attrib  = $self->{auto_id_attrib};
              my $columns = $self->{columns};
              if (defined $colidx->{$attrib}) {
                  $self->{auto_id_idx} = $colidx->{$attrib};
              }
              else {
                  push(@$columns, $attrib);
                  $colidx->{$attrib} = $#$columns;
                  $self->{auto_id_idx} = $#$columns;
              }
          }
      }
      my $db = $self->_db();
      if ($self->{queue_id_attrib}) {
          $db->set($self->{table}, { $self->{queue_id_attrib} => 
$self->{queue_id} }, $self->{queue_id_attrib}, undef);
      }
      &App::sub_exit() if ($App::trace);
  }
  
  #############################################################################
  # PUBLIC METHODS
  #############################################################################
  
  =head1 Public Methods:
  
  Unlike a regular queue where the primitive operations are push()/pop(), the
  primitive operations for this work queue are:
  
   * $q->push($entry)
   * $entry = $q->acquire(\%params)
   * $q->release($entry)
   * $q->unacquire($entry)
   * @entries = $q->locate(\%params, \%options)
  
  =cut
  
  sub push {
      &App::sub_entry if ($App::trace);
      my ($self, $entry) = @_;
      #if ($self->{type} eq "ARRAY") {
      #    die "wrong number of columns" if ($#{$self->{columns}} != $#$entry);
      #}
      my $columns = [$self->{status_attrib}, $self->{queue_id_attrib}];
      my $values  = [$self->{STATUS_UNBUFFERED}, $self->{queue_id}];
      $self->_update_ref($entry, $columns, $values, 0, 1);
      my $alt_entry = $self->_push_in_db($entry);
      if ($alt_entry) {
          $self->_update_in_db($alt_entry, $columns, $values);
          $entry = $alt_entry;
      }
      $self->_update_ref($entry, $columns, $values, 1);
      $self->_maintain_queue_buffers("push",$entry);
      $self->print() if ($self->{verbose});
      &App::sub_exit() if ($App::trace);
  }
  
  sub _push_in_db {
      &App::sub_entry if ($App::trace);
      my ($self, $entry) = @_;
      my $db = $self->_db();
      my $ref = ref($entry);
      my $type = "";
      my ($alt_entry);
      if ($ref eq "ARRAY") {
          $type = "ARRAY";
          die "tried to push entry of type [$type] onto queue of type 
[$self->{type}]" if ($type ne $self->{type});
          print "PUSHED[D]: [", join("|",@$entry), "]\n" if ($verbose);
          my $columns = $self->{columns};
          my $auto_id_idx = $self->{auto_id_idx};
          eval {
              if (defined $auto_id_idx) {
                  $entry->[$auto_id_idx] = 0 if (!$entry->[$auto_id_idx]);
                  $db->insert($self->{table}, $columns, $entry);
                  $entry->[$auto_id_idx] = $db->_last_inserted_id();
              }
              else {
                  $db->insert($self->{table}, $columns, $entry);
              }
          };
          if ($@) {
              my $params = $self->_array_to_key_params($entry);
              $alt_entry = $db->get_row($self->{table}, $params, $columns);
          }
      }
      elsif ($ref) {
          $type = "HASH";
          die "tried to push entry of type [$type] onto queue of type 
[$self->{type}]" if ($type ne $self->{type});
          print "PUSHED[D]: {", join("|",%$entry), "}\n" if ($verbose);
          my $columns = $self->{columns};
          my $auto_id_attrib = $self->{auto_id_attrib};
          eval {
              $entry->{$auto_id_attrib} = 0 if (defined $auto_id_attrib && 
!$entry->{$auto_id_attrib});
              if ($columns) {
                  $db->insert($self->{table}, $columns, $entry);
              }
              else {
                  $db->insert($self->{table}, $entry);
              }
              $entry->{$auto_id_attrib} = $db->_last_inserted_id() if (defined 
$auto_id_attrib);
          };
          if ($@) {
              my $params = $self->_hash_to_key_params($entry);
              $alt_entry = $db->get_hash($self->{table}, $params, $columns);
          }
      }
      &App::sub_exit($alt_entry) if ($App::trace);
      return($alt_entry);
  }
  
  sub _array_to_key_params {
      my ($self, $entry) = @_;
      my $id_attribs = $self->{id_attribs};
      my $id_indexes = $self->{id_indexes};
      my (%params);
      my $auto_id_idx = $self->{auto_id_idx};
      if (defined $auto_id_idx && $entry->[$auto_id_idx]) {
          $params{$self->{auto_id_attrib}} = $entry->[$auto_id_idx];
      }
      else {
          for (my $i = 0; $i <= $#$id_indexes; $i++) {
              $params{$id_attribs->[$i]} = $entry->[$id_indexes->[$i]];
          }
      }
      return(\%params);
  }
  
  sub _hash_to_key_params {
      my ($self, $entry) = @_;
      my $id_attribs = $self->{id_attribs};
      my (%params, $attrib);
      my $auto_id_attrib = $self->{auto_id_attrib};
      if (defined $auto_id_attrib && $entry->{$auto_id_attrib}) {
          $params{$auto_id_attrib} = $entry->{$auto_id_attrib};
      }
      else {
          foreach $attrib (@$id_attribs) {
              $params{$attrib} = $entry->{$attrib};
          }
      }
      return(\%params);
  }
  
  sub acquire {
      &App::sub_entry if ($App::trace);
      my ($self, $sort_spec) = @_;
      my $entry = $self->_acquire_in_mem($sort_spec);
      $self->_maintain_queue_buffers("acquire",$entry) if ($entry);  # load up 
buffer if it's low
      $self->print() if ($self->{verbose});
      &App::sub_exit($entry) if ($App::trace);
      return($entry);
  }
  
  sub release {
      &App::sub_entry if ($App::trace);
      my ($self, $entry, $columns, $values) = @_;
      $self->_release_in_mem($entry, $columns, $values);
      $self->_release_in_db($entry, $columns, $values);
      $self->_maintain_queue_buffers("release",$entry);
      $self->print() if ($self->{verbose});
      &App::sub_exit() if ($App::trace);
  }
  
  sub _release_in_db {
      &App::sub_entry if ($App::trace);
      my ($self, $entry, $columns, $values) = @_;
  
      my $STATUS_ACQUIRED   = $self->{STATUS_ACQUIRED};
      my $STATUS_RELEASED   = $self->{STATUS_RELEASED};
      my $status_attrib     = $self->{status_attrib};
      my $data              = $self->{data};
      my ($e, $ent, $entry_key);
  
      if ($self->{type} eq "ARRAY") {
          my $colidx = $self->_colidx();
          my $status_idx = $colidx->{$status_attrib};
          die "status_attribute [$status_attrib] does not exist on elements of 
this queue" if (!defined $status_idx);
          $entry_key = $self->_array_to_key($entry);
      }
      else {
          $entry_key = $self->_hash_to_key($entry);
      }
  
      &App::sub_exit() if ($App::trace);
  }
  
  sub _refill_buffer {
      &App::sub_entry if ($App::trace);
      my ($self, $resource_key) = @_;
  
      my $db = $self->_db();
  
      my $BUFFER_SIZE       = $self->{BUFFER_SIZE};
      my $STATUS_UNBUFFERED = $self->{STATUS_UNBUFFERED};
      my $STATUS_UNACQUIRED = $self->{STATUS_UNACQUIRED};
      my $STATUS_ACQUIRED   = $self->{STATUS_ACQUIRED};
      my $status_attrib     = $self->{status_attrib};
  
      my $params = $self->_resource_key_to_params($resource_key);
      $params->{$status_attrib} = $STATUS_UNBUFFERED;
      if ($self->{queue_id_attrib} && $self->{queue_id}) {
          $params->{$self->{queue_id_attrib}} = $self->{queue_id};
      }
  
      my (%options, $new_entries);
      $options{numrows} = $BUFFER_SIZE;
      $self->_sort_spec_to_options(\%options) if ($self->{sort_spec});
  
      my $columns = $self->{columns};
  
      if ($self->{type} eq "ARRAY") {
          $new_entries = $db->get_rows($self->{table}, $params, $columns, 
\%options);
      }
      else {
          $params->{$status_attrib} = $STATUS_UNBUFFERED;
          $new_entries = $db->get_hashes($self->{table}, $params, $columns, 
\%options);
      }
      foreach my $new_entry (@$new_entries) {
          $self->_push_in_mem($new_entry);
      }
      my $num_added = $#$new_entries + 1;
      &App::sub_exit($num_added) if ($App::trace);
      return($num_added);
  }
  
  sub _sort_spec_to_options {
      &App::sub_entry if ($App::trace);
      my ($self, $options) = @_;
      if ($self->{sort_columns}) {
          my $sort_columns = $self->{sort_columns};
          my $direction    = $self->{direction};
          #my $numeric      = $self->{numeric};
          $options->{order_by} = $sort_columns;
          for (my $i = 0; $i <= $#$sort_columns; $i++) {
              if ($direction->[$i] < 0) {
                  $options->{direction}{$sort_columns->[$i]} = "desc";
              }
          }
      }
      &App::sub_exit() if ($App::trace);
  }
  
  sub unacquire {
      &App::sub_entry if ($App::trace);
      my ($self, $entry) = @_;
      $self->_unacquire_in_mem($entry);
      $self->_maintain_queue_buffers("unacquire",$entry);
      &App::sub_exit() if ($App::trace);
  }
  
  sub locate {
      &App::sub_entry if ($App::trace);
      my ($self, $params, $options) = @_;
      my @entries = $self->_locate_in_db($params, $options);
      &App::sub_exit(@entries) if ($App::trace);
      return(@entries);
  }
  
  sub update {
      &App::sub_entry if ($App::trace);
      my ($self, $entry, $columns, $values) = @_;
      $self->_update_ref($entry, $columns, $values);
      $self->_update_in_db($entry, $columns, $values);
      &App::sub_exit() if ($App::trace);
  }
  
  sub _update_in_db {
      &App::sub_entry if ($App::trace);
      my ($self, $entry, $columns, $values) = @_;
      my $db = $self->_db();
      my ($params);
      if ($self->{type} eq "ARRAY") {
          $params = $self->_array_to_key_params($entry);
      }
      else {
          $params = $self->_hash_to_key_params($entry);
      }
      $db->update($self->{table}, $params, $columns, $values);
      &App::sub_exit() if ($App::trace);
  }
  
  sub num_entries {
      &App::sub_entry if ($App::trace);
      my ($self, $status) = @_;
      my $num = $self->_num_entries_from_status_counts($status);
      &App::sub_exit($num) if ($App::trace);
      return($num);
  }
  
  sub count_entries_by_attrib {
      &App::sub_entry if ($App::trace);
      my ($self, $key_attrib, $counts, $count_attrib) = @_;
      $self->_count_entries_by_attrib_in_db($key_attrib, $counts, 
$count_attrib);
      &App::sub_exit() if ($App::trace);
  }
  
  sub _count_entries_by_attrib_in_db {
      &App::sub_entry if ($App::trace);
      my ($self, $key_attrib, $counts, $count_attrib) = @_;
      my $STATUS_UNBUFFERED = $self->{STATUS_UNBUFFERED};
      my $STATUS_UNACQUIRED = $self->{STATUS_UNACQUIRED};
      my $STATUS_ACQUIRED   = $self->{STATUS_ACQUIRED};
      my $status_attrib     = $self->{status_attrib};
      my $db = $self->_db();
      my $count_expr = "count(1)";
      $count_expr = "sum($count_attrib)" if ($count_attrib);
      my $params = { $status_attrib => "$STATUS_UNBUFFERED,$STATUS_UNACQUIRED" 
};
      if ($self->{queue_id_attrib} && $self->{queue_id}) {
          $params->{$self->{queue_id_attrib}} = $self->{queue_id};
      }
      my $rows = $db->get_rows($self->{table}, { $status_attrib => 
"$STATUS_UNBUFFERED,$STATUS_UNACQUIRED", },
          [ $key_attrib, $count_expr ], { group_by => [ $key_attrib ] });
      foreach my $row (@$rows) {
          $counts->{$row->[0]} = $row->[1];
      }
      &App::sub_exit() if ($App::trace);
  }
  
  sub print {
      &App::sub_entry if ($App::trace);
      my ($self, $fh) = @_;
      $fh = \*STDOUT if (!$fh);
      print $fh "ENTRIES IN MEM:\n";
      $self->_print_entries_in_mem($fh);
      print $fh "ENTRIES IN DB:\n";
      $self->_print_entries_in_db($fh);
      print $fh "CONSTRAINTS:\n";
      $self->print_constraints($fh);
      print $fh "STATUS COUNTS:\n";
      $self->print_status_counts($fh);
      print $fh "RESOURCE COUNTS:\n";
      $self->print_resource_counts($fh);
      &App::sub_exit() if ($App::trace);
  }
  
  sub print_entries {
      &App::sub_entry if ($App::trace);
      my ($self, $fh, $format, $columns) = @_;
      $fh = \*STDOUT if (!$fh);
      $self->_print_entries_in_mem($fh, $format, $columns);
      $self->_print_entries_in_db($fh, $format, $columns);
      &App::sub_exit() if ($App::trace);
  }
  
  sub _print_entries_in_db {
      &App::sub_entry if ($App::trace);
      my ($self, $fh, $format, $columns) = @_;
      $fh = \*STDOUT if (!$fh);
      my $db = $self->_db();
      $columns = $self->{columns} if (!$columns);
      my (%options);
      $self->_sort_spec_to_options(\%options);
      my $rows = $db->get_rows($self->{table}, {}, $columns, \%options);
      foreach my $row (@$rows) {
          print "   [", join("|",@$row), "]\n";
      }
      &App::sub_exit() if ($App::trace);
  }
  
  =head1 ACKNOWLEDGEMENTS
  
   * Author:  Stephen Adkins <[EMAIL PROTECTED]>
   * License: This is free software. It is licensed under the same terms as 
Perl itself.
  
  =head1 SEE ALSO
  
  L<C<App::Context>|App::Context>,
  L<C<App::Service>|App::Service>
  
  =cut
  
  1;
  
  __END__
  
  
  
  

Reply via email to