cvsuser 06/02/05 20:12:07
Added: App-WorkQueue/lib/App/WorkQueue.pm
Log:
new
Revision Changes Path
1.1 p5ee/App-WorkQueue/lib/App/WorkQueue.pm
Index: WorkQueue.pm
===================================================================
#############################################################################
## $Id: WorkQueue.pm,v 1.1 2006/02/06 04:12:07 spadkins Exp $
#############################################################################
package App::Context;
# Convenience accessor added to App::Context: asks the context for the
# "WorkQueue" service of the given name, forwarding any extra arguments
# to service() unchanged.
sub work_queue {
    my $self = shift;
    my ($service_name, @args) = @_;
    my @service_args = ("WorkQueue", $service_name, @args);
    return($self->service(@service_args));
}
package App::WorkQueue;
use App;
use App::Service;
# WorkQueue is a subclass of App::Service. @ISA is assigned before
# "use strict" takes effect, which is why it carries no "our" declaration.
@ISA = ( "App::Service" );
use strict;
# Value of the "verbose" option as it was when this file was loaded; the
# *_in_mem methods test it before printing their trace lines.
my $verbose = $App::options{verbose};
=head1 NAME
App::WorkQueue - Interface for a work queue
=head1 SYNOPSIS
use App;
$context = App->context();
$q = $context->service("WorkQueue");
...
=head1 DESCRIPTION
A WorkQueue is for channeling work from a variety of requesting sources to a
limited set of workers who work the queue.
It implements a new service in the App-Context framework called a
WorkQueue. A normal queue supports push() and pop() operations
where the entries push()ed onto the queue come out in a first-in-
first-out (FIFO) basis. A WorkQueue however is used to control the
processing of units of work, where the order that entries are
processed is prioritized subject to various constraints.
Therefore, a WorkQueue supports the following operations.
push() - add a unit of work to the queue
acquire() - get the highest priority entry subject to constraints
release() - remove an (acquired) entry from the queue
unacquire() - return an (acquired) entry to the unacquired state
Internally, the WorkQueue can hold HASHREF's (the default) or ARRAYREF's.
However, if ARRAYREF's are to be stored, then a {columns} array and a
{colidx} hash must also be stored on the object.
=head1 INTERNAL ELEMENTS
$self->{data} = [];
$self->{columns} = [];
$self->{colidx} = {};
=cut
#############################################################################
# CONSTANTS
#############################################################################
# Slot indexes into a global-constraint record. They follow the order in
# which set_global_constraint() stores its arguments:
#   [ $counts, $limits, $count_attrib, $limit_attrib ]
my $GCONSTR_COUNTS = 0;
my $GCONSTR_LIMITS = 1;
my $GCONSTR_COUNT_ATTRIB = 2;
my $GCONSTR_LIMIT_ATTRIB = 3;
# Slot indexes into a per-key constraint record. They follow the order in
# which set_constraint() stores its arguments:
#   [ $counts, $limits, $key_attrib, $count_attrib, $key_idx, $count_idx ]
# The last two slots are only stored for ARRAY-type queues.
my $CONSTR_COUNTS = 0;
my $CONSTR_LIMITS = 1;
my $CONSTR_KEY_ATTRIB = 2;
my $CONSTR_COUNT_ATTRIB = 3;
my $CONSTR_KEY_IDX = 4;
my $CONSTR_COUNT_IDX = 5;
#############################################################################
# CONSTRUCTOR METHODS
#############################################################################
=head1 Constructor Methods:
#############################################################################
# Method: _init()
#############################################################################
=head2 _init()
The _init() method is called from within the standard Service
constructor.
It allows a WorkQueue to customize the behavior of the
constructor.
* Signature: _init($named)
* Param: $named {} [in]
* Return: void
* Throws: App::Exception
* Since: 0.01
Sample Usage:
$service->_init(\%args);
=cut
# Constructor hook: validates the queue configuration and derives the lookup
# tables the other methods rely on.
#   $args - HASHREF of constructor arguments, handed on to SUPER::_init()
# Sets {data}, {type}, {id_attribs}, {verbose} and, for ARRAY-type queues,
# {id_indexes}. Dies when status_attrib, id_attrib, STATUS_UNACQUIRED,
# STATUS_ACQUIRED or STATUS_RELEASED is not set to a true value.
sub _init {
    &App::sub_entry if ($App::trace);
    my ($self, $args) = @_;
    $self->SUPER::_init($args);
    $self->{data} = [];
    # A queue configured with {columns} stores ARRAYREF entries, otherwise HASHREFs.
    $self->{type} = ($self->{columns}) ? "ARRAY" : "HASH";
    die "status_attrib not set on queue" if (!$self->{status_attrib});
    #die "STATUS_UNBUFFERED not set on queue" if (!$self->{STATUS_UNBUFFERED});
    die "STATUS_UNACQUIRED not set on queue" if (!$self->{STATUS_UNACQUIRED});
    die "STATUS_ACQUIRED not set on queue" if (!$self->{STATUS_ACQUIRED});
    die "STATUS_RELEASED not set on queue" if (!$self->{STATUS_RELEASED});
    my $id_attrib = $self->{id_attrib} || die "id_attrib not set on queue";
    # id_attrib may list several comma-separated attributes (a compound key).
    if ($self->{type} eq "ARRAY") {
        my $colidx = $self->_colidx();
        my $id_attribs = [split(/,/,$id_attrib)];
        my (@id_idx);
        foreach my $attrib (@$id_attribs) {
            # CORE:: because this package defines its own push() method
            CORE::push(@id_idx, $colidx->{$attrib});
        }
        $self->{id_attribs} = $id_attribs;
        # positions of the key columns, used by _array_to_key()
        $self->{id_indexes} = \@id_idx;
    }
    else {
        $self->{id_attribs} = [split(/,/,$id_attrib)];
    }
    if ($self->{sort_spec}) {
        $self->_analyze_sort_spec();
    }
    $self->{verbose} = $App::options{verbose};
    &App::sub_exit() if ($App::trace);
}
=cut
#############################################################################
# new()
#############################################################################
=head2 new()
The constructor is inherited from
L<C<App::Service>|App::Service/"new()">.
=cut
#############################################################################
# PUBLIC METHODS
#############################################################################
=head1 Public Methods:
Unlike a regular queue where the primitive operations are push()/pop(), the
primitive operations for this work queue are:
* $q->push($entry)
* $entry = $q->acquire(\%params)
* $q->release($entry)
* $q->unacquire($entry)
* @entries = $q->locate(\%params, \%options)
=cut
#############################################################################
# push()
#############################################################################
=head2 push()
* Signature: $q->push($entry);
* Param: $entry HASH/ARRAY
* Return: undef
* Throws: App::Exception::WorkQueue
* Since: 0.01
Sample Usage:
$context = App->context();
$q = $context->service("WorkQueue"); # or ...
$q = $context->work_queue();
$q->push({ name => "Joe", degrees => 3 });
$q->push({ name => "Mike", degrees => 1 });
=cut
# Base-class push(): marks $entry with the STATUS_UNACQUIRED status and then
# dies with "push() not implemented" (presumably a subclass supplies the
# actual storage -- not visible in this file section).
#   $entry - HASHREF or ARRAYREF describing the unit of work
sub push {
    &App::sub_entry if ($App::trace);
    my ($self, $entry) = @_;
    # We explicitly _update_ref() rather than update() because the entry is not
    # in the remote store yet.
    $self->_update_ref($entry,[$self->{status_attrib}],[$self->{STATUS_UNACQUIRED}]);
    die "push() not implemented";
    &App::sub_exit() if ($App::trace);
}
# In-memory implementation of push(): places $entry into $self->{data},
# keeping the list ordered by _compare_entries() when a sort_spec is set.
#   $entry          - ARRAYREF or HASHREF; its kind must match $self->{type}
#   $release_lowest - when true, the insert is paired with dropping the
#                     last-positioned entry sharing $entry's resource key
#                     (which may be $entry itself, in which case nothing is
#                     stored)
# Every stored entry is marked STATUS_UNACQUIRED through update(); a dropped
# entry is marked STATUS_UNBUFFERED.
# Dies if the kind of $entry does not match the queue type.
sub _push_in_mem {
    &App::sub_entry if ($App::trace);
    my ($self, $entry, $release_lowest) = @_;
    my $ref = ref($entry);
    my $type = "";
    if ($ref eq "ARRAY") {
        $type = "ARRAY";
        print "PUSHED[M]: [", join("|",@$entry), "]\n" if ($verbose);
    }
    elsif ($ref) {
        $type = "HASH";
        print "PUSHED[M]: {", join("|",%$entry), "}\n" if ($verbose);
    }
    die "tried to push entry of type [$type] onto queue of type [$self->{type}]" if ($type ne $self->{type});
    my $entries = $self->{data};
    my ($removed_entry);
    if (!$self->{sort_spec} || $#$entries == -1) {
        if ($release_lowest) {
            # do nothing. this entry is the "lowest", so we shouldn't even push it on.
        }
        else {
            CORE::push(@$entries, $entry);
            $self->update($entry,[$self->{status_attrib}],[$self->{STATUS_UNACQUIRED}]);
        }
    }
    else {
        my $inserted = 0;
        if ($release_lowest) {
            my $removed = 0;
            my ($ent);
            my $resource_key = $self->_resource_key($entry);
            # Walk from the tail (lowest priority) towards the head.
            for (my $i = $#$entries; $i >= 0; $i--) {
                $ent = $entries->[$i];
                if ($self->_resource_key($ent) eq $resource_key) {
                    # IF not inserted yet AND new entry is lower or same priority THEN
                    if (!$inserted && $self->_compare_entries($entry, $ent) > -1) {
                        if (!$removed) { # "insert and remove self"
                            $inserted = 1;
                            $removed = 1;
                        }
                        else {
                            $self->update($entry,[$self->{status_attrib}],[$self->{STATUS_UNACQUIRED}]);
                            splice(@$entries, $i+1, 0, $entry);
                            $inserted = 1;
                        }
                    }
                    if (!$removed) {
                        $removed_entry = $entries->[$i];
                        $self->update($removed_entry,[$self->{status_attrib}],[$self->{STATUS_UNBUFFERED}]);
                        splice(@$entries, $i, 1);
                        $removed = 1;
                    }
                }
                else {
                    if (!$inserted && $self->_compare_entries($entry, $ent) > -1) {
                        $self->update($entry,[$self->{status_attrib}],[$self->{STATUS_UNACQUIRED}]);
                        splice(@$entries, $i+1, 0, $entry);
                        $inserted = 1;
                    }
                }
                last if ($inserted && $removed);
            }
        }
        else {
            # Plain ordered insert: after the last entry that sorts at or before $entry.
            for (my $i = $#$entries; $i >= 0; $i--) {
                if ($self->_compare_entries($entry, $entries->[$i]) > -1) {
                    $self->update($entry,[$self->{status_attrib}],[$self->{STATUS_UNACQUIRED}]);
                    splice(@$entries, $i+1, 0, $entry);
                    $inserted = 1;
                    last;
                }
            }
        }
        if (!$inserted) {
            # $entry sorts ahead of everything examined: it goes to the head.
            $self->update($entry,[$self->{status_attrib}],[$self->{STATUS_UNACQUIRED}]);
            unshift(@$entries, $entry);
        }
    }
    &App::sub_exit() if ($App::trace);
}
# Parses $self->{sort_spec} into the parallel arrays used by
# _compare_entries(): {sort_columns}, {sort_idx}, {direction}, {numeric}.
# The spec is a comma-separated list of columns, each optionally prefixed by
# a type flag ("N" numeric, "C" character) and then a direction ("+" or "-").
# NOTE: because the flag is optional, a column name that itself begins with
# "N" or "C" has that first letter consumed as the flag.
# {sort_idx} is filled only for ARRAY-type queues; the column index is looked
# up only in that case, so HASH-type queues do not need {columns}.
sub _analyze_sort_spec {
    my ($self) = @_;
    my $sort_spec = $self->{sort_spec};
    my $sort_columns = [];
    my $sort_idx = [];
    my $direction = [];
    my $numeric = [];
    if ($sort_spec) {
        my $is_array = ($self->{type} eq "ARRAY");
        # _colidx() dies without {columns}, so it is called for ARRAY queues only
        my $colidx = $is_array ? $self->_colidx() : undef;
        foreach my $col (split(/,/, $sort_spec)) {
            if ($col =~ /^([NC]?)([+-]?)(.*)$/) {
                CORE::push(@$direction, (($2 eq "-") ? -1 : 1));
                CORE::push(@$numeric, (($1 eq "N") ? 1 : 0));
                CORE::push(@$sort_columns, $3);
                if ($is_array) {
                    CORE::push(@$sort_idx, $colidx->{$3});
                }
            }
            else {
                CORE::push(@$sort_columns, $col);
                CORE::push(@$direction, 1);
                CORE::push(@$numeric, 0);
            }
        }
    }
    $self->{sort_columns} = $sort_columns;
    $self->{sort_idx} = $sort_idx;
    $self->{direction} = $direction;
    $self->{numeric} = $numeric;
}
# Compares two queue entries according to the parsed sort spec.
#   $x, $y - entries of the same kind (both ARRAYREF or both HASHREF)
# Returns a negative, zero or positive number in the manner of <=> / cmp:
# the first sort column on which the entries differ decides, with the result
# multiplied by that column's direction (+1 or -1).
sub _compare_entries {
    my ($self, $x, $y) = @_;
    my $sign = 0;
    my $columns = $self->{sort_columns};
    my $sort_idx = $self->{sort_idx};
    my $direction = $self->{direction};
    my $numeric = $self->{numeric};
    if (ref($x) eq "ARRAY") {
        for (my $c = 0; $c <= $#$sort_idx; $c++) {
            my $i = $sort_idx->[$c];
            if ($numeric && $numeric->[$c]) {
                $sign = ($x->[$i] <=> $y->[$i]) * $direction->[$c];
                #print "N: $sign = ($x->[$i] <=> $y->[$i]) * $direction->[$c];\n";
            }
            else {
                $sign = ($x->[$i] cmp $y->[$i]) * $direction->[$c];
                #print "C: $sign = ($x->[$i] cmp $y->[$i]) * $direction->[$c];\n";
            }
            last if ($sign);
        }
        #print "A: [", join("|",@$x), "]\n";
        #print "B: [", join("|",@$y), "]\n";
        #print " => $sign\n";
    }
    else {
        for (my $c = 0; $c <= $#$columns; $c++) {
            my $col = $columns->[$c];
            if ($numeric && $numeric->[$c]) {
                $sign = ($x->{$col} <=> $y->{$col}) * $direction->[$c];
            }
            else {
                $sign = ($x->{$col} cmp $y->{$col}) * $direction->[$c];
            }
            last if ($sign);
        }
    }
    return($sign);
}
#############################################################################
# acquire()
#############################################################################
=head2 acquire()
* Signature: $entry = $q->acquire();
* Signature: $entry = $q->acquire($sort_spec);
* Param: $sort_spec string
* Return: $entry HASH/ARRAY
* Throws: App::Exception::WorkQueue
* Since: 0.01
Sample Usage:
my $entry = $q->acquire();
=cut
# Base-class acquire(): not implemented here, so it always dies.
# Nothing after the die could ever run, so the method ends there.
sub acquire {
    &App::sub_entry if ($App::trace);
    my ($self, $sort_spec) = @_;
    die "acquire() not implemented";
}
# In-memory implementation of acquire(): scans $self->{data} in order and
# returns the first entry whose status is STATUS_UNACQUIRED and for which
# _acquire_resources() succeeds, after marking it STATUS_ACQUIRED via update().
#   $sort_spec - accepted for interface compatibility; not used here
# Returns the acquired entry, or undef when nothing could be acquired
# (including when _global_resources_exist() reports false).
# Dies for ARRAY-type queues whose columns lack the status attribute.
sub _acquire_in_mem {
    &App::sub_entry if ($App::trace);
    my ($self, $sort_spec) = @_;
    my ($entry);
    my $STATUS_UNACQUIRED = $self->{STATUS_UNACQUIRED};
    my $STATUS_ACQUIRED = $self->{STATUS_ACQUIRED};
    my $status_attrib = $self->{status_attrib};
    my $entries = $self->{data};
    if ($self->_global_resources_exist()) {
        if ($self->{type} eq "ARRAY") {
            my $colidx = $self->_colidx();
            my $status_idx = $colidx->{$status_attrib};
            die "status_attribute [$status_attrib] does not exist on elements of this queue" if (!defined $status_idx);
            foreach my $e (@$entries) {
                # print ">>> ENTRY: status=[$e->[$status_idx]] idx=[$status_idx] (should be $STATUS_UNACQUIRED)\n";
                next if ($e->[$status_idx] ne $STATUS_UNACQUIRED);
                if ($self->_acquire_resources($e)) {
                    $entry = $e;
                    $self->update($entry,[$status_attrib],[$STATUS_ACQUIRED]);
                    print "ACQUIRED[M]: [", join("|",@$e), "]\n" if ($verbose);
                    last;
                }
            }
        }
        else {
            foreach my $e (@$entries) {
                next if ($e->{$status_attrib} ne $STATUS_UNACQUIRED);
                if ($self->_acquire_resources($e)) {
                    $entry = $e;
                    $self->update($entry,[$status_attrib],[$STATUS_ACQUIRED]);
                    print "ACQUIRED[M]: {", join("|",%$e), "}\n" if ($verbose);
                    last;
                }
            }
        }
    }
    print "ACQUIRED[M]: undef\n" if (!$entry && $verbose);
    &App::sub_exit($entry) if ($App::trace);
    return($entry);
}
#############################################################################
# release()
#############################################################################
=head2 release()
* Signature: $q->release($entry);
* Signature: $q->release($entry, $columns, $values);
* Param: $entry HASH/ARRAY
* Return: undef
* Throws: App::Exception::WorkQueue
* Since: 0.01
Sample Usage:
$context = App->context();
$q = $context->service("WorkQueue"); # or ...
$q = $context->work_queue();
$q->release({ name => "Joe", degrees => 3 });
$q->release({ name => "Mike", degrees => 1 });
=cut
# Base-class release(): not implemented here, so it always dies.
# Nothing after the die could ever run, so the method ends there.
sub release {
    &App::sub_entry if ($App::trace);
    my ($self, $entry, $columns, $values) = @_;
    die "release() not implemented";
}
# In-memory implementation of release(): finds the stored entry whose key
# equals that of $entry, frees its resources if it is currently acquired,
# marks it STATUS_RELEASED (plus any extra column updates) and removes it
# from $self->{data}. Only the first matching entry is processed.
#   $entry   - ARRAYREF/HASHREF carrying at least the id attribute(s)
#   $columns - optional ARRAYREF of extra column names to update
#   $values  - ARRAYREF of values parallel to $columns
# Dies for ARRAY-type queues whose columns lack the status attribute.
sub _release_in_mem {
    &App::sub_entry if ($App::trace);
    my ($self, $entry, $columns, $values) = @_;
    my $STATUS_ACQUIRED = $self->{STATUS_ACQUIRED};
    my $status_attrib = $self->{status_attrib};
    my $data = $self->{data};
    my ($e, $ent, $entry_key);
    # The status change always comes first; caller-supplied updates follow.
    my @columns = ( $status_attrib );
    my @values = ( $self->{STATUS_RELEASED} );
    if ($columns) {
        CORE::push(@columns, @$columns);
        CORE::push(@values, @$values);
    }
    if ($self->{type} eq "ARRAY") {
        my $colidx = $self->_colidx();
        my $status_idx = $colidx->{$status_attrib};
        die "status_attribute [$status_attrib] does not exist on elements of this queue" if (!defined $status_idx);
        $entry_key = $self->_array_to_key($entry);
        for ($e = 0; $e <= $#$data; $e++) {
            $ent = $data->[$e];
            if ($self->_array_to_key($ent) eq $entry_key) {
                if ($ent->[$status_idx] eq $STATUS_ACQUIRED) {
                    $self->_release_resources($ent);
                }
                $self->update($ent,\@columns,\@values);
                splice(@$data, $e, 1);
                print "RELEASED[M]: [", join("|",@$entry), "]\n" if ($verbose);
                last;
            }
        }
    }
    else {
        $entry_key = $self->_hash_to_key($entry);
        for ($e = 0; $e <= $#$data; $e++) {
            $ent = $data->[$e];
            if ($self->_hash_to_key($ent) eq $entry_key) {
                if ($ent->{$status_attrib} eq $STATUS_ACQUIRED) {
                    $self->_release_resources($ent);
                }
                $self->update($ent,\@columns,\@values);
                splice(@$data, $e, 1);
                print "RELEASED[M]: {", join("|",%$entry), "}\n" if ($verbose);
                last;
            }
        }
    }
    &App::sub_exit() if ($App::trace);
}
# Builds the identifying key of an ARRAYREF entry from the positions listed
# in $self->{id_indexes}. A single-column key is returned as-is; a compound
# key is the column values joined with ":".
sub _array_to_key {
    my ($self, $entry) = @_;
    my $id_indexes = $self->{id_indexes};
    my ($key);
    if ($#$id_indexes == 0) {
        $key = $entry->[$id_indexes->[0]];
    }
    else {
        # array slice: the values at every id position, in order
        $key = join(":", @$entry[@$id_indexes]);
    }
    return($key);
}
# Builds the identifying key of a HASHREF entry from the attribute names
# listed in $self->{id_attribs}. A single-attribute key is returned as-is; a
# compound key is the attribute values joined with ":".
sub _hash_to_key {
    my ($self, $entry) = @_;
    my $id_attribs = $self->{id_attribs};
    my ($key);
    if ($#$id_attribs == 0) {
        $key = $entry->{$id_attribs->[0]};
    }
    else {
        # hash slice: the values of every id attribute, in order
        $key = join(":", @$entry{@$id_attribs});
    }
    return($key);
}
#############################################################################
# unacquire()
#############################################################################
=head2 unacquire()
* Signature: $q->unacquire($entry);
* Param: $entry HASH/ARRAY
* Return: undef
* Throws: App::Exception::WorkQueue
* Since: 0.01
Sample Usage:
$context = App->context();
$q = $context->service("WorkQueue"); # or ...
$q = $context->work_queue();
$q->unacquire({ name => "Joe", degrees => 3 });
$q->unacquire({ name => "Mike", degrees => 1 });
=cut
# Base-class unacquire(): not implemented here, so it always dies.
# Nothing after the die could ever run, so the method ends there.
sub unacquire {
    &App::sub_entry if ($App::trace);
    my ($self, $entry) = @_;
    die "unacquire() not implemented";
}
# In-memory implementation of unacquire(): finds the stored entry whose key
# equals that of $entry, frees its resources if it is currently acquired and
# marks it STATUS_UNACQUIRED again. The entry stays in $self->{data}; only
# the first matching entry is processed.
#   $entry - ARRAYREF/HASHREF carrying at least the id attribute(s)
# Dies for ARRAY-type queues whose columns lack the status attribute.
sub _unacquire_in_mem {
    &App::sub_entry if ($App::trace);
    my ($self, $entry) = @_;
    my $STATUS_ACQUIRED = $self->{STATUS_ACQUIRED};
    my $STATUS_UNACQUIRED = $self->{STATUS_UNACQUIRED};
    my $status_attrib = $self->{status_attrib};
    my $data = $self->{data};
    my ($e, $ent, $entry_key);
    if ($self->{type} eq "ARRAY") {
        my $colidx = $self->_colidx();
        my $status_idx = $colidx->{$status_attrib};
        die "status_attribute [$status_attrib] does not exist on elements of this queue" if (!defined $status_idx);
        $entry_key = $self->_array_to_key($entry);
        for ($e = 0; $e <= $#$data; $e++) {
            $ent = $data->[$e];
            if ($self->_array_to_key($ent) eq $entry_key) {
                if ($ent->[$status_idx] eq $STATUS_ACQUIRED) {
                    $self->_release_resources($ent);
                }
                $self->update($ent,[$status_attrib],[$STATUS_UNACQUIRED]);
                print "UNACQUIRED[M]: [", join("|",@$ent), "]\n" if ($verbose);
                last;
            }
        }
    }
    else {
        $entry_key = $self->_hash_to_key($entry);
        for ($e = 0; $e <= $#$data; $e++) {
            $ent = $data->[$e];
            if ($self->_hash_to_key($ent) eq $entry_key) {
                if ($ent->{$status_attrib} eq $STATUS_ACQUIRED) {
                    $self->_release_resources($ent);
                }
                $self->update($ent,[$status_attrib],[$STATUS_UNACQUIRED]);
                print "UNACQUIRED[M]: {", join("|",%$ent), "}\n" if ($verbose);
                last;
            }
        }
    }
    &App::sub_exit() if ($App::trace);
}
#############################################################################
# locate()
#############################################################################
=head2 locate()
* Signature: @entries = $q->locate();
* Signature: @entries = $q->locate($params);
* Signature: @entries = $q->locate($params, $options);
* Param: $params HASH
* Param: $options HASH
* Return: @entries
* Throws: App::Exception::WorkQueue
* Since: 0.01
Sample Usage:
my @entries = $q->locate();
=cut
# Base-class locate(): not implemented here, so it always dies.
# Nothing after the die could ever run, so the method ends there.
sub locate {
    &App::sub_entry if ($App::trace);
    my ($self, $params, $options) = @_;
    die "locate() not implemented";
}
# In-memory implementation of locate(): returns the stored entries whose
# attributes are string-equal to every name/value pair in $params, in queue
# order. With no (or empty) $params every entry is returned.
#   $params  - optional HASHREF of attribute => required value
#   $options - accepted but not used here
sub _locate_in_mem {
    &App::sub_entry if ($App::trace);
    my ($self, $params, $options) = @_;
    my @wanted = ($params && %$params) ? (keys %$params) : ();
    my @required = map { $params->{$_} } @wanted;
    my $pool = $self->{data};
    my @found;
    if (!@wanted) {
        @found = @$pool;
    }
    elsif ($self->{type} eq "ARRAY") {
        my $colidx = $self->_colidx();
        my @slot = map { $colidx->{$_} } @wanted;
        ROW: foreach my $row (@$pool) {
            foreach my $n (0 .. $#wanted) {
                next ROW if ($row->[$slot[$n]] ne $required[$n]);
            }
            CORE::push(@found, $row);
        }
    }
    else {
        ROW: foreach my $row (@$pool) {
            foreach my $n (0 .. $#wanted) {
                next ROW if ($row->{$wanted[$n]} ne $required[$n]);
            }
            CORE::push(@found, $row);
        }
    }
    &App::sub_exit(@found) if ($App::trace);
    return(@found);
}
#############################################################################
# update()
#############################################################################
=head2 update()
* Signature: $q->update($entry, $columns, $values);
* Param: $entry HASH/ARRAY
* Param: $columns ARRAY
* Param: $values ARRAY
* Return: void
* Throws: App::Exception::WorkQueue
* Since: 0.01
Sample Usage:
$q->update($entry, \@columns, \@values);
=cut
# Base-class update(): not implemented here, so it always dies.
# Nothing after the die could ever run, so the method ends there.
sub update {
    &App::sub_entry if ($App::trace);
    my ($self, $entry, $columns, $values) = @_;
    die "update() not implemented";
}
# Writes new column values directly into the entry reference.
#   $entry   - ARRAYREF/HASHREF to modify in place
#   $columns - ARRAYREF of column names
#   $values  - ARRAYREF of values parallel to $columns
#   $one_way - when true, a status change is reported to
#              _maintain_status_counts() without the previous status
#   $raw     - when true, status changes are not reported at all
sub _update_ref {
    &App::sub_entry if ($App::trace);
    my ($self, $entry, $columns, $values, $one_way, $raw) = @_;
    my $status_attrib = $self->{status_attrib};
    if ($self->{type} eq "ARRAY") {
        my $colidx = $self->_colidx();
        my $status_idx = $colidx->{$status_attrib};
        foreach my $n (0 .. $#$columns) {
            my $name = $columns->[$n];
            if (!$raw && $name eq $status_attrib) {
                # the old status is read before it is overwritten below
                my $previous = $one_way ? undef : $entry->[$status_idx];
                $self->_maintain_status_counts(1, $values->[$n], $previous);
            }
            $entry->[$colidx->{$name}] = $values->[$n];
        }
    }
    else {
        foreach my $n (0 .. $#$columns) {
            my $name = $columns->[$n];
            if (!$raw && $name eq $status_attrib) {
                # the old status is read before it is overwritten below
                my $previous = $one_way ? undef : $entry->{$status_attrib};
                $self->_maintain_status_counts(1, $values->[$n], $previous);
            }
            $entry->{$name} = $values->[$n];
        }
    }
    &App::sub_exit() if ($App::trace);
}
# Adjusts the per-status tallies returned by _status_counts().
#   $count       - amount to add to the target status
#   $status      - status code, or an entry reference whose status attribute
#                  is read to obtain it
#   $from_status - previous status; when true its tally is reduced by
#                  $count, otherwise the "ALL" tally is increased instead
sub _maintain_status_counts {
    &App::sub_entry if ($App::trace);
    my ($self, $count, $status, $from_status) = @_;
    if (ref($status)) {
        # an entry was passed instead of a status code
        my $entry = $status;
        my $status_attrib = $self->{status_attrib};
        $status = ($self->{type} eq "ARRAY")
            ? $entry->[$self->_colidx()->{$status_attrib}]
            : $entry->{$status_attrib};
    }
    my $tally = $self->_status_counts();
    $tally->{$status} += $count;
    if (!$from_status) {
        $tally->{ALL} += $count;
    }
    else {
        $tally->{$from_status} -= $count;
    }
    &App::sub_exit() if ($App::trace);
}
#############################################################################
# print()
#############################################################################
=head2 print()
* Signature: $q->print($fh);
* Param: $fh FILEHANDLE
* Return: void
* Throws: App::Exception::WorkQueue
* Since: 0.01
Sample Usage:
$q->print(\*STDOUT);
=cut
# Dumps the queue to a filehandle: an "ENTRIES:" section produced by
# print_entries() followed by a "CONSTRAINTS:" section produced by
# print_constraints().
#   $fh - output filehandle; STDOUT when omitted
sub print {
    &App::sub_entry if ($App::trace);
    my ($self, $fh) = @_;
    $fh ||= \*STDOUT;
    print {$fh} "ENTRIES:\n";
    $self->print_entries($fh);
    print {$fh} "CONSTRAINTS:\n";
    $self->print_constraints($fh);
    &App::sub_exit() if ($App::trace);
}
#############################################################################
# print_constraints()
#############################################################################
=head2 print_constraints()
* Signature: $q->print_constraints();
* Signature: $q->print_constraints($fh);
* Param: $fh FILEHANDLE
* Return: void
* Throws: App::Exception::WorkQueue
* Since: 0.01
Sample Usage:
$q->print_constraints();
$q->print_constraints(\*STDOUT);
=cut
# Prints the current count/limit of every global constraint, then, for each
# per-key constraint, one line per key: first the keys that have a limit,
# then any keys that only have a count.
#   $fh - output filehandle; STDOUT when omitted
sub print_constraints {
    &App::sub_entry if ($App::trace);
    my ($self, $fh) = @_;
    $fh ||= \*STDOUT;
    foreach my $g (@{$self->{global_constraints}}) {
        my $count = $g->[0]{$g->[2]};
        my $limit = $g->[1]{$g->[3]};
        printf {$fh} " GLOBAL CONSTRAINT: %4d/%4d\n", $count, $limit;
    }
    foreach my $c (@{$self->{constraints}}) {
        my %seen;
        printf {$fh} " -----------------------------\n";
        # slot 1 holds the limits, slot 0 the counts; each key is listed once
        foreach my $slot (1, 0) {
            foreach my $key (sort keys %{$c->[$slot]}) {
                next if ($seen{$key}++);
                printf {$fh} " %8s CONSTRAINT: %4d/%4d [%s]\n", $key, $c->[0]{$key}, $c->[1]{$key}, $c->[2];
            }
        }
    }
    &App::sub_exit() if ($App::trace);
}
#############################################################################
# print_entries()
#############################################################################
=head2 print_entries()
* Signature: $q->print_entries($fh, $format, $columns);
* Param: $fh FILEHANDLE
* Param: $format string
* Param: $columns ARRAY
* Return: void
* Throws: App::Exception::WorkQueue
* Since: 0.01
Sample Usage:
$q->print_entries(\*STDOUT, "%s %s %s", ["ip","hostname","path"]);
=cut
# Base-class print_entries(): not implemented here, so it always dies.
# Nothing after the die could ever run, so the method ends there.
sub print_entries {
    &App::sub_entry if ($App::trace);
    my ($self, $fh, $format, $columns) = @_;
    die "print_entries() not implemented";
}
# In-memory implementation of print_entries(): writes one line per stored
# entry to $fh.
#   $fh      - output filehandle; STDOUT when omitted
#   $format  - optional printf format applied to the selected columns; when
#              omitted the whole entry is printed joined with "|"
#   $columns - ARRAYREF of column names supplying the printf arguments
sub _print_entries_in_mem {
    &App::sub_entry if ($App::trace);
    my ($self, $fh, $format, $columns) = @_;
    $fh = \*STDOUT if (!$fh);
    my $entries = $self->{data};
    if ($self->{type} eq "ARRAY") {
        my $colidx = $self->_colidx();
        # translate the requested column names into array positions once
        my @idx = $columns ? (map { $colidx->{$_} } @$columns) : ();
        foreach my $entry (@$entries) {
            if ($format) {
                printf $fh $format, @$entry[@idx];
            }
            else {
                print $fh " [", join("|",@$entry), "]\n";
            }
        }
    }
    else {
        foreach my $entry (@$entries) {
            if ($format) {
                printf $fh $format, @$entry{@$columns};
            }
            else {
                print $fh " {", join("|",%$entry), "}\n";
            }
        }
    }
    &App::sub_exit() if ($App::trace);
}
# Prints one "status => count" line per status known to _status_counts(),
# ordered by status name.
#   $fh - output filehandle; STDOUT when omitted
sub print_status_counts {
    &App::sub_entry if ($App::trace);
    my ($self, $fh) = @_;
    $fh ||= \*STDOUT;
    my $tally = $self->_status_counts();
    for my $status (sort keys %$tally) {
        my $line = sprintf(" %7s => %8d\n", $status, $tally->{$status});
        print {$fh} $line;
    }
    &App::sub_exit() if ($App::trace);
}
# Prints, for every resource key in the {total} table returned by
# _resource_counts(), its total count and its {buffer} count, ordered by key.
#   $fh - output filehandle; STDOUT when omitted
sub print_resource_counts {
    &App::sub_entry if ($App::trace);
    my ($self, $fh) = @_;
    $fh ||= \*STDOUT;
    my $resource_counts = $self->_resource_counts();
    for my $resource_key (sort keys %{$resource_counts->{total}}) {
        my $total = $resource_counts->{total}{$resource_key};
        my $buffered = $resource_counts->{buffer}{$resource_key};
        printf {$fh} " %7s => %8d=tot %8d=buf\n", $resource_key, $total, $buffered;
    }
    &App::sub_exit() if ($App::trace);
}
#############################################################################
# num_entries()
#############################################################################
=head2 num_entries()
* Signature: $q->num_entries();
* Signature: $q->num_entries($status);
* Param: $status string
* Return: $num integer
* Throws: App::Exception::WorkQueue
* Since: 0.01
Sample Usage:
$num = $q->num_entries();
=cut
# Base-class num_entries(): not implemented here, so it always dies.
# Nothing after the die could ever run, so the method ends there.
sub num_entries {
    &App::sub_entry if ($App::trace);
    my ($self, $status) = @_;
    die "num_entries() not implemented";
}
# In-memory implementation of num_entries(): counts the entries held in
# $self->{data}.
#   $status - optional status code; when given (and true) only entries whose
#             status attribute is string-equal to it are counted
# Returns the count.
sub _num_entries_in_mem {
    &App::sub_entry if ($App::trace);
    my ($self, $status) = @_;
    my $entries = $self->{data};
    my $num = $#$entries + 1;    # everything, unless a status filter applies
    if ($status) {
        my $status_attrib = $self->{status_attrib};
        my @hits;
        if ($self->{type} eq "ARRAY") {
            my $status_idx = $self->_colidx()->{$status_attrib};
            @hits = grep { $_->[$status_idx] eq $status } @$entries;
        }
        else {
            @hits = grep { $_->{$status_attrib} eq $status } @$entries;
        }
        $num = scalar(@hits);
    }
    &App::sub_exit($num) if ($App::trace);
    return($num);
}
# Answers num_entries() from the tallies returned by _status_counts()
# instead of scanning the entries.
#   $status - optional status code; when omitted the result is the sum of
#             the "N" and "W" tallies
# Returns the count, with an undefined tally reported as 0.
sub _num_entries_from_status_counts {
    &App::sub_entry if ($App::trace);
    my ($self, $status) = @_;
    my $tally = $self->_status_counts();
    my $num = $status ? $tally->{$status}
                      : $tally->{N} + $tally->{W};
    $num = 0 unless (defined $num);
    &App::sub_exit($num) if ($App::trace);
    return($num);
}
#############################################################################
# set_global_constraint()
#############################################################################
=head2 set_global_constraint()
* Signature: $q->set_global_constraint($counts, $limits, $count_attrib,
$limit_attrib);
* Param: $counts HASH
* Param: $limits HASH
* Param: $count_attrib string
* Param: $limit_attrib string
* Return: undef
* Throws: App::Exception::WorkQueue
* Since: 0.01
Sample Usage:
$context = App->context();
$q = $context->service("WorkQueue"); # or ...
$q = $context->work_queue();
$q->push({ name => "Joe", degrees => 3 });
$q->push({ name => "Mike", degrees => 1 });
my %state = (
count => 0,
limit => 20,
);
$q->set_global_constraint(\%state, \%state, "count", "limit");
=cut
# Registers a queue-wide constraint by appending the record
# [ $counts, $limits, $count_attrib, $limit_attrib ] to
# $self->{global_constraints} (created on first use).
#   $counts       - HASHREF holding the current count under $count_attrib
#   $limits       - HASHREF holding the limit under $limit_attrib
sub set_global_constraint {
    &App::sub_entry if ($App::trace);
    my ($self, $counts, $limits, $count_attrib, $limit_attrib) = @_;
    my $registered = ($self->{global_constraints} ||= []);
    my @record = ($counts, $limits, $count_attrib, $limit_attrib);
    CORE::push(@$registered, \@record);
    &App::sub_exit() if ($App::trace);
}
#############################################################################
# set_constraint()
#############################################################################
=head2 set_constraint()
* Signature: $q->set_constraint($counts, $limits, $key_attrib,
$count_attrib);
* Signature: $q->set_constraint($counts, $limits, $key_attrib);
* Param: $key_attrib string
* Param: $count_attrib string
* Param: $counts HASH
* Param: $limits HASH
* Return: undef
* Throws: App::Exception::WorkQueue
* Since: 0.01
Sample Usage:
$context = App->context();
$q = $context->service("WorkQueue"); # or ...
$q = $context->work_queue();
$q->push({ name => "Joe", degrees => 3 });
$q->push({ name => "Mike", degrees => 1 });
my (%counts);
my %limits = (
);
$q->set_constraint(\%counts, \%limits, "degrees");
=cut
# Registers a per-key constraint by appending a record to
# $self->{constraints} (created on first use).
#   $counts       - HASHREF of current counts by key value
#   $limits       - HASHREF of limits by key value
#   $key_attrib   - name of the entry attribute that supplies the key
#   $count_attrib - optional name of the entry attribute that supplies the
#                   amount counted per entry
# HASH-type queues store [ $counts, $limits, $key_attrib, $count_attrib ];
# ARRAY-type queues additionally store the column positions of the key and
# count attributes.
# Dies for ARRAY-type queues when a named attribute is not one of the columns.
sub set_constraint {
    &App::sub_entry if ($App::trace);
    my ($self, $counts, $limits, $key_attrib, $count_attrib) = @_;
    my $constraints = $self->{constraints};
    if (!$constraints) {
        $constraints = [];
        $self->{constraints} = $constraints;
    }
    if ($self->{type} eq "ARRAY") {
        my $colidx = $self->_colidx();
        my $key_idx = $colidx->{$key_attrib};
        die "key_attribute [$key_attrib] does not exist on elements of this queue" if (!defined $key_idx);
        my ($count_idx);
        if ($count_attrib) {
            # position of the count column (not the key column)
            $count_idx = $colidx->{$count_attrib};
            die "count_attribute [$count_attrib] does not exist on elements of this queue" if (!defined $count_idx);
        }
        CORE::push(@$constraints, [ $counts, $limits, $key_attrib, $count_attrib, $key_idx, $count_idx ]);
    }
    else {
        CORE::push(@$constraints, [ $counts, $limits, $key_attrib, $count_attrib ]);
    }
    &App::sub_exit() if ($App::trace);
}
#############################################################################
# clear_constraints()
#############################################################################
=head2 clear_constraints()
* Signature: $q->clear_constraints();
* Param: void
* Return: void
* Throws: App::Exception::WorkQueue
* Since: 0.01
Sample Usage:
$q->clear_constraints();
=cut
# Forgets every per-key constraint by removing $self->{constraints}.
# Global constraints are left untouched.
sub clear_constraints {
    &App::sub_entry if ($App::trace);
    my $self = $_[0];
    delete $self->{constraints};
    &App::sub_exit() if ($App::trace);
}
#############################################################################
# count_entries_by_attrib()
#############################################################################
=head2 count_entries_by_attrib()
* Signature: $q->count_entries_by_attrib($attrib, $counts);
* Signature: $q->count_entries_by_attrib($attrib, $counts, $count_attrib);
* Param: $attrib string
* Param: $counts HASH
* Return: undef
* Throws: App::Exception::WorkQueue
* Since: 0.01
Sample Usage:
$context = App->context();
$q = $context->service("WorkQueue"); # or ...
$q = $context->work_queue();
$q->push({ name => "Joe", degrees => 3 });
$q->push({ name => "Mike", degrees => 1 });
my (%counts);
$q->count_entries_by_attrib("degrees", \%counts);
=cut
# Base-class count_entries_by_attrib(): not implemented here, so it always
# dies. Nothing after the die could ever run, so the method ends there.
sub count_entries_by_attrib {
    &App::sub_entry if ($App::trace);
    my ($self, $key_attrib, $counts, $count_attrib) = @_;
    die "count_entries_by_attrib() not implemented";
}
# In-memory implementation of count_entries_by_attrib(): adds to $counts,
# keyed by each stored entry's $key_attrib value.
#   $key_attrib   - attribute whose value is the tally key
#   $counts       - HASHREF to accumulate into (existing values are kept)
#   $count_attrib - optional attribute whose value is added per entry; when
#                   omitted every entry counts as 1
# Dies for ARRAY-type queues when a named attribute is not one of the columns.
sub _count_entries_by_attrib_in_mem {
    &App::sub_entry if ($App::trace);
    my ($self, $key_attrib, $counts, $count_attrib) = @_;
    if ($self->{type} eq "ARRAY") {
        my $colidx = $self->_colidx();
        die "key_attribute [$key_attrib] does not exist on elements of this queue" if (!exists $colidx->{$key_attrib});
        my $i = $colidx->{$key_attrib};
        if ($count_attrib) {
            my $count_i = $colidx->{$count_attrib};
            die "count_attribute [$count_attrib] does not exist on elements of this queue" if (! defined $count_i);
            foreach my $entry (@{$self->{data}}) {
                $counts->{$entry->[$i]} += $entry->[$count_i];
            }
        }
        else {
            foreach my $entry (@{$self->{data}}) {
                $counts->{$entry->[$i]} ++;
            }
        }
    }
    else {
        if ($count_attrib) {
            foreach my $entry (@{$self->{data}}) {
                $counts->{$entry->{$key_attrib}} += $entry->{$count_attrib};
            }
        }
        else {
            foreach my $entry (@{$self->{data}}) {
                $counts->{$entry->{$key_attrib}} ++;
            }
        }
    }
    &App::sub_exit() if ($App::trace);
}
#############################################################################
# PROTECTED METHODS
#############################################################################
=head1 Protected Methods:
=cut
#############################################################################
# Method: _colidx()
#############################################################################
=head2 _colidx()
Returns the column-to-index hashref.
* Signature: $colidx = $q->_colidx();
* Param: void
* Return: $colidx HASH
* Since: 0.01
$colidx = $q->_colidx();
$idx = $colidx->{$column}; # get the column index for a named $column
=cut
# Returns the column-name => array-position HASHREF for ARRAY-type entries.
# The map is built from $self->{columns} on first use and cached in
# $self->{colidx}.
# Dies when neither {colidx} nor {columns} is available.
sub _colidx {
    &App::sub_entry if ($App::trace);
    my ($self) = @_;
    my $colidx = $self->{colidx};
    if (!$colidx) {
        my $columns = $self->{columns} || die "columns of the queue elements must be supplied in order to know the column indexes";
        $colidx = {};
        # hash-slice assignment: each column name maps to its position
        @$colidx{@$columns} = (0 .. $#$columns);
        $self->{colidx} = $colidx;
    }
    &App::sub_exit($colidx) if ($App::trace);
    return($colidx);
}
#############################################################################
# _global_resources_exist()
#############################################################################
=head2 _global_resources_exist()
* Signature: $q->_global_resources_exist();
* Param: void
* Return: $complies boolean
* Throws: App::Exception::WorkQueue
* Since: 0.01
Sample Usage:
$context = App->context();
$q = $context->service("WorkQueue"); # or ...
$q = $context->work_queue();
$q->_global_resources_exist();
=cut
# Reports whether every registered global constraint still has headroom.
# Returns 1 when there are no global constraints or when each constraint's
# current count is below its limit; returns 0 as soon as one constraint has
# a count greater than or equal to its limit.
sub _global_resources_exist {
    &App::sub_entry if ($App::trace);
    my ($self) = @_;
    my $complies = 1;
    my $registered = $self->{global_constraints};
    if ($registered && $#$registered != -1) {
        foreach my $g (@$registered) {
            my $count = $g->[$GCONSTR_COUNTS]{$g->[$GCONSTR_COUNT_ATTRIB]};
            my $limit = $g->[$GCONSTR_LIMITS]{$g->[$GCONSTR_LIMIT_ATTRIB]};
            next if (!($count >= $limit));
            $complies = 0;
            last;
        }
    }
    &App::sub_exit($complies) if ($App::trace);
    return($complies);
}
#############################################################################
# _acquire_resources()
#############################################################################
=head2 _acquire_resources()
* Signature: $q->_acquire_resources($entry, $constraints);
* Param: $entry ARRAY/HASH
* Param: $constraints ARRAY
* Return: $complies boolean (1 if the resources were acquired, 0 otherwise)
* Throws: App::Exception::WorkQueue
* Since: 0.01
Sample Usage:
$context = App->context();
$q = $context->service("WorkQueue"); # or ...
$q = $context->work_queue();
$q->push({ name => "Joe", degrees => 3 });
$q->push({ name => "Mike", degrees => 1 });
$q->_acquire_resources($entry);
$q->_acquire_resources($entry, $constraints);
=cut
# Tries to reserve the resources an entry needs.
#   Param:  $entry        ARRAY/HASH  the queue entry being acquired
#   Param:  $constraints  ARRAY       optional; defaults to $self->{constraints}
#   Return: 1 if every global and per-key constraint had room (the counts
#           have then been incremented), 0 otherwise (no counts are changed).
#
# The work is done in two phases so that a failure never leaves partially
# incremented counts behind: first every constraint is checked, and only if
# all of them pass are the counts updated.
sub _acquire_resources {
    &App::sub_entry if ($App::trace);
    my ($self, $entry, $constraints) = @_;
    my $complies = 1;

    # Phase 0: a global constraint that has reached its limit blocks everything.
    my $global_constraints = $self->{global_constraints};
    if ($global_constraints && @$global_constraints) {
        foreach my $gc (@$global_constraints) {
            if ($gc->[$GCONSTR_COUNTS]{$gc->[$GCONSTR_COUNT_ATTRIB]} >=
                $gc->[$GCONSTR_LIMITS]{$gc->[$GCONSTR_LIMIT_ATTRIB]}) {
                $complies = 0;
                last;
            }
        }
    }

    $constraints ||= $self->{constraints};
    # print "_acquire_resources(): complies=[$complies] constraints=[$constraints] #constraints=[$#$constraints]\n";

    if ($complies && $constraints && @$constraints) {
        # ARRAY entries are addressed by column index, HASH entries by
        # attribute name; everything else is the same for both layouts.
        my $is_array = ($self->{type} eq "ARRAY");
        my @pending;    # [ $constraint, $key, $count_incr ] to apply on success

        # Phase 1: check every constraint without modifying anything.
        foreach my $c (@$constraints) {
            my $key = $is_array ? $entry->[$c->[$CONSTR_KEY_IDX]]
                                : $entry->{$c->[$CONSTR_KEY_ATTRIB]};
            my $limit = $c->[$CONSTR_LIMITS]{$key};
            $limit = $c->[$CONSTR_LIMITS]{_DEFAULT} if (!defined $limit);
            next if (!defined $limit);    # no limit => key is unconstrained

            # Without a count column/attribute each entry weighs 1.
            my $count_field = $is_array ? $c->[$CONSTR_COUNT_IDX]
                                        : $c->[$CONSTR_COUNT_ATTRIB];
            my $count_incr = 1;
            if (defined $count_field) {
                $count_incr = $is_array ? $entry->[$count_field]
                                        : $entry->{$count_field};
            }

            if ($c->[$CONSTR_COUNTS]{$key} + $count_incr > $limit) {
                $complies = 0;
                last;
            }
            # CORE:: is spelled out because this package has its own push().
            CORE::push(@pending, [ $c, $key, $count_incr ]);
        }

        # Phase 2: all checks passed, so commit the increments.
        if ($complies) {
            foreach my $p (@pending) {
                my ($c, $key, $count_incr) = @$p;
                $c->[$CONSTR_COUNTS]{$key} += $count_incr;
            }
        }
    }

    # Each successful acquisition counts once against every global constraint.
    if ($complies && $global_constraints) {
        foreach my $gc (@$global_constraints) {
            $gc->[$GCONSTR_COUNTS]{$gc->[$GCONSTR_COUNT_ATTRIB]} ++;
        }
    }

    &App::sub_exit($complies) if ($App::trace);
    return($complies);
}
#############################################################################
# _release_resources()
#############################################################################
=head2 _release_resources()
* Signature: $q->_release_resources($entry, $constraints);
* Param: $entry ARRAY/HASH
* Param: $constraints ARRAY
* Return: undef
* Throws: App::Exception::WorkQueue
* Since: 0.01
Sample Usage:
$context = App->context();
$q = $context->service("WorkQueue"); # or ...
$q = $context->work_queue();
$q->push({ name => "Joe", degrees => 3 });
$q->push({ name => "Mike", degrees => 1 });
$q->_release_resources($entry);
$q->_release_resources($entry, $constraints);
=cut
# Gives back the resources previously reserved for an entry.
#   Param:  $entry        ARRAY/HASH  the queue entry being released
#   Param:  $constraints  ARRAY       optional; defaults to $self->{constraints}
# Every global constraint count is decremented by one, and for each
# constraint that has a limit for the entry's key (or a _DEFAULT limit) the
# per-key count is decremented by the entry's weight.
sub _release_resources {
    &App::sub_entry if ($App::trace);
    my ($self, $entry, $constraints) = @_;

    my $global_constraints = $self->{global_constraints};
    if ($global_constraints && @$global_constraints) {
        foreach my $gc (@$global_constraints) {
            $gc->[$GCONSTR_COUNTS]{$gc->[$GCONSTR_COUNT_ATTRIB]} --;
        }
    }

    $constraints ||= $self->{constraints};
    if ($constraints && @$constraints) {
        # ARRAY entries are read by column index, HASH entries by attribute.
        my $is_array = ($self->{type} eq "ARRAY");
        foreach my $c (@$constraints) {
            my $key = $is_array ? $entry->[$c->[$CONSTR_KEY_IDX]]
                                : $entry->{$c->[$CONSTR_KEY_ATTRIB]};
            my $limit = $c->[$CONSTR_LIMITS]{$key};
            $limit = $c->[$CONSTR_LIMITS]{_DEFAULT} unless (defined $limit);
            # keys without any limit are not counted
            next unless (defined $limit);

            my $count_incr = 1;    # weight of the entry when no count field is set
            if ($is_array) {
                $count_incr = $entry->[$c->[$CONSTR_COUNT_IDX]]
                    if (defined $c->[$CONSTR_COUNT_IDX]);
            }
            else {
                $count_incr = $entry->{$c->[$CONSTR_COUNT_ATTRIB]}
                    if (defined $c->[$CONSTR_COUNT_ATTRIB]);
            }
            $c->[$CONSTR_COUNTS]{$key} -= $count_incr;
        }
    }
    &App::sub_exit() if ($App::trace);
}
# Returns the object's status-count hashref, creating an empty one (and
# storing it in $self->{status_counts}) if none is set yet.
sub _status_counts {
    my ($self) = @_;
    $self->{status_counts} ||= {};
    return($self->{status_counts});
}
# Returns the object's resource-count structure.  If $self->{resource_counts}
# is not set yet it is built by _initialize_resource_counts() and cached.
sub _resource_counts {
    my ($self) = @_;
    unless ($self->{resource_counts}) {
        $self->{resource_counts} = $self->_initialize_resource_counts();
    }
    return($self->{resource_counts});
}
# Builds a fresh resource-count structure: a hashref with two empty
# sub-hashes, {total} and {buffer}.  Nothing is stored on $self here.
sub _initialize_resource_counts {
    my ($self) = @_;
    my %resource_counts = (
        total  => {},
        buffer => {},
    );
    return(\%resource_counts);
}
#############################################################################
# _maintain_queue_buffers()
#############################################################################
=head2 _maintain_queue_buffers()
* Signature: $q->_maintain_queue_buffers($op, $entry);
* Param: $op string ("push", "acquire", "release", "unacquire" or empty)
* Param: $entry ARRAY/HASH
* Return: undef
* Throws: App::Exception::WorkQueue
* Since: 0.01
Sample Usage:
$context = App->context();
$q = $context->service("WorkQueue"); # or ...
$q = $context->work_queue();
$q->push({ name => "Joe", degrees => 3 });
$q->push({ name => "Mike", degrees => 1 });
$q->_maintain_queue_buffers("push",$entry);
$q->_maintain_queue_buffers("release",$entry);
$q->_maintain_queue_buffers();
Queues may be implemented with remote storage. In that case, there are local
queue buffers which are maintained in memory to increase performance.
There is conceptually a queue buffer for each combination of constraint values
(the constraint key of an entry).
The Queue Buffers are maintained each time an entry is push()ed or release()d.
The count for the particular constraint key is modified.
If the count falls below a configured low-water mark, a new set of entries
is read in from the remote storage.
=cut
# Keeps the per-resource-key bookkeeping ({total} and {buffer} counts) in
# step with a queue operation and tops the in-memory buffer up when needed.
#   Param:  $op     string      "push", "acquire", "release", "unacquire";
#                               anything else (or nothing) only runs the
#                               refill check
#   Param:  $entry  ARRAY/HASH  the entry the operation applies to
sub _maintain_queue_buffers {
    &App::sub_entry if ($App::trace);
    my ($self, $op, $entry) = @_;
    $op ||= "";
    my $max_buffered = $self->{BUFFER_SIZE};
    my $rkey         = $self->_resource_key($entry);
    my $counts       = $self->_resource_counts();

    if ($op eq "push") {
        $counts->{total}{$rkey}++;
        if ($counts->{buffer}{$rkey} < $max_buffered) {
            # still room in the buffer for this resource key
            $counts->{buffer}{$rkey}++;
            $self->_push_in_mem($entry);
        }
        else {
            $self->_push_in_mem($entry,1); # release lowest
        }
    }
    elsif ($op eq "acquire") {
        $counts->{$_}{$rkey}-- foreach ("total", "buffer");
    }
    elsif ($op eq "release") {
        $self->_release_in_mem($entry);
    }
    elsif ($op eq "unacquire") {
        $counts->{$_}{$rkey}++ foreach ("total", "buffer");
    }

    # Refill when more entries exist than are buffered and the buffer for
    # this key is below its configured size.
    my $total_count  = $counts->{total}{$rkey};
    my $buffer_count = $counts->{buffer}{$rkey};
    if ($total_count > $buffer_count && $buffer_count < $max_buffered) {
        my $refilled = $self->_refill_buffer($rkey);
        $counts->{buffer}{$rkey} += $refilled;
    }
    &App::sub_exit() if ($App::trace);
}
#############################################################################
# _resource_key()
#############################################################################
=head2 _resource_key()
* Signature: $q->_resource_key($entry);
* Param: $entry ARRAY/HASH
* Return: $resource_key string (the entry's constraint key values joined by ":")
* Throws: App::Exception::WorkQueue
* Since: 0.01
Sample Usage:
$context = App->context();
$q = $context->service("WorkQueue"); # or ...
$q = $context->work_queue();
$q->push({ name => "Joe", degrees => 3 });
$q->push({ name => "Mike", degrees => 1 });
$resource_key = $q->_resource_key($entry);
=cut
# Builds the resource key of an entry: the entry's value for each
# constraint's key column/attribute, in constraint order, joined with ":".
# Undefined values contribute an empty string.
#   Param:  $entry  ARRAY/HASH
#   Return: the key string; "" when the queue has no constraints
sub _resource_key {
    &App::sub_entry if ($App::trace);
    my ($self, $entry) = @_;
    my $constraints  = $self->{constraints};
    my $resource_key = "";
    if ($constraints && @$constraints) {
        # ARRAY entries are read by column index, HASH entries by attribute.
        my $is_array = ($self->{type} eq "ARRAY");
        my @parts;
        foreach my $c (@$constraints) {
            my $value = $is_array ? $entry->[$c->[$CONSTR_KEY_IDX]]
                                  : $entry->{$c->[$CONSTR_KEY_ATTRIB]};
            # CORE:: is spelled out because this package has its own push().
            CORE::push(@parts, defined $value ? $value : "");
        }
        $resource_key = join(":", @parts);
    }
    &App::sub_exit($resource_key) if ($App::trace);
    return($resource_key);
}
# Splits a ":"-joined resource key back into a hashref keyed by each
# constraint's key attribute name, pairing values with constraints by
# position.
#   Param:  $resource_key  string
#   Return: $params HASH; empty when the queue has no constraints
sub _resource_key_to_params {
    &App::sub_entry if ($App::trace);
    my ($self, $resource_key) = @_;
    my $constraints = $self->{constraints};
    my $params = {};
    if ($constraints && @$constraints) {
        my @values = split(/:/, $resource_key);
        foreach my $i (0 .. $#$constraints) {
            my $attrib = $constraints->[$i][$CONSTR_KEY_ATTRIB];
            $params->{$attrib} = $values[$i];
        }
    }
    &App::sub_exit($params) if ($App::trace);
    return($params);
}
#############################################################################
# Method: service_type()
#############################################################################
=head2 service_type()
Returns "WorkQueue";
* Signature: $service_type = App::WorkQueue->service_type();
* Param: void
* Return: $service_type string
* Since: 0.01
$service_type = $q->service_type();
=cut
# Name of the service type implemented by this class.  The empty prototype
# and the bare constant body are kept deliberately.
sub service_type () {
    'WorkQueue';
}
=head1 ACKNOWLEDGEMENTS
* Author: Stephen Adkins <[EMAIL PROTECTED]>
* License: This is free software. It is licensed under the same terms as
Perl itself.
=head1 SEE ALSO
L<C<App::Context>|App::Context>,
L<C<App::Service>|App::Service>
=cut
1;
__END__
#############################################################################
# _check_low_queue_buffers()
#############################################################################
=head2 _check_low_queue_buffers()
* Signature: $q->_check_low_queue_buffers($entry);
* Param: $entry ARRAY/HASH
* Return: $buffer_low boolean (1 if the entry's buffer was below BUFFER_SIZE, 0 otherwise)
* Throws: App::Exception::WorkQueue
* Since: 0.01
Sample Usage:
$context = App->context();
$q = $context->service("WorkQueue"); # or ...
$q = $context->work_queue();
$q->push({ name => "Joe", degrees => 3 });
$q->push({ name => "Mike", degrees => 1 });
$q->_check_low_queue_buffers($entry);
Queues may be implemented with remote storage. In that case, there are local
queue buffers which are maintained in memory to increase performance.
There is conceptually a queue buffer for each combination of constraint values
(the constraint key of an entry).
The Queue Buffers are checked each time an entry is push()ed.
The count for the particular constraint key is incremented.
Only if the count is below a configured high-water mark is it necessary
to also push the entry onto the queue buffer.
=cut
# NOTE: this sub sits after the __END__ marker, so it is not compiled.
#
# Counts a newly arriving entry against its resource key and reports whether
# the in-memory buffer for that key still had room.
#   Param:  $entry  ARRAY/HASH
#   Return: 1 if the buffer count was below $self->{BUFFER_SIZE} (the buffer
#           count has then been incremented), 0 otherwise.  The {total}
#           count is incremented in either case.
sub _check_low_queue_buffers {
    &App::sub_entry if ($App::trace);
    my ($self, $entry) = @_;
    my $max_buffered = $self->{BUFFER_SIZE};
    my $rkey         = $self->_resource_key($entry);
    my $counts       = $self->_resource_counts();

    $counts->{total}{$rkey}++;
    my $buffer_low = ($counts->{buffer}{$rkey} < $max_buffered) ? 1 : 0;
    if ($buffer_low) {
        $counts->{buffer}{$rkey} ++;
    }
    &App::sub_exit($buffer_low) if ($App::trace);
    return($buffer_low);
}