cvsuser 06/02/28 19:39:51
Modified: App-WorkQueue/lib/App WorkQueue.pm
App-WorkQueue/lib/App/WorkQueue Repository.pm
Log:
get Repository queue to work
Revision Changes Path
1.4 +152 -103 p5ee/App-WorkQueue/lib/App/WorkQueue.pm
Index: WorkQueue.pm
===================================================================
RCS file: /cvs/public/p5ee/App-WorkQueue/lib/App/WorkQueue.pm,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -r1.3 -r1.4
--- WorkQueue.pm 9 Feb 2006 17:57:15 -0000 1.3
+++ WorkQueue.pm 1 Mar 2006 03:39:51 -0000 1.4
@@ -19,6 +19,7 @@
use strict;
my $verbose = $App::options{verbose};
+$verbose = 0 if (!$verbose || $verbose < 5);
=head1 NAME
@@ -71,6 +72,8 @@
my $GCONSTR_LIMITS = 1;
my $GCONSTR_COUNT_ATTRIB = 2;
my $GCONSTR_LIMIT_ATTRIB = 3;
+my $GCONSTR_VALUE_ATTRIB = 4;
+my $GCONSTR_HARD_LIMIT = 5;
my $CONSTR_COUNTS = 0;
my $CONSTR_LIMITS = 1;
@@ -114,15 +117,18 @@
$self->SUPER::_init($args);
$self->{data} = [];
- $self->{type} = ($self->{columns}) ? "ARRAY" : "HASH";
+ if (!$self->{type}) {
+ $self->{type} = $self->{queue_type} || ($self->{columns} ? "ARRAY" :
"HASH");
+ }
die "status_attrib not set on queue" if (!$self->{status_attrib});
#die "STATUS_UNBUFFERED not set on queue" if
(!$self->{STATUS_UNBUFFERED});
die "STATUS_UNACQUIRED not set on queue" if
(!$self->{STATUS_UNACQUIRED});
die "STATUS_ACQUIRED not set on queue" if (!$self->{STATUS_ACQUIRED});
die "STATUS_RELEASED not set on queue" if (!$self->{STATUS_RELEASED});
+ die "id_attrib or auto_id_attrib not set on queue" if
(!$self->{id_attrib} && !$self->{auto_id_attrib});
- my $id_attrib = $self->{id_attrib} || die "id_attrib not set on queue";
+ my $id_attrib = $self->{id_attrib};
if ($self->{type} eq "ARRAY") {
my $colidx = $self->_colidx();
my $id_attribs = [split(/,/,$id_attrib)];
@@ -136,10 +142,36 @@
else {
$self->{id_attribs} = [split(/,/,$id_attrib)];
}
+
+ if ($self->{auto_id_attrib}) {
+ if ($self->{type} eq "ARRAY") {
+ my $colidx = $self->_colidx();
+ my $attrib = $self->{auto_id_attrib};
+ my $columns = $self->{columns};
+ if (defined $colidx->{$attrib}) {
+ $self->{auto_id_idx} = $colidx->{$attrib};
+ }
+ else {
+ push(@$columns, $attrib);
+ $colidx->{$attrib} = $#$columns;
+ $self->{auto_id_idx} = $#$columns;
+ }
+ }
+ }
+
+ $self->{status_counts} = {};
+
if ($self->{sort_spec}) {
$self->_analyze_sort_spec();
}
- $self->{verbose} = $App::options{verbose};
+
+ $self->{verbose} = $verbose;
+ &App::sub_exit() if ($App::trace);
+}
+
+sub shutdown {
+ &App::sub_entry if ($App::trace);
+ my ($self) = @_;
&App::sub_exit() if ($App::trace);
}
@@ -166,7 +198,7 @@
primitive operations for this work queue are:
* $q->push($entry)
- * $entry = $q->acquire(\%params)
+ * $entry = $q->acquire()
* $q->release($entry)
* $q->unacquire($entry)
* @entries = $q->locate(\%params, \%options)
@@ -235,13 +267,14 @@
my $inserted = 0;
if ($release_lowest) {
my $removed = 0;
- my ($ent);
+ my ($ent, $cmp);
my $resource_key = $self->_resource_key($entry);
for (my $i = $#$entries; $i >= 0; $i--) {
$ent = $entries->[$i];
if ($self->_resource_key($ent) eq $resource_key) {
# IF not inserted yet AND new entry is lower or same
priority THEN
- if (!$inserted && $self->_compare_entries($entry, $ent)
> -1) {
+ $cmp = $self->_compare_entries($entry, $ent);
+ if (!$inserted && $cmp > -1) {
if (!$removed) { # "insert and remove self"
$inserted = 1;
$removed = 1;
@@ -369,7 +402,6 @@
=head2 acquire()
* Signature: $entry = $q->acquire();
- * Signature: $entry = $q->acquire($sort_spec);
* Param: $sort_spec string
* Return: $entry HASH/ARRAY
* Throws: App::Exception::WorkQueue
@@ -383,7 +415,7 @@
sub acquire {
&App::sub_entry if ($App::trace);
- my ($self, $sort_spec) = @_;
+ my ($self) = @_;
my $entry = undef;
die "acquire() not implemented";
&App::sub_exit($entry) if ($App::trace);
@@ -392,7 +424,7 @@
sub _acquire_in_mem {
&App::sub_entry if ($App::trace);
- my ($self, $sort_spec) = @_;
+ my ($self) = @_;
my ($entry);
my $STATUS_UNACQUIRED = $self->{STATUS_UNACQUIRED};
@@ -405,25 +437,39 @@
my $colidx = $self->_colidx();
my $status_idx = $colidx->{$status_attrib};
die "status_attribute [$status_attrib] does not exist on
elements of this queue" if (!defined $status_idx);
+ my ($acquired);
foreach my $e (@$entries) {
# print ">>> ENTRY: status=[$e->[$status_idx]]
idx=[$status_idx] (should be $STATUS_UNACQUIRED)\n";
next if ($e->[$status_idx] ne $STATUS_UNACQUIRED);
if ($self->_acquire_resources($e)) {
$entry = $e;
-
$self->update($entry,[$self->{status_attrib}],[$self->{STATUS_ACQUIRED}]);
- print "ACQUIRED[M]: [", join("|",@$e), "]\n" if
($verbose);
- last;
+ $acquired = $self->_acquire_entry($entry);
+ if ($acquired) {
+ print "ACQUIRED[M]: [", join("|",@$e), "]\n" if
($verbose);
+ last;
+ }
+ else {
+ $self->_release_resources($entry);
+ $self->_maintain_queue_buffers("release",$entry);
+ }
}
}
}
else {
+ my ($acquired);
foreach my $e (@$entries) {
next if ($e->{$status_attrib} ne $STATUS_UNACQUIRED);
if ($self->_acquire_resources($e)) {
$entry = $e;
-
$self->update($entry,[$self->{status_attrib}],[$self->{STATUS_ACQUIRED}]);
- print "ACQUIRED[M]: {", join("|",%$e), "}\n" if
($verbose);
- last;
+ $acquired = $self->_acquire_entry($entry);
+ if ($acquired) {
+ print "ACQUIRED[M]: {", join("|",%$e), "}\n" if
($verbose);
+ last;
+ }
+ else {
+ $self->_release_resources($entry);
+ $self->_maintain_queue_buffers("release",$entry);
+ }
}
}
}
@@ -434,6 +480,46 @@
return($entry);
}
+sub _acquire_entry {
+ &App::sub_entry if ($App::trace);
+ my ($self, $entry) = @_;
+ my $acquired = 1;
+
$self->update($entry,[$self->{status_attrib}],[$self->{STATUS_ACQUIRED}]);
+ &App::sub_exit($acquired) if ($App::trace);
+ return($acquired);
+}
+
+sub acquired_entries {
+ &App::sub_entry if ($App::trace);
+ my ($self) = @_;
+
+ my (@entries);
+ my $STATUS_UNACQUIRED = $self->{STATUS_UNACQUIRED};
+ my $STATUS_UNBUFFERED = $self->{STATUS_UNBUFFERED};
+ my $status_attrib = $self->{status_attrib};
+ my $entries = $self->{data};
+
+ if ($self->{type} eq "ARRAY") {
+ my $colidx = $self->_colidx();
+ my $status_idx = $colidx->{$status_attrib};
+ foreach my $entry (@$entries) {
+ if ($entry->[$status_idx] ne $STATUS_UNACQUIRED &&
$entry->[$status_idx] ne $STATUS_UNBUFFERED) {
+ CORE::push(@entries, $entry);
+ }
+ }
+ }
+ else {
+ foreach my $entry (@$entries) {
+ if ($entry->{$status_attrib} ne $STATUS_UNACQUIRED &&
$entry->{$status_attrib} ne $STATUS_UNBUFFERED) {
+ CORE::push(@entries, $entry);
+ }
+ }
+ }
+
+ &App::sub_exit(\@entries) if ($App::trace);
+ return(\@entries);
+}
+
#############################################################################
# release()
#############################################################################
@@ -520,26 +606,38 @@
sub _array_to_key {
my ($self, $entry) = @_;
- my $id_indexes = $self->{id_indexes};
my ($key);
- if ($#$id_indexes == 0) {
- $key = $entry->[$id_indexes->[0]];
+ my $auto_id_idx = $self->{auto_id_idx};
+ if (defined $auto_id_idx) {
+ $key = $entry->[$auto_id_idx];
}
else {
- $key = join(":", @{$entry}[@$id_indexes]);
+ my $id_indexes = $self->{id_indexes};
+ if ($#$id_indexes == 0) {
+ $key = $entry->[$id_indexes->[0]];
+ }
+ else {
+ $key = join(":", @{$entry}[@$id_indexes]);
+ }
}
return($key);
}
sub _hash_to_key {
my ($self, $entry) = @_;
- my $id_attribs = $self->{id_attribs};
my ($key);
- if ($#$id_attribs == 0) {
- $key = $entry->{$id_attribs->[0]};
+ my $auto_id_idx = $self->{auto_id_idx};
+ if (defined $auto_id_idx) {
+ $key = $entry->[$auto_id_idx];
}
else {
- $key = join(":", @{$entry}{@$id_attribs});
+ my $id_attribs = $self->{id_attribs};
+ if ($#$id_attribs == 0) {
+ $key = $entry->{$id_attribs->[0]};
+ }
+ else {
+ $key = join(":", @{$entry}{@$id_attribs});
+ }
}
return($key);
}
@@ -580,7 +678,6 @@
my $STATUS_ACQUIRED = $self->{STATUS_ACQUIRED};
my $STATUS_UNACQUIRED = $self->{STATUS_UNACQUIRED};
my $status_attrib = $self->{status_attrib};
- my $id_attrib = $self->{id_attrib};
my $data = $self->{data};
my ($e, $ent, $entry_key);
@@ -595,7 +692,7 @@
if ($ent->[$status_idx] eq $STATUS_ACQUIRED) {
$self->_release_resources($ent);
}
-
$self->update($ent,[$self->{status_attrib}],[$self->{STATUS_UNACQUIRED}]);
+ $self->_unacquire_entry($ent);
print "UNACQUIRED[M]: [", join("|",@$ent), "]\n" if
($verbose);
last;
}
@@ -609,7 +706,7 @@
if ($ent->{$status_attrib} eq $STATUS_ACQUIRED) {
$self->_release_resources($ent);
}
-
$self->update($ent,[$self->{status_attrib}],[$self->{STATUS_UNACQUIRED}]);
+ $self->_unacquire_entry($ent);
print "UNACQUIRED[M]: {", join("|",%$ent), "}\n" if
($verbose);
last;
}
@@ -619,6 +716,13 @@
&App::sub_exit() if ($App::trace);
}
+sub _unacquire_entry {
+ &App::sub_entry if ($App::trace);
+ my ($self, $entry) = @_;
+
$self->update($entry,[$self->{status_attrib}],[$self->{STATUS_UNACQUIRED}]);
+ &App::sub_exit() if ($App::trace);
+}
+
#############################################################################
# locate()
#############################################################################
@@ -770,7 +874,7 @@
$status = $entry->{$status_attrib};
}
}
- my $status_counts = $self->_status_counts();
+ my $status_counts = $self->{status_counts};
$status_counts->{$status} += $count;
if ($from_status) {
$status_counts->{$from_status} -= $count;
@@ -781,6 +885,14 @@
&App::sub_exit() if ($App::trace);
}
+sub schedule_entry_acquisition {
+ &App::sub_entry if ($App::trace);
+ my $self = shift;
+ my %event = @_;
+ $self->{acquisition_event} = \%event;
+ &App::sub_exit() if ($App::trace);
+}
+
#############################################################################
# print()
#############################################################################
@@ -836,7 +948,7 @@
$fh = \*STDOUT if (!$fh);
foreach my $c (@{$self->{global_constraints}}) {
- printf $fh " GLOBAL CONSTRAINT: %4d/%4d\n", $c->[0]{$c->[2]},
$c->[1]{$c->[3]};
+ printf $fh " GLOBAL CONSTRAINT: %4d/%4d [%s]\n",
$c->[0]{$c->[2]}, $c->[1]{$c->[3]}, $c->[4];
}
foreach my $c (@{$self->{constraints}}) {
my (%seen);
@@ -924,7 +1036,7 @@
&App::sub_entry if ($App::trace);
my ($self, $fh) = @_;
$fh = \*STDOUT if (!$fh);
- my $status_counts = $self->_status_counts();
+ my $status_counts = $self->{status_counts};
foreach my $status (sort keys %$status_counts) {
printf $fh " %7s => %8d\n", $status, $status_counts->{$status};
}
@@ -937,8 +1049,8 @@
$fh = \*STDOUT if (!$fh);
my $resource_counts = $self->_resource_counts();
foreach my $resource_key (sort keys %{$resource_counts->{total}}) {
- printf $fh " %7s => %8d=tot %8d=buf %8d=BUFSIZE\n",
- $resource_key,
+ printf $fh " %9s => %8d=tot %8d=buf %8d=BUFSIZE\n",
+ "[$resource_key]",
$resource_counts->{total}{$resource_key},
$resource_counts->{buffer}{$resource_key},
$self->{BUFFER_SIZE};
@@ -1002,13 +1114,15 @@
sub _num_entries_from_status_counts {
&App::sub_entry if ($App::trace);
my ($self, $status) = @_;
- my $status_counts = $self->_status_counts();
+ my $status_counts = $self->{status_counts};
my ($num);
if ($status) {
$num = $status_counts->{$status};
}
else {
- $num = $status_counts->{N} + $status_counts->{W};
+ $num = $status_counts->{$self->{STATUS_UNBUFFERED}};
+ $num += $status_counts->{$self->{STATUS_UNACQUIRED}}
+ if ($self->{STATUS_UNBUFFERED} ne $self->{STATUS_UNACQUIRED});
}
$num = 0 if (!defined $num);
&App::sub_exit($num) if ($App::trace);
@@ -1047,7 +1161,7 @@
sub set_global_constraint {
&App::sub_entry if ($App::trace);
- my ($self, $counts, $limits, $count_attrib, $limit_attrib) = @_;
+ my ($self, $counts, $limits, $count_attrib, $limit_attrib,
$value_attrib, $hard_limit) = @_;
my $global_constraints = $self->{global_constraints};
if (!$global_constraints) {
@@ -1055,7 +1169,7 @@
$self->{global_constraints} = $global_constraints;
}
- CORE::push(@$global_constraints, [ $counts, $limits, $count_attrib,
$limit_attrib ]);
+ CORE::push(@$global_constraints, [ $counts, $limits, $count_attrib,
$limit_attrib, $value_attrib, $hard_limit ]);
&App::sub_exit() if ($App::trace);
}
@@ -1491,16 +1605,6 @@
&App::sub_exit() if ($App::trace);
}
-sub _status_counts {
- my ($self) = @_;
- my $status_counts = $self->{status_counts};
- if (!$status_counts) {
- $status_counts = {};
- $self->{status_counts} = $status_counts;
- }
- return($status_counts);
-}
-
sub _resource_counts {
my ($self) = @_;
my $resource_counts = $self->{resource_counts};
@@ -1559,7 +1663,7 @@
sub _maintain_queue_buffers {
&App::sub_entry if ($App::trace);
- my ($self, $op, $entry) = @_;
+ my ($self, $op, $entry, $columns, $values) = @_;
$op ||= "";
my $BUFFER_SIZE = $self->{BUFFER_SIZE};
@@ -1582,7 +1686,7 @@
$resource_counts->{buffer}{$resource_key}--;
}
elsif ($op eq "release") {
- $self->_release_in_mem($entry);
+ $self->_release_in_mem($entry, $columns, $values);
}
elsif ($op eq "unacquire") {
$resource_counts->{total}{$resource_key}++;
@@ -1705,58 +1809,3 @@
1;
-__END__
-
-#############################################################################
-# _check_low_queue_buffers()
-#############################################################################
-
-=head2 _check_low_queue_buffers()
-
- * Signature: $q->_check_low_queue_buffers($entry);
- * Param: $entry ARRAY/HASH
- * Return: undef
- * Throws: App::Exception::WorkQueue
- * Since: 0.01
-
- Sample Usage:
-
- $context = App->context();
- $q = $context->service("WorkQueue"); # or ...
- $q = $context->work_queue();
- $q->push({ name => "Joe", degrees => 3 });
- $q->push({ name => "Mike", degrees => 1 });
-
- $q->_check_low_queue_buffers($entry);
-
-Queues may be implemented with remote storage. In that case, there are local
-queue buffers which are maintained in memory to increase performance.
-There is conceptually a queue buffer for each combination of constraint
values
-(the constraint key of an entry).
-
-The Queue Buffers are checked each time an entry is push()ed.
-The count for the particular constraint key is incremented.
-Only if the count is below a configured high-water mark is it necessary
-to also push the entry onto the queue buffer.
-
-=cut
-
-sub _check_low_queue_buffers {
- &App::sub_entry if ($App::trace);
- my ($self, $entry) = @_;
-
- my $BUFFER_SIZE = $self->{BUFFER_SIZE};
- my $buffer_low = 0;
-
- my $resource_key = $self->_resource_key($entry);
- my $resource_counts = $self->_resource_counts();
- $resource_counts->{total}{$resource_key}++;
- if ($resource_counts->{buffer}{$resource_key} < $BUFFER_SIZE) {
- $resource_counts->{buffer}{$resource_key} ++;
- $buffer_low = 1;
- }
-
- &App::sub_exit($buffer_low) if ($App::trace);
- return($buffer_low);
-}
-
1.8 +283 -97 p5ee/App-WorkQueue/lib/App/WorkQueue/Repository.pm
Index: Repository.pm
===================================================================
RCS file: /cvs/public/p5ee/App-WorkQueue/lib/App/WorkQueue/Repository.pm,v
retrieving revision 1.7
retrieving revision 1.8
diff -u -r1.7 -r1.8
--- Repository.pm 10 Feb 2006 18:07:41 -0000 1.7
+++ Repository.pm 1 Mar 2006 03:39:51 -0000 1.8
@@ -13,6 +13,7 @@
use App::Repository;
my $verbose = $App::options{verbose};
+$verbose = 0 if (!$verbose || $verbose < 5);
=head1 NAME
@@ -41,30 +42,128 @@
sub _init {
&App::sub_entry if ($App::trace);
+ my ($self,$args) = @_;
+ $self->_init_attribs($args);
+ $self->_init_db($args);
+ &App::sub_exit() if ($App::trace);
+}
+
+sub _init_attribs {
+ &App::sub_entry if ($App::trace);
my ($self, $args) = @_;
+
+ $self->{type} = $self->{queue_type} || ($self->{columns} ? "ARRAY" :
"HASH");
+ my $db = $self->_db();
+ if (!$self->{columns}) {
+ $self->{columns} = $db->_get_default_columns($self->{table});
+ }
$self->SUPER::_init($args);
+
die "STATUS_UNBUFFERED not set on queue" if
(!$self->{STATUS_UNBUFFERED});
- die "queue_id_attrib not set on queue" if (!$self->{queue_id_attrib});
- die "queue_id not set on queue" if (!$self->{queue_id});
- if ($self->{auto_id_attrib}) {
- if ($self->{type} eq "ARRAY") {
- my $colidx = $self->_colidx();
- my $attrib = $self->{auto_id_attrib};
- my $columns = $self->{columns};
- if (defined $colidx->{$attrib}) {
- $self->{auto_id_idx} = $colidx->{$attrib};
- }
- else {
- push(@$columns, $attrib);
- $colidx->{$attrib} = $#$columns;
- $self->{auto_id_idx} = $#$columns;
- }
+ die "client_id_attrib not set on queue" if (!$self->{client_id_attrib});
+ die "client_id not set on queue" if (!$self->{client_id});
+
+ my $context = $self->{context};
+ my $options = $context->{options};
+ $context->schedule_event(
+ tag => "$self->{name}-heartbeat",
+ service_type => "WorkQueue",
+ name => $self->{name},
+ method => "_heartbeat",
+ time => time(),
+ interval => $options->{sleep_time} || 60,
+ scheduled => 1,
+ );
+
+ &App::sub_exit() if ($App::trace);
+}
+
+sub _init_db {
+ &App::sub_entry if ($App::trace);
+ my ($self, $args) = @_;
+ die "STATUS_UNBUFFERED [$self->{STATUS_UNBUFFERED}] != STATUS_UNACQUIRED
[$self->{STATUS_UNACQUIRED}] on queue"
+ if ($self->{STATUS_UNBUFFERED} ne $self->{STATUS_UNACQUIRED});
+ $self->_refresh_queue();
+ &App::sub_exit() if ($App::trace);
+}
+
+sub _heartbeat {
+ &App::sub_entry if ($App::trace);
+ my ($self) = @_;
+ my $context = $self->{context};
+ $context->log("[$self->{name}] _heartbeat\n");
+ $self->_refresh_queue();
+ &App::sub_exit() if ($App::trace);
+}
+
+sub _refresh_queue {
+ &App::sub_entry if ($App::trace);
+ my ($self) = @_;
+ my $context = $self->{context};
+ $context->log("[$self->{name}] _refresh_queue\n");
+ $self->_refresh_status_counts();
+ $self->_refresh_resource_counts();
+ $self->_maintain_queue_buffers();
+ if ($self->{acquisition_event}) {
+ my ($entry, %event);
+ while (1) {
+ $entry = $self->acquire();
+ last if (!$entry);
+ %event = %{$self->{acquisition_event}};
+ $event{args} = [ $entry ];
+ $context->send_event(\%event);
}
}
+ &App::sub_exit() if ($App::trace);
+}
+
+sub _refresh_status_counts {
+ &App::sub_entry if ($App::trace);
+ my ($self) = @_;
+ my $status_attrib = $self->{status_attrib};
+ my (%counts, $count_all);
+ $self->_count_entries_by_attrib_in_db($status_attrib, \%counts, undef,
+
"$self->{STATUS_UNBUFFERED},$self->{STATUS_UNACQUIRED},$self->{STATUS_ACQUIRED}");
+ $count_all = 0;
+ foreach my $key (keys %counts) {
+ $count_all += $counts{$key};
+ }
+ $counts{ALL} = $count_all;
+ $self->{status_counts} = \%counts;
+ &App::sub_exit() if ($App::trace);
+}
+
+sub _refresh_resource_counts {
+ &App::sub_entry if ($App::trace);
+ my ($self) = @_;
+ my (%counts);
+ # --------------
+ my $STATUS_UNBUFFERED = $self->{STATUS_UNBUFFERED};
+ my $STATUS_UNACQUIRED = $self->{STATUS_UNACQUIRED};
+ my $STATUS_ACQUIRED = $self->{STATUS_ACQUIRED};
+ my $status_attrib = $self->{status_attrib};
my $db = $self->_db();
- if ($self->{queue_id_attrib}) {
- $db->set($self->{table}, { $self->{queue_id_attrib} =>
$self->{queue_id} }, $self->{queue_id_attrib}, undef);
+ my $count_expr = "count(1)";
+ my $status = "$STATUS_UNBUFFERED,$STATUS_UNACQUIRED";
+ my $params = { $status_attrib => $status };
+ if ($self->{multiple_queues} && $self->{client_id_attrib} &&
$self->{client_id}) {
+ $params->{$self->{client_id_attrib}} = $self->{client_id};
+ }
+ if ($self->{params}) {
+ my $base_params = $self->{params};
+ foreach my $param (keys %$base_params) {
+ $params->{$param} = $base_params->{$param};
+ }
}
+ my (@resource_cols, $value, $resource_key);
+ my $rows = $db->get_rows($self->{table}, $params, [ @resource_cols,
$count_expr ], { group_by => [ @resource_cols ] });
+ foreach my $row (@$rows) {
+ $value = pop(@$row);
+ $resource_key = join(":", @$row);
+ $counts{$resource_key} = $value;
+ }
+ # --------------
+ $self->{resource_counts}{total} = \%counts;
&App::sub_exit() if ($App::trace);
}
@@ -78,7 +177,7 @@
primitive operations for this work queue are:
* $q->push($entry)
- * $entry = $q->acquire(\%params)
+ * $entry = $q->acquire()
* $q->release($entry)
* $q->unacquire($entry)
* @entries = $q->locate(\%params, \%options)
@@ -88,16 +187,32 @@
sub push {
&App::sub_entry if ($App::trace);
my ($self, $entry) = @_;
- #if ($self->{type} eq "ARRAY") {
- # die "wrong number of columns" if ($#{$self->{columns}} != $#$entry);
- #}
- my $columns = [$self->{status_attrib}, $self->{queue_id_attrib}];
- my $values = [$self->{STATUS_UNBUFFERED}, $self->{queue_id}];
+ my $columns = [$self->{status_attrib}, $self->{client_id_attrib}];
+ my $values = [$self->{STATUS_UNBUFFERED}, undef];
$self->_update_ref($entry, $columns, $values, 0, 1);
- my $alt_entry = $self->_push_in_db($entry);
- if ($alt_entry) {
- $self->_update_in_db($alt_entry, $columns, $values);
- $entry = $alt_entry;
+ eval {
+ $self->_push_in_db($entry);
+ };
+ if ($@) {
+ my $db = $self->_db();
+ if ($@ =~ /duplicate/i) {
+ my ($params, $alt_entry);
+ if ($self->{type} eq "ARRAY") {
+ $params = $self->_array_to_key_params($entry);
+ $alt_entry = $db->get_row($self->{table}, $params, $columns);
+ }
+ else {
+ $params = $self->_hash_to_key_params($entry);
+ $alt_entry = $db->get_hash($self->{table}, $params,
$columns);
+ }
+ if ($alt_entry) {
+ $self->_update_in_db($alt_entry, $columns, $values);
+ $entry = $alt_entry;
+ }
+ }
+ else {
+ die $@;
+ }
}
$self->_update_ref($entry, $columns, $values, 1);
$self->_maintain_queue_buffers("push",$entry);
@@ -111,26 +226,19 @@
my $db = $self->_db();
my $ref = ref($entry);
my $type = "";
- my ($alt_entry);
if ($ref eq "ARRAY") {
$type = "ARRAY";
die "tried to push entry of type [$type] onto queue of type
[$self->{type}]" if ($type ne $self->{type});
print "PUSHED[D]: [", join("|",@$entry), "]\n" if ($verbose);
my $columns = $self->{columns};
my $auto_id_idx = $self->{auto_id_idx};
- eval {
- if (defined $auto_id_idx) {
- $entry->[$auto_id_idx] = 0 if (!$entry->[$auto_id_idx]);
- $db->insert($self->{table}, $columns, $entry);
- $entry->[$auto_id_idx] = $db->_last_inserted_id();
- }
- else {
- $db->insert($self->{table}, $columns, $entry);
- }
- };
- if ($@) {
- my $params = $self->_array_to_key_params($entry);
- $alt_entry = $db->get_row($self->{table}, $params, $columns);
+ if (defined $auto_id_idx) {
+ $entry->[$auto_id_idx] = 0 if (!$entry->[$auto_id_idx]);
+ $db->insert($self->{table}, $columns, $entry);
+ $entry->[$auto_id_idx] = $db->_last_inserted_id() if
(!$entry->[$auto_id_idx]);
+ }
+ else {
+ $db->insert($self->{table}, $columns, $entry);
}
}
elsif ($ref) {
@@ -139,23 +247,16 @@
print "PUSHED[D]: {", join("|",%$entry), "}\n" if ($verbose);
my $columns = $self->{columns};
my $auto_id_attrib = $self->{auto_id_attrib};
- eval {
- $entry->{$auto_id_attrib} = 0 if (defined $auto_id_attrib &&
!$entry->{$auto_id_attrib});
- if ($columns) {
- $db->insert($self->{table}, $columns, $entry);
- }
- else {
- $db->insert($self->{table}, $entry);
- }
- $entry->{$auto_id_attrib} = $db->_last_inserted_id() if (defined
$auto_id_attrib);
- };
- if ($@) {
- my $params = $self->_hash_to_key_params($entry);
- $alt_entry = $db->get_hash($self->{table}, $params, $columns);
+ $entry->{$auto_id_attrib} = 0 if (defined $auto_id_attrib &&
!$entry->{$auto_id_attrib});
+ if ($columns) {
+ $db->insert($self->{table}, $columns, $entry);
+ }
+ else {
+ $db->insert($self->{table}, $entry);
}
+ $entry->{$auto_id_attrib} = $db->_last_inserted_id() if (defined
$auto_id_attrib && !$entry->{$auto_id_attrib});
}
- &App::sub_exit($alt_entry) if ($App::trace);
- return($alt_entry);
+ &App::sub_exit() if ($App::trace);
}
sub _array_to_key_params {
@@ -193,8 +294,8 @@
sub acquire {
&App::sub_entry if ($App::trace);
- my ($self, $sort_spec) = @_;
- my $entry = $self->_acquire_in_mem($sort_spec);
+ my ($self) = @_;
+ my $entry = $self->_acquire_in_mem();
$self->_maintain_queue_buffers("acquire",$entry) if ($entry); # load up
buffer if it's low
$self->print() if ($self->{verbose});
&App::sub_exit($entry) if ($App::trace);
@@ -204,36 +305,11 @@
sub release {
&App::sub_entry if ($App::trace);
my ($self, $entry, $columns, $values) = @_;
- $self->_release_in_mem($entry, $columns, $values);
- $self->_release_in_db($entry, $columns, $values);
- $self->_maintain_queue_buffers("release",$entry);
+ $self->_maintain_queue_buffers("release",$entry,$columns,$values);
$self->print() if ($self->{verbose});
&App::sub_exit() if ($App::trace);
}
-sub _release_in_db {
- &App::sub_entry if ($App::trace);
- my ($self, $entry, $columns, $values) = @_;
-
- my $STATUS_ACQUIRED = $self->{STATUS_ACQUIRED};
- my $STATUS_RELEASED = $self->{STATUS_RELEASED};
- my $status_attrib = $self->{status_attrib};
- my $data = $self->{data};
- my ($e, $ent, $entry_key);
-
- if ($self->{type} eq "ARRAY") {
- my $colidx = $self->_colidx();
- my $status_idx = $colidx->{$status_attrib};
- die "status_attribute [$status_attrib] does not exist on elements of
this queue" if (!defined $status_idx);
- $entry_key = $self->_array_to_key($entry);
- }
- else {
- $entry_key = $self->_hash_to_key($entry);
- }
-
- &App::sub_exit() if ($App::trace);
-}
-
sub _refill_buffer {
&App::sub_entry if ($App::trace);
my ($self, $resource_key) = @_;
@@ -248,12 +324,18 @@
my $params = $self->_resource_key_to_params($resource_key);
$params->{$status_attrib} = $STATUS_UNBUFFERED;
- if ($self->{queue_id_attrib} && $self->{queue_id}) {
- $params->{$self->{queue_id_attrib}} = $self->{queue_id};
+ if ($self->{multiple_queues} && $self->{client_id_attrib} &&
$self->{client_id}) {
+ $params->{$self->{client_id_attrib}} = $self->{client_id};
+ }
+ if ($self->{params}) {
+ my $base_params = $self->{params};
+ foreach my $param (keys %$base_params) {
+ $params->{$param} = $base_params->{$param};
+ }
}
my (%options, $new_entries);
- $options{endrow} = $BUFFER_SIZE;
+ $options{endrow} = 2*$BUFFER_SIZE - 1;
$self->_sort_spec_to_options(\%options) if ($self->{sort_spec});
my $columns = $self->{columns};
@@ -298,6 +380,13 @@
&App::sub_exit() if ($App::trace);
}
+sub _unacquire_entry {
+ &App::sub_entry if ($App::trace);
+ my ($self, $entry) = @_;
+ $self->update($entry,[$self->{status_attrib},
$self->{client_id_attrib}],[$self->{STATUS_UNACQUIRED}, undef]);
+ &App::sub_exit() if ($App::trace);
+}
+
sub locate {
&App::sub_entry if ($App::trace);
my ($self, $params, $options) = @_;
@@ -317,8 +406,14 @@
my $STATUS_ACQUIRED = $self->{STATUS_ACQUIRED};
my $status_attrib = $self->{status_attrib};
$params{$status_attrib} =
"$STATUS_UNBUFFERED,$STATUS_UNACQUIRED,$STATUS_ACQUIRED";
- if ($self->{queue_id_attrib} && $self->{queue_id}) {
- $params{$self->{queue_id_attrib}} = $self->{queue_id};
+ if ($self->{multiple_queues} && $self->{client_id_attrib} &&
$self->{client_id}) {
+ $params{$self->{client_id_attrib}} = $self->{client_id};
+ }
+ if ($self->{params}) {
+ my $base_params = $self->{params};
+ foreach my $param (keys %$base_params) {
+ $params->{$param} = $base_params->{$param};
+ }
}
my ($entries);
my $db = $self->_db();
@@ -334,14 +429,86 @@
return(@$entries);
}
+sub _acquire_entry {
+ &App::sub_entry if ($App::trace);
+ my ($self, $entry) = @_;
+ my $acquired = 1;
+ my $columns = [$self->{status_attrib}];
+ my $values = [$self->{STATUS_ACQUIRED}];
+ if ($self->_update_is_different($entry, $columns, $values)) {
+ my $db = $self->_db();
+ my ($params);
+ if ($self->{type} eq "ARRAY") {
+ $params = $self->_array_to_key_params($entry);
+ }
+ else {
+ $params = $self->_hash_to_key_params($entry);
+ }
+ if ($self->{client_id_attrib} && $self->{client_id}) {
+ $params->{$self->{client_id_attrib}} = undef;
+ CORE::push(@$columns, $self->{client_id_attrib});
+ CORE::push(@$values, $self->{client_id});
+ }
+ $acquired = $db->update($self->{table}, $params, $columns, $values);
+ $self->_update_ref($entry, $columns, $values) if ($acquired);
+ }
+ else {
+ $acquired = 0;
+ }
+ &App::sub_exit($acquired) if ($App::trace);
+ return($acquired);
+}
+
sub update {
&App::sub_entry if ($App::trace);
my ($self, $entry, $columns, $values) = @_;
- $self->_update_ref($entry, $columns, $values);
- $self->_update_in_db($entry, $columns, $values);
+ if ($self->_update_is_different($entry, $columns, $values)) {
+ $self->_update_ref($entry, $columns, $values);
+ $self->_update_in_db($entry, $columns, $values);
+ }
&App::sub_exit() if ($App::trace);
}
+sub _update_is_different {
+ &App::sub_entry if ($App::trace);
+ my ($self, $entry, $columns, $values) = @_;
+ my $different = 0;
+ my ($value);
+ if ($self->{type} eq "ARRAY") {
+ my $colidx = $self->_colidx();
+ for (my $i = 0; $i <= $#$columns; $i++) {
+ $value = $entry->[$colidx->{$columns->[$i]}];
+ if (defined $value && defined $values->[$i]) {
+ if ($value ne $values->[$i]) {
+ $different = 1;
+ last;
+ }
+ }
+ elsif (defined $value || defined $values->[$i]) {
+ $different = 1;
+ last;
+ }
+ }
+ }
+ else {
+ for (my $i = 0; $i <= $#$columns; $i++) {
+ $value = $entry->{$columns->[$i]};
+ if (defined $value && defined $values->[$i]) {
+ if ($value ne $values->[$i]) {
+ $different = 1;
+ last;
+ }
+ }
+ elsif (defined $value || defined $values->[$i]) {
+ $different = 1;
+ last;
+ }
+ }
+ }
+ &App::sub_exit($different) if ($App::trace);
+ return($different);
+}
+
sub _update_in_db {
&App::sub_entry if ($App::trace);
my ($self, $entry, $columns, $values) = @_;
@@ -374,7 +541,7 @@
sub _count_entries_by_attrib_in_db {
&App::sub_entry if ($App::trace);
- my ($self, $key_attrib, $counts, $count_attrib) = @_;
+ my ($self, $key_attrib, $counts, $count_attrib, $status) = @_;
my $STATUS_UNBUFFERED = $self->{STATUS_UNBUFFERED};
my $STATUS_UNACQUIRED = $self->{STATUS_UNACQUIRED};
my $STATUS_ACQUIRED = $self->{STATUS_ACQUIRED};
@@ -382,9 +549,16 @@
my $db = $self->_db();
my $count_expr = "count(1)";
$count_expr = "sum($count_attrib)" if ($count_attrib);
- my $params = { $status_attrib => "$STATUS_UNBUFFERED,$STATUS_UNACQUIRED"
};
- if ($self->{queue_id_attrib} && $self->{queue_id}) {
- $params->{$self->{queue_id_attrib}} = $self->{queue_id};
+ $status = "$STATUS_UNBUFFERED,$STATUS_UNACQUIRED" if (!$status);
+ my $params = { $status_attrib => $status };
+ if ($self->{multiple_queues} && $self->{client_id_attrib} &&
$self->{client_id}) {
+ $params->{$self->{client_id_attrib}} = $self->{client_id};
+ }
+ if ($self->{params}) {
+ my $base_params = $self->{params};
+ foreach my $param (keys %$base_params) {
+ $params->{$param} = $base_params->{$param};
+ }
}
my $rows = $db->get_rows($self->{table}, $params, [ $key_attrib,
$count_expr ], { group_by => [ $key_attrib ] });
foreach my $row (@$rows) {
@@ -429,7 +603,19 @@
$columns = $self->{columns} if (!$columns);
my (%options);
$self->_sort_spec_to_options(\%options);
- my $rows = $db->get_rows($self->{table}, {}, $columns, \%options);
+ my $params = {
+ $self->{status_attrib} =>
"$self->{STATUS_UNBUFFERED},$self->{STATUS_UNACQUIRED},$self->{STATUS_ACQUIRED}",
+ };
+ if ($self->{multiple_queues} && $self->{client_id_attrib} &&
$self->{client_id}) {
+ $params->{$self->{client_id_attrib}} = $self->{client_id};
+ }
+ if ($self->{params}) {
+ my $base_params = $self->{params};
+ foreach my $param (keys %$base_params) {
+ $params->{$param} = $base_params->{$param};
+ }
+ }
+ my $rows = $db->get_rows($self->{table}, $params, $columns, \%options);
foreach my $row (@$rows) {
if ($format) {
printf $fh $format, @$row;