Here is a patch (against latest CVS version) that implements an AutoPause feature in POE::Wheel::SocketFactory. It includes documentation but no test case, as yet, because Rocco has allowed that he doesn't like this approach to the problem.
AutoPause, when on, basically calls pause_accept() on itself as soon as a new connection is accept()ed. This allows atomic control of concurrent connections at a low level. -Philip "Leolo" Gwyn
? CHANGES ? META.yml ? Makefile ? blib ? pm_to_blib ? poe_report.xml ? run_network_tests ? test-output.err ? t/20_resources/10_perl ? t/20_resources/20_xs ? t/30_loops/10_select ? t/30_loops/20_poll ? t/30_loops/30_event ? t/30_loops/40_gtk ? t/30_loops/50_tk Index: lib/POE/Wheel/SocketFactory.pm =================================================================== RCS file: /cvsroot/poe/poe/lib/POE/Wheel/SocketFactory.pm,v retrieving revision 1.78 diff -u -r1.78 SocketFactory.pm --- lib/POE/Wheel/SocketFactory.pm 14 Oct 2004 04:24:20 -0000 1.78 +++ lib/POE/Wheel/SocketFactory.pm 20 Oct 2004 19:28:10 -0000 @@ -33,6 +33,7 @@ sub MY_SOCKET_TYPE () { 10 } sub MY_STATE_ERROR () { 11 } sub MY_SOCKET_SELECTED () { 12 } +sub MY_AUTOPAUSE () { 13 } # Fletch has subclassed SSLSocketFactory from SocketFactory. He's # added new members after MY_SOCKET_SELECTED. Be sure, if you extend @@ -177,6 +178,10 @@ my $event_success = \$self->[MY_EVENT_SUCCESS]; my $event_failure = \$self->[MY_EVENT_FAILURE]; my $unique_id = $self->[MY_UNIQUE_ID]; + my $socket_handle = \$self->[MY_SOCKET_HANDLE]; + my $autopause = \$self->[MY_AUTOPAUSE]; + my $state_accept = \$self->[MY_STATE_ACCEPT]; + my $socket_selected = \$self->[MY_SOCKET_SELECTED]; $poe_kernel->state ( $self->[MY_STATE_ACCEPT] = ref($self) . "($unique_id) -> select accept", @@ -205,6 +210,11 @@ else { die "sanity failure: socket domain == $domain"; } + if($$autopause) { + warn "$$: AUTOPAUSE\n"; + _pause_accept($socket_handle, + $state_accept, $socket_selected); + } $k->call( $me, $$event_success, $new_socket, $peer_addr, $peer_port, $unique_id @@ -471,6 +481,7 @@ croak 'FailureEvent required' unless (defined $params{FailureEvent}); my $event_success = $params{SuccessEvent}; my $event_failure = $params{FailureEvent}; + my $autopause = $params{AutoPause}||0; # Create the SocketServer. Cache a copy of the socket handle. 
my $socket_handle = gensym(); @@ -488,6 +499,7 @@ undef, # MY_SOCKET_TYPE undef, # MY_STATE_ERROR undef, # MY_SOCKET_SELECTED + $autopause, # MY_AUTOPAUSE ], $type ); @@ -1045,24 +1057,47 @@ } # Pause and resume accept. +sub _pause_accept { + my($socket_handle, $state_accept, $socket_selected)=@_; + if ( defined $$socket_handle and + defined $$state_accept and + defined $$socket_selected + ) { + # warn "$$: RESUME\n"; + $poe_kernel->select_pause_read($$socket_handle); + } +} sub pause_accept { my $self = shift; - if ( defined $self->[MY_SOCKET_HANDLE] and - defined $self->[MY_STATE_ACCEPT] and - defined $self->[MY_SOCKET_SELECTED] + _pause_accept(\$self->[MY_SOCKET_HANDLE], + \$self->[MY_STATE_ACCEPT], + \$self->[MY_SOCKET_SELECTED]); +} + +sub _resume_accept { + my($socket_handle, $state_accept, $socket_selected)=@_; + if ( defined $$socket_handle and + defined $$state_accept and + defined $$socket_selected ) { - $poe_kernel->select_pause_read($self->[MY_SOCKET_HANDLE]); + # warn "$$: RESUME\n"; + $poe_kernel->select_resume_read($$socket_handle); } -} - +} sub resume_accept { my $self = shift; - if ( defined $self->[MY_SOCKET_HANDLE] and - defined $self->[MY_STATE_ACCEPT] and - defined $self->[MY_SOCKET_SELECTED] - ) { - $poe_kernel->select_resume_read($self->[MY_SOCKET_HANDLE]); + _resume_accept(\$self->[MY_SOCKET_HANDLE], + \$self->[MY_STATE_ACCEPT], + \$self->[MY_SOCKET_SELECTED]); +} + +sub autopause { + my $self = shift; + if(1 <= @_) { +# warn "Turning socket's AUTOPAUSE ".($_[0] ?
'on' : 'off'); + $self->[MY_AUTOPAUSE]=$_[0]; } + $self->[MY_AUTOPAUSE]; } #------------------------------------------------------------------------------ @@ -1073,6 +1108,7 @@ sub DESTROY { my $self = shift; + # warn "$$: DESTROY\n"; _shutdown( \$self->[MY_SOCKET_SELECTED], \$self->[MY_SOCKET_HANDLE], @@ -1186,6 +1222,7 @@ $wheel->ID(); + $wheel->autopause(1); $wheel->pause_accept(); $wheel->resume_accept(); @@ -1306,10 +1343,23 @@ =back -This parameter is used for listening sockets. +These parameters are used for listening sockets. =over 2 +=item AutoPause + +If you want to prevent, or at least control, concurrent connections, you +would normally call pause_accept() as soon as possible after SocketFactory +has accepted a new connection. AutoPause moves this task into +SocketFactory. If AutoPause is on, SocketFactory will call pause_accept() +as soon as it's accepted a new connection. If AutoPause is off (the +default), it's up to the user code to do this, if it's needed. + +AutoPause can be queried and set after creation via the L<autopause> member +function. + + =item ListenQueue ListenQueue specifies the length of the socket's listen() queue. It @@ -1355,6 +1405,22 @@ processes start, they call resume_accept() to begin accepting connections. +=item autopause + +autopause() allows you to query and modify the SocketFactory's AutoPause +setting. Calling autopause() with a parameter allows you to set or clear +AutoPause. + + $wheel->autopause(1); # turn it on + $wheel->autopause(0); # turn it off + +autopause() also returns the state of AutoPause: + + my $autop=$wheel->autopause(); + $wheel->autopause(1); + .... + $wheel->autopause($autop); + =back =head1 EVENTS AND PARAMETERS