cvsuser 06/02/05 20:11:44
Added: App-WorkQueue/lib/App/WorkQueue Memory.pm Repository.pm
Log:
new
Revision Changes Path
1.1 p5ee/App-WorkQueue/lib/App/WorkQueue/Memory.pm
Index: Memory.pm
===================================================================
#############################################################################
## $Id: Memory.pm,v 1.1 2006/02/06 04:11:44 spadkins Exp $
#############################################################################
package App::WorkQueue::Memory;
use App;
use App::WorkQueue;
@ISA = ( "App::WorkQueue" );
use strict;
=head1 NAME
App::WorkQueue::Memory - Interface for a work queue stored in memory
=head1 SYNOPSIS
use App;
$context = App->context();
$q = $context->service("WorkQueue");
...
=head1 DESCRIPTION
A WorkQueue is for channeling work from a variety of requesting sources to a
limited set of workers who work the queue.
=cut
sub push {
&App::sub_entry if ($App::trace);
my ($self, $entry) = @_;
if ($self->{type} eq "ARRAY") {
die "wrong number of columns" if ($#{$self->{columns}} != $#$entry);
}
$self->_push_in_mem($entry);
&App::sub_exit() if ($App::trace);
}
sub acquire {
&App::sub_entry if ($App::trace);
my ($self, $sort_spec) = @_;
my $entry = $self->_acquire_in_mem($sort_spec);
&App::sub_exit($entry) if ($App::trace);
return($entry);
}
sub release {
&App::sub_entry if ($App::trace);
my ($self, $entry, $columns, $values) = @_;
$self->_release_in_mem($entry, $columns, $values);
&App::sub_exit() if ($App::trace);
}
sub unacquire {
&App::sub_entry if ($App::trace);
my ($self, $entry) = @_;
$self->_unacquire_in_mem($entry);
&App::sub_exit() if ($App::trace);
}
sub locate {
&App::sub_entry if ($App::trace);
my ($self, $params, $options) = @_;
my @entries = $self->_locate_in_mem($params, $options);
&App::sub_exit(@entries) if ($App::trace);
return(@entries);
}
sub update {
&App::sub_entry if ($App::trace);
my ($self, $entry, $columns, $values) = @_;
$self->_update_ref($entry, $columns, $values);
&App::sub_exit() if ($App::trace);
}
sub num_entries {
&App::sub_entry if ($App::trace);
my ($self, $status) = @_;
my $num = $self->_num_entries_in_mem($status);
&App::sub_exit($num) if ($App::trace);
return($num);
}
sub count_entries_by_attrib {
&App::sub_entry if ($App::trace);
my ($self, $key_attrib, $counts, $count_attrib) = @_;
$self->_count_entries_by_attrib_in_mem($key_attrib, $counts,
$count_attrib);
&App::sub_exit() if ($App::trace);
}
sub print_entries {
&App::sub_entry if ($App::trace);
my ($self, $fh, $format, $columns) = @_;
$self->_print_entries_in_mem($fh, $format, $columns);
&App::sub_exit() if ($App::trace);
}
=head1 ACKNOWLEDGEMENTS
* Author: Stephen Adkins <[EMAIL PROTECTED]>
* License: This is free software. It is licensed under the same terms as
Perl itself.
=head1 SEE ALSO
L<C<App::Context>|App::Context>,
L<C<App::Service>|App::Service>
=cut
1;
1.1 p5ee/App-WorkQueue/lib/App/WorkQueue/Repository.pm
Index: Repository.pm
===================================================================
#############################################################################
## $Id: Repository.pm,v 1.1 2006/02/06 04:11:44 spadkins Exp $
#############################################################################
package App::WorkQueue::Repository;
use App;
use App::WorkQueue;
@ISA = ( "App::WorkQueue" );
use strict;
use App::Repository;
my $verbose = $App::options{verbose};
=head1 NAME
App::WorkQueue::Repository - Interface for a work queue stored in a repository
=head1 SYNOPSIS
use App;
$context = App->context();
$q = $context->service("WorkQueue");
...
=head1 DESCRIPTION
A WorkQueue is for channeling work from a variety of requesting sources to a
limited set of workers who work the queue.
=cut
sub _db {
my ($self) = @_;
return($self->{context}->repository($self->{repository}));
}
sub _init {
&App::sub_entry if ($App::trace);
my ($self, $args) = @_;
$self->SUPER::_init($args);
die "STATUS_UNBUFFERED not set on queue" if (!$self->{STATUS_UNBUFFERED});
die "queue_id_attrib not set on queue" if (!$self->{queue_id_attrib});
die "queue_id not set on queue" if (!$self->{queue_id});
if ($self->{auto_id_attrib}) {
if ($self->{type} eq "ARRAY") {
my $colidx = $self->_colidx();
my $attrib = $self->{auto_id_attrib};
my $columns = $self->{columns};
if (defined $colidx->{$attrib}) {
$self->{auto_id_idx} = $colidx->{$attrib};
}
else {
push(@$columns, $attrib);
$colidx->{$attrib} = $#$columns;
$self->{auto_id_idx} = $#$columns;
}
}
}
my $db = $self->_db();
if ($self->{queue_id_attrib}) {
$db->set($self->{table}, { $self->{queue_id_attrib} =>
$self->{queue_id} }, $self->{queue_id_attrib}, undef);
}
&App::sub_exit() if ($App::trace);
}
#############################################################################
# PUBLIC METHODS
#############################################################################
=head1 Public Methods:
Unlike a regular queue where the primitive operations are push()/pop(), the
primitive operations for this work queue are:
* $q->push($entry)
* $entry = $q->acquire(\%params)
* $q->release($entry)
* $q->unacquire($entry)
* @entries = $q->locate(\%params, \%options)
=cut
sub push {
&App::sub_entry if ($App::trace);
my ($self, $entry) = @_;
#if ($self->{type} eq "ARRAY") {
# die "wrong number of columns" if ($#{$self->{columns}} != $#$entry);
#}
my $columns = [$self->{status_attrib}, $self->{queue_id_attrib}];
my $values = [$self->{STATUS_UNBUFFERED}, $self->{queue_id}];
$self->_update_ref($entry, $columns, $values, 0, 1);
my $alt_entry = $self->_push_in_db($entry);
if ($alt_entry) {
$self->_update_in_db($alt_entry, $columns, $values);
$entry = $alt_entry;
}
$self->_update_ref($entry, $columns, $values, 1);
$self->_maintain_queue_buffers("push",$entry);
$self->print() if ($self->{verbose});
&App::sub_exit() if ($App::trace);
}
sub _push_in_db {
&App::sub_entry if ($App::trace);
my ($self, $entry) = @_;
my $db = $self->_db();
my $ref = ref($entry);
my $type = "";
my ($alt_entry);
if ($ref eq "ARRAY") {
$type = "ARRAY";
die "tried to push entry of type [$type] onto queue of type
[$self->{type}]" if ($type ne $self->{type});
print "PUSHED[D]: [", join("|",@$entry), "]\n" if ($verbose);
my $columns = $self->{columns};
my $auto_id_idx = $self->{auto_id_idx};
eval {
if (defined $auto_id_idx) {
$entry->[$auto_id_idx] = 0 if (!$entry->[$auto_id_idx]);
$db->insert($self->{table}, $columns, $entry);
$entry->[$auto_id_idx] = $db->_last_inserted_id();
}
else {
$db->insert($self->{table}, $columns, $entry);
}
};
if ($@) {
my $params = $self->_array_to_key_params($entry);
$alt_entry = $db->get_row($self->{table}, $params, $columns);
}
}
elsif ($ref) {
$type = "HASH";
die "tried to push entry of type [$type] onto queue of type
[$self->{type}]" if ($type ne $self->{type});
print "PUSHED[D]: {", join("|",%$entry), "}\n" if ($verbose);
my $columns = $self->{columns};
my $auto_id_attrib = $self->{auto_id_attrib};
eval {
$entry->{$auto_id_attrib} = 0 if (defined $auto_id_attrib &&
!$entry->{$auto_id_attrib});
if ($columns) {
$db->insert($self->{table}, $columns, $entry);
}
else {
$db->insert($self->{table}, $entry);
}
$entry->{$auto_id_attrib} = $db->_last_inserted_id() if (defined
$auto_id_attrib);
};
if ($@) {
my $params = $self->_hash_to_key_params($entry);
$alt_entry = $db->get_hash($self->{table}, $params, $columns);
}
}
&App::sub_exit($alt_entry) if ($App::trace);
return($alt_entry);
}
sub _array_to_key_params {
my ($self, $entry) = @_;
my $id_attribs = $self->{id_attribs};
my $id_indexes = $self->{id_indexes};
my (%params);
my $auto_id_idx = $self->{auto_id_idx};
if (defined $auto_id_idx && $entry->[$auto_id_idx]) {
$params{$self->{auto_id_attrib}} = $entry->[$auto_id_idx];
}
else {
for (my $i = 0; $i <= $#$id_indexes; $i++) {
$params{$id_attribs->[$i]} = $entry->[$id_indexes->[$i]];
}
}
return(\%params);
}
sub _hash_to_key_params {
my ($self, $entry) = @_;
my $id_attribs = $self->{id_attribs};
my (%params, $attrib);
my $auto_id_attrib = $self->{auto_id_attrib};
if (defined $auto_id_attrib && $entry->{$auto_id_attrib}) {
$params{$auto_id_attrib} = $entry->{$auto_id_attrib};
}
else {
foreach $attrib (@$id_attribs) {
$params{$attrib} = $entry->{$attrib};
}
}
return(\%params);
}
sub acquire {
&App::sub_entry if ($App::trace);
my ($self, $sort_spec) = @_;
my $entry = $self->_acquire_in_mem($sort_spec);
$self->_maintain_queue_buffers("acquire",$entry) if ($entry); # load up
buffer if it's low
$self->print() if ($self->{verbose});
&App::sub_exit($entry) if ($App::trace);
return($entry);
}
sub release {
&App::sub_entry if ($App::trace);
my ($self, $entry, $columns, $values) = @_;
$self->_release_in_mem($entry, $columns, $values);
$self->_release_in_db($entry, $columns, $values);
$self->_maintain_queue_buffers("release",$entry);
$self->print() if ($self->{verbose});
&App::sub_exit() if ($App::trace);
}
sub _release_in_db {
&App::sub_entry if ($App::trace);
my ($self, $entry, $columns, $values) = @_;
my $STATUS_ACQUIRED = $self->{STATUS_ACQUIRED};
my $STATUS_RELEASED = $self->{STATUS_RELEASED};
my $status_attrib = $self->{status_attrib};
my $data = $self->{data};
my ($e, $ent, $entry_key);
if ($self->{type} eq "ARRAY") {
my $colidx = $self->_colidx();
my $status_idx = $colidx->{$status_attrib};
die "status_attribute [$status_attrib] does not exist on elements of
this queue" if (!defined $status_idx);
$entry_key = $self->_array_to_key($entry);
}
else {
$entry_key = $self->_hash_to_key($entry);
}
&App::sub_exit() if ($App::trace);
}
sub _refill_buffer {
&App::sub_entry if ($App::trace);
my ($self, $resource_key) = @_;
my $db = $self->_db();
my $BUFFER_SIZE = $self->{BUFFER_SIZE};
my $STATUS_UNBUFFERED = $self->{STATUS_UNBUFFERED};
my $STATUS_UNACQUIRED = $self->{STATUS_UNACQUIRED};
my $STATUS_ACQUIRED = $self->{STATUS_ACQUIRED};
my $status_attrib = $self->{status_attrib};
my $params = $self->_resource_key_to_params($resource_key);
$params->{$status_attrib} = $STATUS_UNBUFFERED;
if ($self->{queue_id_attrib} && $self->{queue_id}) {
$params->{$self->{queue_id_attrib}} = $self->{queue_id};
}
my (%options, $new_entries);
$options{numrows} = $BUFFER_SIZE;
$self->_sort_spec_to_options(\%options) if ($self->{sort_spec});
my $columns = $self->{columns};
if ($self->{type} eq "ARRAY") {
$new_entries = $db->get_rows($self->{table}, $params, $columns,
\%options);
}
else {
$params->{$status_attrib} = $STATUS_UNBUFFERED;
$new_entries = $db->get_hashes($self->{table}, $params, $columns,
\%options);
}
foreach my $new_entry (@$new_entries) {
$self->_push_in_mem($new_entry);
}
my $num_added = $#$new_entries + 1;
&App::sub_exit($num_added) if ($App::trace);
return($num_added);
}
sub _sort_spec_to_options {
&App::sub_entry if ($App::trace);
my ($self, $options) = @_;
if ($self->{sort_columns}) {
my $sort_columns = $self->{sort_columns};
my $direction = $self->{direction};
#my $numeric = $self->{numeric};
$options->{order_by} = $sort_columns;
for (my $i = 0; $i <= $#$sort_columns; $i++) {
if ($direction->[$i] < 0) {
$options->{direction}{$sort_columns->[$i]} = "desc";
}
}
}
&App::sub_exit() if ($App::trace);
}
sub unacquire {
&App::sub_entry if ($App::trace);
my ($self, $entry) = @_;
$self->_unacquire_in_mem($entry);
$self->_maintain_queue_buffers("unacquire",$entry);
&App::sub_exit() if ($App::trace);
}
sub locate {
&App::sub_entry if ($App::trace);
my ($self, $params, $options) = @_;
my @entries = $self->_locate_in_db($params, $options);
&App::sub_exit(@entries) if ($App::trace);
return(@entries);
}
sub update {
&App::sub_entry if ($App::trace);
my ($self, $entry, $columns, $values) = @_;
$self->_update_ref($entry, $columns, $values);
$self->_update_in_db($entry, $columns, $values);
&App::sub_exit() if ($App::trace);
}
sub _update_in_db {
&App::sub_entry if ($App::trace);
my ($self, $entry, $columns, $values) = @_;
my $db = $self->_db();
my ($params);
if ($self->{type} eq "ARRAY") {
$params = $self->_array_to_key_params($entry);
}
else {
$params = $self->_hash_to_key_params($entry);
}
$db->update($self->{table}, $params, $columns, $values);
&App::sub_exit() if ($App::trace);
}
sub num_entries {
&App::sub_entry if ($App::trace);
my ($self, $status) = @_;
my $num = $self->_num_entries_from_status_counts($status);
&App::sub_exit($num) if ($App::trace);
return($num);
}
sub count_entries_by_attrib {
&App::sub_entry if ($App::trace);
my ($self, $key_attrib, $counts, $count_attrib) = @_;
$self->_count_entries_by_attrib_in_db($key_attrib, $counts,
$count_attrib);
&App::sub_exit() if ($App::trace);
}
sub _count_entries_by_attrib_in_db {
&App::sub_entry if ($App::trace);
my ($self, $key_attrib, $counts, $count_attrib) = @_;
my $STATUS_UNBUFFERED = $self->{STATUS_UNBUFFERED};
my $STATUS_UNACQUIRED = $self->{STATUS_UNACQUIRED};
my $STATUS_ACQUIRED = $self->{STATUS_ACQUIRED};
my $status_attrib = $self->{status_attrib};
my $db = $self->_db();
my $count_expr = "count(1)";
$count_expr = "sum($count_attrib)" if ($count_attrib);
my $params = { $status_attrib => "$STATUS_UNBUFFERED,$STATUS_UNACQUIRED"
};
if ($self->{queue_id_attrib} && $self->{queue_id}) {
$params->{$self->{queue_id_attrib}} = $self->{queue_id};
}
my $rows = $db->get_rows($self->{table}, { $status_attrib =>
"$STATUS_UNBUFFERED,$STATUS_UNACQUIRED", },
[ $key_attrib, $count_expr ], { group_by => [ $key_attrib ] });
foreach my $row (@$rows) {
$counts->{$row->[0]} = $row->[1];
}
&App::sub_exit() if ($App::trace);
}
sub print {
&App::sub_entry if ($App::trace);
my ($self, $fh) = @_;
$fh = \*STDOUT if (!$fh);
print $fh "ENTRIES IN MEM:\n";
$self->_print_entries_in_mem($fh);
print $fh "ENTRIES IN DB:\n";
$self->_print_entries_in_db($fh);
print $fh "CONSTRAINTS:\n";
$self->print_constraints($fh);
print $fh "STATUS COUNTS:\n";
$self->print_status_counts($fh);
print $fh "RESOURCE COUNTS:\n";
$self->print_resource_counts($fh);
&App::sub_exit() if ($App::trace);
}
sub print_entries {
&App::sub_entry if ($App::trace);
my ($self, $fh, $format, $columns) = @_;
$fh = \*STDOUT if (!$fh);
$self->_print_entries_in_mem($fh, $format, $columns);
$self->_print_entries_in_db($fh, $format, $columns);
&App::sub_exit() if ($App::trace);
}
sub _print_entries_in_db {
&App::sub_entry if ($App::trace);
my ($self, $fh, $format, $columns) = @_;
$fh = \*STDOUT if (!$fh);
my $db = $self->_db();
$columns = $self->{columns} if (!$columns);
my (%options);
$self->_sort_spec_to_options(\%options);
my $rows = $db->get_rows($self->{table}, {}, $columns, \%options);
foreach my $row (@$rows) {
print " [", join("|",@$row), "]\n";
}
&App::sub_exit() if ($App::trace);
}
=head1 ACKNOWLEDGEMENTS
* Author: Stephen Adkins <[EMAIL PROTECTED]>
* License: This is free software. It is licensed under the same terms as
Perl itself.
=head1 SEE ALSO
L<C<App::Context>|App::Context>,
L<C<App::Service>|App::Service>
=cut
1;
__END__