Author: spadkins
Date: Tue Dec 4 10:26:19 2007
New Revision: 10360
Modified:
p5ee/trunk/App-Context/lib/App/Context/POE/ClusterController.pm
p5ee/trunk/App-Context/lib/App/Context/POE/ClusterNode.pm
p5ee/trunk/App-Context/lib/App/Context/POE/Server.pm
Log:
clean up, and fixes to make splitting happen constantly and as fast as
possible, but also allow for shopping to be spun off to keep nodes busy
Modified: p5ee/trunk/App-Context/lib/App/Context/POE/ClusterController.pm
==============================================================================
---
p5ee/trunk/App-Context/lib/App/Context/POE/ClusterController.pm (original)
+++ p5ee/trunk/App-Context/lib/App/Context/POE/ClusterController.pm Tue
Dec 4 10:26:19 2007
@[EMAIL PROTECTED]
-76,8 +76,6 @[EMAIL PROTECTED]
&App::sub_entry if ($App::trace);
my ($self, $event, $callback_event) = @[EMAIL PROTECTED]
- #$self->log({level=>2}, "CC: send_async_event_now()
$event->{name}.$event->{method} : $event->{destination}\n") if
$self->{options}{poe_trace};
-
my $destination = $event->{destination};
if (! defined $destination) {
$self->log("ERROR: send_async_event_now()
$event->{name}.$event->{method} : destination not assigned\n");
@[EMAIL PROTECTED]
-147,10 +145,7 @[EMAIL PROTECTED]
sub _abort_running_async_event {
&App::sub_entry if ($App::trace);
my ($self, $runtime_event_token, $event, $callback_event) = @[EMAIL PROTECTED]
#$self->log({level=>2}, "CC: _abort_running_async_event :
runtime_event_token=[$runtime_event_token] : event=[$event] :
callback_event=[$callback_event]\n");
- #my $async_event =
$self->{running_async_event}{$runtime_event_token};
if ($runtime_event_token && $event && $callback_event) {
- #my ($event, $callback_event) = @[EMAIL PROTECTED]
if ($runtime_event_token =~ /^[0-9]+$/) {
kill(9, $runtime_event_token);
}
@[EMAIL PROTECTED]
-163,7 +158,6 @[EMAIL PROTECTED]
my $remote_session_state = "poe_cancel_async_event";
my $kernel = $self->{poe_kernel};
- #$self->log({level=>2},"CC: _abort_running_async_event :
calling remote cancel for $runtime_event_token\n");
$kernel->post("IKC", "post",
"poe://$remote_server_name/$remote_session_alias/$remote_session_state",
[ $runtime_event_token ]);
}
Modified: p5ee/trunk/App-Context/lib/App/Context/POE/ClusterNode.pm
==============================================================================
--- p5ee/trunk/App-Context/lib/App/Context/POE/ClusterNode.pm (original)
+++ p5ee/trunk/App-Context/lib/App/Context/POE/ClusterNode.pm Tue Dec 4
10:26:19 2007
@[EMAIL PROTECTED]
-254,18 +254,10 @[EMAIL PROTECTED]
for (my $i = 0; $i < @[EMAIL PROTECTED]
>{pending_async_events}}; $i++) {
my $event_token =
$self->{pending_async_events}[$i][0]{event_token};
- $self->log({level=>2},"CN : poe_cancel_async_event : z1 :
pending_async_events : event_token=[$event_token]\n");
}
### Find if running
for my $pid (keys %{$self->{running_async_event}}) {
- #$self->log({level=>2},"CN : poe_cancel_async_event :
running_async_event : pid=[$pid]\n");
- #my $ae = $self->{running_async_event}{$pid};
- #my $e = join(", ", map {sprintf("$_ = [$ae->[0]{$_}]")} keys
%{$ae->[0]});
- #my $ce = join(", ", map {sprintf("$_ = [$ae->[1]{$_}]")} keys
%{$ae->[1]});
- #$self->log({level=>2},"CN : poe_cancel_async_event : e=[$e]\n");
- #$self->log({level=>2},"CN : poe_cancel_async_event :
ce=[$ce]\n");
-
my $event_token =
$self->{running_async_event}{$pid}[0]{event_token};
if ($runtime_event_token eq $event_token) {
$self->log({level=>2},"CN : poe_cancel_async_event :
running_async_event : found event_token=[$event_token] pid=[$pid]\n");
@[EMAIL PROTECTED]
-285,14 +277,9 @[EMAIL PROTECTED]
### Find if pending
for (my $i = 0; $i < @[EMAIL PROTECTED]
>{pending_async_events}}; $i++) {
my $event_token =
$self->{pending_async_events}[$i][0]{event_token};
- $self->log({level=>2},"CN : poe_cancel_async_event : z2 :
pending_async_events : event_token=[$event_token]\n");
if ($runtime_event_token eq $event_token) {
splice(@[EMAIL PROTECTED]
>{pending_async_events}}, $i, 1);
}
- #my $ae = $self->{pending_async_events}{$foo};
- #my $e = join(", ", map {sprintf("$_ = [$ae->[0]{$_}]")} keys
%{$ae->[0]});
- #my $ce = join(", ", map {sprintf("$_ = [$ae->[1]{$_}]")} keys
%{$ae->[1]});
- #$self->log({level=>2},"CN : poe_cancel_async_event : e=[$e]\n");
}
&App::sub_exit() if ($App::trace);
}
Modified: p5ee/trunk/App-Context/lib/App/Context/POE/Server.pm
==============================================================================
--- p5ee/trunk/App-Context/lib/App/Context/POE/Server.pm (original)
+++ p5ee/trunk/App-Context/lib/App/Context/POE/Server.pm Tue Dec 4
10:26:19 2007
@[EMAIL PROTECTED]
-333,6 +333,7 @[EMAIL PROTECTED]
$state .= "\n";
+ ### Only enable this in development, requires a library uncomment as
well
#$state .= $self->_state_poe();
### THIS DOESN'T WORK YET
@[EMAIL PROTECTED]
-350,6 +351,7 @[EMAIL PROTECTED]
### POE state dumping - Currently commented out because it doesn't
gain us much
### in the way of visibility, and POE::API::Peek is a CPAN pain
+ ### UNCOMMENT THIS IF YOU NEED IT, DEPENDS ON A PAINFUL LIBRARY
#my $api = POE::API::Peek->new();
#my @[EMAIL PROTECTED]
= $api->event_queue_dump();
#$state .= "POE event_queue_dump\n";
@[EMAIL PROTECTED]
-448,7 +450,7 @[EMAIL PROTECTED]
$max_events ||= 9999;
my $pending_async_events = $self->{pending_async_events};
- my ($async_event, $assigned, $event, $in_process, %unique_events);
+ my ($async_event, $assigned, $event, $in_process);
my $events_occurred = 0;
my $i = 0;
my $event_capacity_exists = 1;
@[EMAIL PROTECTED]
-461,8 +463,6 @[EMAIL PROTECTED]
$events_occurred ++;
splice(@[EMAIL PROTECTED]
$i, 1); # remove
$pending_async_events->[$i]
$max_i--;
-
- $unique_events{"$event->{name} $event->{method}"}++;
}
elsif ($event_capacity_exists) {
$assigned = $self->assign_event_destination($event);
@[EMAIL PROTECTED]
-472,8 +472,6 @[EMAIL PROTECTED]
# keep $i the same
splice(@[EMAIL PROTECTED]
$i, 1); # remove
$pending_async_events->[$i]
$max_i--;
-
- $unique_events{"$event->{name} $event->{method}"}++;
}
else { # [undef] no servers are eligible for assignment
$event_capacity_exists = 0; # there's no sense looking
at the other pending async events
@[EMAIL PROTECTED]
-484,14 +482,8 @[EMAIL PROTECTED]
$i++; # look at the next one
}
}
- my $sum_not_clear_pending_events = 0;
- for my $key (keys %unique_events) {
- if ($key ne "mvworkd _clear_pending_hotel_shop_requests") {
- $sum_not_clear_pending_events += $unique_events{$key};
- }
- }
- $self->log({level=>2},"S: dispatch_pending_async_events : exiting :
events_occurred=[$events_occurred]
events_not_clear_pending=[$sum_not_clear_pending_events] time=[" .
sprintf("%.4f", tv_interval($t0, [gettimeofday])) . "]\n") if
$self->{options}{poe_trace};
+ $self->log({level=>2},"S: dispatch_pending_async_events : exiting :
events_occurred=[$events_occurred] time=[" . sprintf("%.4f",
tv_interval($t0, [gettimeofday])) . "]\n") if $self->{options}{poe_trace};
&App::sub_exit($events_occurred) if ($App::trace);
return($events_occurred);
}
@[EMAIL PROTECTED]
-621,8 +613,10 @[EMAIL PROTECTED]
if ($callback_event) {
$callback_event->{args} = [] if (! $callback_event->{args});
my $errmsg = ($exitval || $sig) ? "Exit $exitval on $pid
[sig=$sig]" : "";
- push(@[EMAIL PROTECTED]
>{args}},
- {event_token => $callback_event->{event_token}, returnval
=> $returnval, errnum => $exitval, errmsg => $errmsg});
+ push(@[EMAIL PROTECTED]
>{args}}, { event_token =>
$callback_event->{event_token},
+ returnval => $returnval,
+ errnum => $exitval,
+ errmsg => $errmsg });
$self->send_event($callback_event);
}
elsif ($sig == 9) { # killed without a chance to finish its work
@[EMAIL PROTECTED]
-690,8 +684,6 @[EMAIL PROTECTED]
my ($async_event);
my $aborted = 0;
- #$self->log({level=>2}, "S: abort_async_event :
event_token=[$event_token] : pending_async_events: ",
Dumper($pending_async_events));
-
# first look for it in the pending list
for (my $i = 0; $i <= $#$pending_async_events; $i++) {
$async_event = $pending_async_events->[$i];
@[EMAIL PROTECTED]
-715,12 +707,10 @[EMAIL PROTECTED]
sub abort_running_async_event {
&App::sub_entry if ($App::trace);
my ($self, $runtime_event_token) = @[EMAIL PROTECTED]
#$self->log({level=>2}, "S: abort_running_async_event :
runtime_event_token=[$runtime_event_token]\n");
my $running_async_event = $self->{running_async_event};
my $pending_async_events = $self->{pending_async_events};
my $async_event = $running_async_event->{$runtime_event_token};
if ($async_event) {
- #$self->log({level=>2}, "S: abort_running_async_event :
async_event=[$async_event]\n");
$self->{num_async_events}--;
delete $self->{running_async_event}{$runtime_event_token};
$self->_abort_running_async_event($runtime_event_token,
@[EMAIL PROTECTED]
);
@[EMAIL PROTECTED]
-850,7 +840,7 @[EMAIL PROTECTED]
sub poe_yield {
&App::sub_entry if ($App::trace);
- my ($self, $kernel, $state, $max_count) = @[EMAIL PROTECTED]
my ($self, $kernel, $state, $max_count, $calling_code) = @[EMAIL PROTECTED]
$max_count ||= 1;
if (!defined($self->{poe_count}{$state})) {
@[EMAIL PROTECTED]
-860,10 +850,9 @[EMAIL PROTECTED]
$self->{poe_count}{$state}++;
}
if ($self->{poe_count}{$state} <= $max_count) {
- $kernel->yield($state);
+ $kernel->yield($state, $calling_code);
}
- #$self->log({level=>2},"POE: poe_yield : poe_count : " .
Dumper($self->{poe_count}) . "\n");
&App::sub_exit() if ($App::trace);
return;
}
@[EMAIL PROTECTED]
-879,7 +868,6 @[EMAIL PROTECTED]
$self->{poe_count}{$state} = 0;
}
- #$self->log({level=>2},"POE: poe_yield_acknowledged : poe_count : " .
Dumper($self->{poe_count}) . "\n");
&App::sub_exit() if ($App::trace);
return;
}
@[EMAIL PROTECTED]
-910,12 +898,10 @[EMAIL PROTECTED]
sub poe_sigchld {
&App::sub_entry if ($App::trace);
my ( $self, $kernel, $heap, $pid, $status ) = @[EMAIL PROTECTED]
OBJECT, KERNEL,
HEAP, ARG1, ARG2 ];
- #print STDERR "NOTICE: STATE (poe_sigchld) invoked with ($pid,
$status) args\n";
my $exitval = $status >> 8;
my $sig = $status & 255;
$self->log({level=>2},"POE: poe_sigchld (Child $pid finished
[exitval=$exitval,sig=$sig])\n") if $self->{options}{poe_trace};
$self->finish_pid($pid, $exitval, $sig);
- $self->poe_yield($kernel, "poe_dispatch_pending_async_events");
&App::sub_exit() if ($App::trace);
}
@[EMAIL PROTECTED]
-930,12 +916,15 @[EMAIL PROTECTED]
if ($main_service &&
$main_service->can("format_async_event")) {
my ($event, $callback_event) = @[EMAIL PROTECTED]
my $str = $main_service->format_async_event($event,
$callback_event);
- #$self->log({level=>2},"POE: poe_alarm :
pending_async_events : $str\n");
+ $self->log({level=>2},"POE: poe_alarm :
pending_async_events : $str\n");
}
}
}
+
+ ### This is mostly for the node, which needs this to spawn queued
execute subrequest events
+ ### without it, subrequests get acquired by the node never spawns
children to shop it
$kernel->yield("poe_dispatch_pending_async_events");
- $self->poe_yield($kernel, "poe_dispatch_pending_async_events");
+
my $time = time();
my (@[EMAIL PROTECTED]
);
my $events_occurred = 0;
@[EMAIL PROTECTED]
-944,7 +933,7 @[EMAIL PROTECTED]
$time_of_next_event = $self->get_current_events(\@[EMAIL PROTECTED]
$time);
if ($#events > -1) {
foreach my $event (@[EMAIL PROTECTED]
) {
- #$self->log({level=>2},"POE: poe_alarm :
yield(poe_run_event, $event)\n");
+ $self->log({level=>2},"POE: poe_alarm :
yield(poe_run_event, $event)\n") if $self->{options}{poe_trace};
$kernel->yield("poe_run_event", $event); # put on the
POE run queue
$events_occurred++;
}
@[EMAIL PROTECTED]
-992,14 +981,11 @[EMAIL PROTECTED]
sub poe_dispatch_pending_async_events {
&App::sub_entry if ($App::trace);
- my ( $self, $kernel, $heap ) = @[EMAIL PROTECTED]
OBJECT, KERNEL, HEAP ];
+ my ( $self, $kernel, $heap, $arg ) = @[EMAIL PROTECTED]
OBJECT, KERNEL, HEAP, ARG0
];
$self->log({level=>2},"POE: poe_dispatch_pending_async_events\n") if
$self->{options}{poe_trace};
$self->poe_yield_acknowledged("poe_dispatch_pending_async_events");
my $events_occurred = $self->dispatch_pending_async_events();
- ### These are currently causing our serious slowness during lots of
splitting, fix the dogpile problem!
- #$kernel->yield("poe_dispatch_pending_async_events") if
($events_occurred > 0);
- $self->poe_yield($kernel, "poe_dispatch_pending_async_events") if
($events_occurred > 0);
&App::sub_exit() if ($App::trace);
}
@[EMAIL PROTECTED]
-1009,7 +995,6 @[EMAIL PROTECTED]
my ( $self, $kernel, $heap ) = @[EMAIL PROTECTED]
OBJECT, KERNEL, HEAP ];
$self->log({level=>2},"POE: poe_event_loop_extension\n") if
$self->{options}{poe_trace};
my $event_loop_extensions = $self->{event_loop_extensions};
- #$self->log({level=>2},"Event Loop extension ($event_loop_extensions:
#=" . ($#$event_loop_extensions+1) . ").\n") if
$self->{options}{poe_trace};
my $async_event_added = 0;
if ($event_loop_extensions && $#$event_loop_extensions > -1) {
my ($extension, $obj, $method, $args, $event_executed);
@[EMAIL PROTECTED]
-1018,19 +1003,12 @[EMAIL PROTECTED]
($obj, $method, $args) = @[EMAIL PROTECTED]
$event_executed = $obj->$method(@[EMAIL PROTECTED]
); # execute extension
$async_event_added = 1 if ($event_executed);
- #if ($event_executed) {
- # $self->log({level=>2},"Event Loop extension:
${obj}->${method}(@[EMAIL PROTECTED]
) = $event_executed\n") if
$self->{options}{poe_trace};
- #}
}
}
- if ($async_event_added) {
- #$kernel->yield("poe_dispatch_pending_async_events");
- $self->poe_yield($kernel, "poe_dispatch_pending_async_events");
- $kernel->yield("poe_event_loop_extension");
- }
- else {
- $kernel->delay_set("poe_event_loop_extension", 1);
- }
+ $self->poe_yield($kernel, "poe_dispatch_pending_async_events", undef,
"poe_event_loop_extension");
+ ### TODO: Do we want to constrain this if there is nothing to do (to
prevent spinning unnecessary cycles)?
+ $kernel->yield("poe_event_loop_extension");
+
&App::sub_exit() if ($App::trace);
}
@[EMAIL PROTECTED]
-1113,16 +1091,10 @[EMAIL PROTECTED]
if ($callback_event) {
$callback_event->{args} = $callback_args;
- if ($runtime_event_token && $callback_event && $callback_args
&& $callback_args->[0]) {
- $self->log({level=>2},"NOTE:
poe_remote_async_event_finished calling send_event:
runtime_event_token[$runtime_event_token]
callback_event[$callback_event->{method}]
subrequest_id[$callback_args->[0]{subrequest_id}]\n");
- }
- else {
- $self->log({level=>2},"NOTE:
poe_remote_async_event_finished calling send_event: cannot print extra
vars\n");
- }
$self->send_event($callback_event);
}
else {
- $self->log({level=>2},"NOTE: poe_remote_async_event_finished
called without callback_event :
runtime_event_token[$runtime_event_token]\n");
+ $self->log({level=>2},"Server: WARNING :
poe_remote_async_event_finished called without callback_event :
runtime_event_token[$runtime_event_token]\n");
}
}
else {
@[EMAIL PROTECTED]
-1135,7 +1107,6 @[EMAIL PROTECTED]
sub poe_server_state {
&App::sub_entry if ($App::trace);
my ( $self, $kernel, $heap ) = @[EMAIL PROTECTED]
OBJECT, KERNEL, HEAP ];
- #$self->log({level=>2},"POE: poe_server_state\n") if
$self->{options}{poe_trace};
my $server_state = $self->state();
&App::sub_exit($server_state) if ($App::trace);
return $server_state;


|