Author: spadkins
Date: Thu Jan 17 05:39:09 2008
New Revision: 10577
Modified:
p5ee/trunk/App-WorkQueue/lib/App/WorkQueue.pm
p5ee/trunk/App-WorkQueue/lib/App/WorkQueue/Repository.pm
Log:
a lot of refactoring, mostly surrounding the release processes and the
related _maintain_queue_buffers call
Modified: p5ee/trunk/App-WorkQueue/lib/App/WorkQueue.pm
==============================================================================
--- p5ee/trunk/App-WorkQueue/lib/App/WorkQueue.pm (original)
+++ p5ee/trunk/App-WorkQueue/lib/App/WorkQueue.pm Thu Jan 17 05:39:09 2008
@@ -460,7 +460,6 @@
my ($self) = @[EMAIL PROTECTED]
my $context = $self->{context};
- my $db = $self->_db();
my ($entry);
my $STATUS_UNACQUIRED = $self->{STATUS_UNACQUIRED};
my $STATUS_ACQUIRED = $self->{STATUS_ACQUIRED};
@@ -496,7 +495,10 @@
my ($acquired);
foreach my $e (@[EMAIL PROTECTED]
) {
next if ($e->{$status_attrib} ne $STATUS_UNACQUIRED);
- if ($self->_acquire_resources($e)) {
+ if (!$e || !%$e) {
+ $context->log({level=>1}, "WorkQueue : $self->{name}
: EMPTY ENTRY [$e]\n");
+ }
+ elsif ($self->_acquire_resources($e)) {
$entry = $e;
$acquired = $self->_acquire_entry($entry);
if ($acquired) {
@@ -504,8 +506,13 @@
last;
}
else {
+ ### THIS SHOULD NEVER HAPPEN, SOMEDAY FIGURE OUT
WHY
$self->_release_resources($entry);
- $context->log({level=>1}, "WorkQueue :
$self->{name} : ACQUISITION FAILED :
[$entry->{shop_request_id}|$entry->{subrequest_id}] : last sql
stmt[$db->{sql}]\n");
+ if ($self->can("_db")) {
+ ### TODO: this is a debugging hack that
shouldn't stay in it
+ my $db = $self->_db();
+ $context->log({level=>1}, "WorkQueue :
$self->{name} : ACQUISITION FAILED :
[$entry->{shop_request_id}|$entry->{subrequest_id}] : last sql
stmt[$db->{sql}]\n");
+ }
$entry = undef;
# the following line appears to have been a bug
#$self->_maintain_queue_buffers("release",$entry);
@@ -599,6 +606,7 @@
my $STATUS_RELEASED = $self->{STATUS_RELEASED};
my $status_attrib = $self->{status_attrib};
my $data = $self->{data};
+ my $context = $self->{context};
my ($e, $ent, $entry_key);
my @[EMAIL PROTECTED]
= ( $self->{status_attrib} );
@@ -638,9 +646,14 @@
$self->_release_resources($ent);
}
else {
- $self->_release_resources($ent);
+ ### We only expect to get here via cancels, so we
don't want to release_resources
+ #$self->_release_resources($ent);
+ $context->log("WorkQueue: _release_in_mem: release
with $ent->{$status_attrib} not equal $STATUS_ACQUIRED\n");
+ }
+ my $released = $self->update($ent,\@[EMAIL PROTECTED]
);
+ if (!$released) {
+ $context->log("WorkQueue: _release_in_mem: update of
db failed : " . join("|", %$ent) . "\n");
}
- $self->update($ent,\@[EMAIL PROTECTED]
);
splice(@[EMAIL PROTECTED]
$e, 1);
print "RELEASED[M]: {", join("|",%$entry), "}\n" if
($verbose);
$released = 1;
@@ -1557,12 +1570,8 @@
if ($complies) {
foreach my $c (@[EMAIL PROTECTED]
) {
$key = $entry->[$c->[$CONSTR_KEY_IDX]];
- $limit = $c->[$CONSTR_LIMITS]{$key};
- $limit = $c->[$CONSTR_LIMITS]{_DEFAULT} if (!defined
$limit);
- if (defined $limit) {
- $count_incr = (defined $c->[$CONSTR_COUNT_IDX]) ?
$entry->[$c->[$CONSTR_COUNT_IDX]] : 1;
- $c->[$CONSTR_COUNTS]{$key} += $count_incr;
- }
+ $count_incr = (defined $c->[$CONSTR_COUNT_IDX]) ?
$entry->[$c->[$CONSTR_COUNT_IDX]] : 1;
+ $c->[$CONSTR_COUNTS]{$key} += $count_incr;
}
}
}
@@ -1583,12 +1592,8 @@
if ($complies) {
foreach my $c (@[EMAIL PROTECTED]
) {
$key = $entry->{$c->[$CONSTR_KEY_ATTRIB]};
- $limit = $c->[$CONSTR_LIMITS]{$key};
- $limit = $c->[$CONSTR_LIMITS]{_DEFAULT} if (!defined
$limit);
- if (defined $limit) {
- $count_incr = (defined
$c->[$CONSTR_COUNT_ATTRIB]) ? $entry->{$c->[$CONSTR_COUNT_ATTRIB]} : 1;
- $c->[$CONSTR_COUNTS]{$key} += $count_incr;
- }
+ $count_incr = (defined $c->[$CONSTR_COUNT_ATTRIB]) ?
$entry->{$c->[$CONSTR_COUNT_ATTRIB]} : 1;
+ $c->[$CONSTR_COUNTS]{$key} += $count_incr;
}
}
}
@@ -1653,27 +1658,20 @@
if ($self->{type} eq "ARRAY") {
foreach my $c (@[EMAIL PROTECTED]
) {
$key = $entry->[$c->[$CONSTR_KEY_IDX]];
- $limit = $c->[$CONSTR_LIMITS]{$key};
- $limit = $c->[$CONSTR_LIMITS]{_DEFAULT} if (!defined
$limit);
- if (defined $limit) {
- $count_incr = (defined $c->[$CONSTR_COUNT_IDX]) ?
$entry->[$c->[$CONSTR_COUNT_IDX]] : 1;
- $c->[$CONSTR_COUNTS]{$key} -= $count_incr;
- }
+ $count_incr = (defined $c->[$CONSTR_COUNT_IDX]) ?
$entry->[$c->[$CONSTR_COUNT_IDX]] : 1;
+ $c->[$CONSTR_COUNTS]{$key} -= $count_incr;
}
}
else {
foreach my $c (@[EMAIL PROTECTED]
) {
$key = $entry->{$c->[$CONSTR_KEY_ATTRIB]};
- $limit = $c->[$CONSTR_LIMITS]{$key};
- $limit = $c->[$CONSTR_LIMITS]{_DEFAULT} if (!defined
$limit);
- if (defined $limit) {
- $count_incr = (defined $c->[$CONSTR_COUNT_ATTRIB]) ?
$entry->{$c->[$CONSTR_COUNT_ATTRIB]} : 1;
- $c->[$CONSTR_COUNTS]{$key} -= $count_incr;
- }
+ $count_incr = (defined $c->[$CONSTR_COUNT_ATTRIB]) ?
$entry->{$c->[$CONSTR_COUNT_ATTRIB]} : 1;
+ $c->[$CONSTR_COUNTS]{$key} -= $count_incr;
}
}
}
- &App::sub_exit() if ($App::trace);
+ &App::sub_exit(1) if ($App::trace);
+ return(1);
}
sub _resource_counts {
@@ -1745,7 +1743,7 @@
my $resource_counts = $self->_resource_counts();
if ($op eq "push") {
- $context->log({level=>3}, "WQ: _maintain_queue_buffers :
entry push begin\n");
+ #$context->log({level=>3}, "WQ: _maintain_queue_buffers :
entry push begin\n");
$resource_counts->{total}{$resource_key}++;
if ($resource_counts->{buffer}{$resource_key} < $BUFFER_SIZE)
{
$resource_counts->{buffer}{$resource_key}++;
@@ -1754,48 +1752,88 @@
else {
$self->_push_in_mem($entry,1); # release lowest
}
- $context->log({level=>3}, "WQ: _maintain_queue_buffers :
entry push end\n");
+ #$context->log({level=>3}, "WQ: _maintain_queue_buffers :
entry push end\n");
}
elsif ($op eq "acquire") {
- $context->log({level=>3}, "WQ: _maintain_queue_buffers :
entry acquire begin\n");
+ #$context->log({level=>3}, "WQ: _maintain_queue_buffers :
entry acquire begin\n");
$resource_counts->{total}{$resource_key}--;
$resource_counts->{buffer}{$resource_key}--;
- $context->log({level=>3}, "WQ: _maintain_queue_buffers :
entry acquire end\n");
+ #$context->log({level=>3}, "WQ: _maintain_queue_buffers :
entry acquire end\n");
}
elsif ($op eq "release") {
- $context->log({level=>3}, "WQ: _maintain_queue_buffers :
entry release begin\n");
+ #$context->log({level=>3}, "WQ: _maintain_queue_buffers :
entry release begin\n");
$self->_release_in_mem($entry, $columns, $values);
- $context->log({level=>3}, "WQ: _maintain_queue_buffers :
entry release end\n");
+
+ my $status_attrib = $self->{status_attrib};
+ my $STATUS_ACQUIRED = $self->{STATUS_ACQUIRED};
+ my $release_without_acquire = 0;
+ if ($self->{type} eq "ARRAY") {
+ my $colidx = $self->_colidx();
+ my $status_idx = $colidx->{$status_attrib};
+ if ($entry->[$status_idx] ne $STATUS_ACQUIRED) {
+ $release_without_acquire = 1;
+ }
+ }
+ else {
+ if ($entry->{$status_attrib} ne $STATUS_ACQUIRED) {
+ $release_without_acquire = 1;
+ }
+ }
+ ### TODO: figure out how to maintain numbers when $released
is false, causing constraint issues
+ my $released = $self->_release_in_mem($entry, $columns,
$values);
+ if ($released) {
+ if ($release_without_acquire) {
+ $resource_counts->{total}{$resource_key}--;
+ $resource_counts->{buffer}{$resource_key}--;
+ }
+ else {
+ # do nothing
+ }
+ }
+ else {
+ my $nrows =
$self->_release_in_db($entry,$columns,$values);
+ if ($release_without_acquire) {
+ $resource_counts->{total}{$resource_key}--;
+ }
+ else {
+ # do nothing
+ #$self->_release_resources($entry) if ($nrows &&
!$release_without_acquire);
+ }
+ my $context = $self->{context};
+ $context->log("WorkQueue: release: released something not
in memory entry=[$entry] nrows=[$nrows]
release_without_acquire=[$release_without_acquire]\n");
+ }
+
+ #$context->log({level=>3}, "WQ: _maintain_queue_buffers :
entry release end\n");
}
elsif ($op eq "unacquire") {
- $context->log({level=>3}, "WQ: _maintain_queue_buffers :
entry unacquire begin\n");
+ #$context->log({level=>3}, "WQ: _maintain_queue_buffers :
entry unacquire begin\n");
$resource_counts->{total}{$resource_key}++;
$resource_counts->{buffer}{$resource_key}++;
- $context->log({level=>3}, "WQ: _maintain_queue_buffers :
entry unacquire end\n");
+ #$context->log({level=>3}, "WQ: _maintain_queue_buffers :
entry unacquire end\n");
}
- $context->log({level=>3}, "WQ: _maintain_queue_buffers : entry
call _refill_buffer begin\n");
+ #$context->log({level=>3}, "WQ: _maintain_queue_buffers : entry
call _refill_buffer begin\n");
my $num_total = $resource_counts->{total}{$resource_key};
my $num_in_buffer = $resource_counts->{buffer}{$resource_key};
if ($num_total > $num_in_buffer && $num_in_buffer < $BUFFER_SIZE)
{
my $num_added = $self->_refill_buffer($resource_key);
$resource_counts->{buffer}{$resource_key} += $num_added;
}
- $context->log({level=>3}, "WQ: _maintain_queue_buffers : entry
call _refill_buffer end\n");
+ #$context->log({level=>3}, "WQ: _maintain_queue_buffers : entry
call _refill_buffer end\n");
}
else {
- $context->log({level=>3}, "WQ: _maintain_queue_buffers : no entry
foreach resource_count key begin\n");
+ #$context->log({level=>3}, "WQ: _maintain_queue_buffers : no
entry foreach resource_count key begin\n");
my $resource_counts = $self->_resource_counts();
foreach my $resource_key (keys %{$resource_counts->{total}}) {
- $context->log({level=>3}, "WQ: _maintain_queue_buffers : no
entry _refill_buffer begin\n");
+ #$context->log({level=>3}, "WQ: _maintain_queue_buffers : no
entry _refill_buffer begin\n");
my $num_total =
$resource_counts->{total}{$resource_key};
my $num_in_buffer =
$resource_counts->{buffer}{$resource_key};
if ($num_total > $num_in_buffer && $num_in_buffer <
$BUFFER_SIZE) {
my $num_added = $self->_refill_buffer($resource_key);
$resource_counts->{buffer}{$resource_key} += $num_added;
}
- $context->log({level=>3}, "WQ: _maintain_queue_buffers : no
entry _refill_buffer end\n");
+ #$context->log({level=>3}, "WQ: _maintain_queue_buffers : no
entry _refill_buffer end\n");
}
- $context->log({level=>3}, "WQ: _maintain_queue_buffers : no entry
foreach resource_count key end\n");
+ #$context->log({level=>3}, "WQ: _maintain_queue_buffers : no
entry foreach resource_count key end\n");
}
&App::sub_exit() if ($App::trace);
Modified: p5ee/trunk/App-WorkQueue/lib/App/WorkQueue/Repository.pm
==============================================================================
--- p5ee/trunk/App-WorkQueue/lib/App/WorkQueue/Repository.pm (original)
+++ p5ee/trunk/App-WorkQueue/lib/App/WorkQueue/Repository.pm Thu Jan 17 05:39:09 2008
@@ -305,41 +305,12 @@
&App::sub_entry if ($App::trace);
my ($self, $entry, $columns, $values) = @[EMAIL PROTECTED]
- my $status_attrib = $self->{status_attrib};
- my $STATUS_ACQUIRED = $self->{STATUS_ACQUIRED};
- my ($resource_counts, $resource_key, $release_without_acquire);
- $resource_counts = $self->_resource_counts();
- $resource_key = $self->_resource_key($entry);
- if ($self->{type} eq "ARRAY") {
- my $colidx = $self->_colidx();
- my $status_idx = $colidx->{$status_attrib};
- if ($entry->[$status_idx] ne $STATUS_ACQUIRED) {
- $release_without_acquire = 1;
- }
- }
- else {
- if ($entry->{$status_attrib} ne $STATUS_ACQUIRED) {
- $release_without_acquire = 1;
- }
- }
- if ($release_without_acquire) {
- $resource_counts->{total}{$resource_key}--;
- }
- ### TODO: figure out how to maintain numbers when $released is false,
causing constraint issues
- my $released = $self->_release_in_mem($entry, $columns, $values);
- if ($released) {
- $resource_counts->{buffer}{$resource_key}-- if
($release_without_acquire);
- $self->_maintain_queue_buffers(undef,$entry,$columns,$values);
- }
- else {
- $released = $self->_release_in_db($entry,$columns,$values);
- $self->_release_resources($entry) if ($released &&
!$release_without_acquire);
- #$resource_counts->{total}{$resource_key}-- if
(!$release_without_acquire);
- }
+ $self->_maintain_queue_buffers("release",$entry,$columns,$values);
$self->print() if ($self->{verbose});
$self->{context}->trigger_event_loop_extension();
- &App::sub_exit($released) if ($App::trace);
- return($released);
+
+ &App::sub_exit(1) if ($App::trace);
+ return(1);
}
sub _release_in_db {
@@ -507,6 +478,9 @@
$self->_update_ref($entry, $columns, $values) if ($acquired);
}
else {
+ ### THIS SHOULD NEVER HAPPEN
+ my $context = $self->{context};
+ $context->log("WorkQueue: _acquire_entry: tried to acquire an
entry whose $self->{status_attrib} == $self->{STATUS_ACQUIRED}\n");
$acquired = 0;
}
&App::sub_exit($acquired) if ($App::trace);


|