Hi Josh,
that's the job of the inactivity monitor when using OpenWire.
Unfortunately Stomp doesn't support that in version 1.0; it is
something we want to add in the next version of the spec.
Maybe implementing something like that at the application level
would help in your case?
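An application-level keep-alive might be as simple as the client periodically sending a throwaway frame to a dedicated destination, so the broker sees regular traffic and a silent peer can be caught by a read timeout. The destination name and frame layout below are illustrative assumptions on my part, not an ActiveMQ API; a minimal sketch:

```java
import java.nio.charset.StandardCharsets;

// Hypothetical application-level heartbeat for Stomp 1.0, which has no
// built-in keep-alive. The client writes this frame on a timer (say,
// every 30 seconds); the broker side pairs it with a read timeout.
public class StompHeartbeat {

    // Build a minimal Stomp SEND frame: command line, headers, a blank
    // line ending the headers, an empty body, and a NUL terminator.
    static byte[] heartbeatFrame(String destination) {
        String frame = "SEND\n"
                + "destination:" + destination + "\n"
                + "\n"          // blank line separates headers from body
                + "\u0000";     // NUL byte ends the frame
        return frame.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] frame = heartbeatFrame("/queue/heartbeat");
        System.out.println("heartbeat frame is " + frame.length + " bytes");
    }
}
```

A real client would schedule these writes (e.g. with a ScheduledExecutorService) at an interval comfortably shorter than whatever read timeout the broker enforces.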
Cheers
--
Dejan Bosanac - http://twitter.com/dejanb
Open Source Integration - http://fusesource.com/
ActiveMQ in Action - http://www.manning.com/snyder/
Blog - http://www.nighttale.net
On Wed, Apr 14, 2010 at 5:41 PM, Josh Carlson
<jcarl...@e-dialog.com> wrote:
Hmm. If a timeout were the solution to this problem, how would
you be able to tell the difference between something being
wrong and the client just being slow?
I did an strace on the server and discovered how the timeout
is being used: as a parameter to poll.
6805 10:31:15 poll([{fd=94, events=POLLIN|POLLERR}], 1, 60000 <unfinished ...>
6805 10:31:15 <... poll resumed> ) = 1 ([{fd=94, revents=POLLIN}])
6805 10:31:15 recvfrom(94, "CONNECT\npasscode:...."..., 8192, 0, NULL, NULL) = 39
6805 10:31:15 sendto(94, "CONNECTED\nsession:ID:mmq1-40144-"..., 53, 0, NULL, 0) = 53
6805 10:31:15 poll([{fd=94, events=POLLIN|POLLERR}], 1, 60000) = 1 ([{fd=94, revents=POLLIN}])
6805 10:31:15 recvfrom(94, "SUBSCRIBE\nactivemq.prefetchSize:"..., 8192, 0, NULL, NULL) = 138
6805 10:31:15 sendto(94, "RECEIPT\nreceipt-id:39ef0e071a549"..., 55, 0, NULL, 0) = 55
6805 10:31:15 poll([{fd=94, events=POLLIN|POLLERR}], 1, 60000 <unfinished ...>
6805 10:32:15 <... poll resumed> ) = 0 (Timeout)
6805 10:32:15 poll([{fd=94, events=POLLIN|POLLERR}], 1, 60000 <unfinished ...>
6805 10:33:15 <... poll resumed> ) = 0 (Timeout)
6805 10:33:15 poll([{fd=94, events=POLLIN|POLLERR}], 1, 60000 <unfinished ...>
6805 10:34:15 <... poll resumed> ) = 0 (Timeout)
In the output above I stripped lines that were not
operations directly on the socket. The poll timeouts
continue on like that, with nothing in between.
[r...@mmq1 tmp]# lsof -p 6755 | grep mmq1
java 6755 root 85u IPv6 1036912 TCP mmq1.eng.e-dialog.com:61613 (LISTEN)
java 6755 root 92u IPv6 1038039 TCP mmq1.eng.e-dialog.com:61613->10.0.13.230:46542 (ESTABLISHED)
java 6755 root 94u IPv6 1036997 TCP mmq1.eng.e-dialog.com:61613->mmd2.eng.e-dialog.com:41743 (ESTABLISHED)
The connection to mmd2 is to the host that is gone. The one to
10.0.13.230 is up and active. When I kill -9 the process on
10.0.13.230 I see this in the logs:
2010-04-13 17:13:55,322 | DEBUG | Transport failed: java.io.EOFException | org.apache.activemq.broker.TransportConnection.Transport | ActiveMQ Transport: tcp:///10.0.13.230:45463
java.io.EOFException
    at java.io.DataInputStream.readByte(Unknown Source)
    at org.apache.activemq.transport.stomp.StompWireFormat.readLine(StompWireFormat.java:186)
    at org.apache.activemq.transport.stomp.StompWireFormat.unmarshal(StompWireFormat.java:94)
    at org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:211)
    at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:203)
    at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:186)
    at java.lang.Thread.run(Unknown Source)
2010-04-13 17:13:55,325 | DEBUG | Stopping connection: /10.0.13.230:45463 | org.apache.activemq.broker.TransportConnection | ActiveMQ Task
2010-04-13 17:13:55,325 | DEBUG | Stopping transport tcp:///10.0.13.230:45463 | org.apache.activemq.transport.tcp.TcpTransport | ActiveMQ Task
2010-04-13 17:13:55,326 | DEBUG | Stopped transport: /10.0.13.230:45463 | org.apache.activemq.broker.TransportConnection | ActiveMQ Task
2010-04-13 17:13:55,327 | DEBUG | Cleaning up connection resources: /10.0.13.230:45463 | org.apache.activemq.broker.TransportConnection | ActiveMQ Task
2010-04-13 17:13:55,327 | DEBUG | remove connection id: ID:mmq1-58415-1271193024658-2:3 | org.apache.activemq.broker.TransportConnection | ActiveMQ Task
2010-04-13 17:13:55,328 | DEBUG | masterb1 removing consumer: ID:mmq1-58415-1271193024658-2:3:-1:1 for destination: queue://Producer/TESTING/weight/three/ | org.apache.activemq.broker.region.AbstractRegion | ActiveMQ Task
Which is what I want to happen when the host goes down.
It seems to me that something should notice that the
connection is really gone. Maybe this is more of a kernel
issue; I would think that when the poll times out, it
would trigger the connection to move out of the ESTABLISHED
state and get closed.
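For reference, this matches plain TCP semantics: with no traffic flowing, the kernel sends nothing and so has no reason to notice the peer is gone; poll timing out only says "no data", not "peer dead". SO_KEEPALIVE makes the kernel probe idle connections, but the probe timing is a kernel-wide setting (on Linux 2.6, net.ipv4.tcp_keepalive_time defaults to 7200 seconds), so detection takes hours unless the sysctls are lowered. A minimal sketch of enabling it from Java, using a throwaway local socket pair:

```java
import java.net.ServerSocket;
import java.net.Socket;

// Demonstrates turning on SO_KEEPALIVE from Java. When and how often
// the kernel actually probes is governed by the TCP keepalive sysctls,
// not by anything settable through java.net.Socket.
public class KeepAliveDemo {
    public static void main(String[] args) throws Exception {
        try (ServerSocket server = new ServerSocket(0);            // any free port
             Socket client = new Socket("localhost", server.getLocalPort())) {
            client.setKeepAlive(true);    // sets SO_KEEPALIVE on the socket
            System.out.println("keepalive=" + client.getKeepAlive());
        }
    }
}
```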
We are using Linux, kernel version 2.6.18, but I've seen
this same issue on a range of different 2.6 versions.
-Josh
On 04/14/2010 09:38 AM, Josh Carlson wrote:
Thanks Gary for the, as usual, helpful information.
It looks like the broker may be suffering from exactly the same
problem we encountered when implementing client-side failover:
namely, that when the master broker went down, a subsequent read
on the socket by the client would hang (well, actually take a
very long time to fail/time out). In that case our TCP connection
was ESTABLISHED, and looking at the broker I see the same thing
after the client host goes away (the connection is ESTABLISHED).
We fixed this issue in our client by setting the socket option
SO_RCVTIMEO on the connection to the broker.
I noted that the broker appears to do the same thing with the
TCP transport option soTimeout. It looks like when this is set
it winds up as a call to java.net.Socket.setSoTimeout when the
socket is getting initialized. I have not done any socket
programming in Java, but my assumption is that SO_TIMEOUT maps
to both SO_RCVTIMEO and SO_SNDTIMEO in the C world.
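As far as I can tell, Java's SO_TIMEOUT is closer to SO_RCVTIMEO alone: it only affects blocking reads, a timed-out read throws SocketTimeoutException, and the socket stays open and ESTABLISHED; it is up to the caller to decide the peer is dead and close it. A small local sketch of that behaviour:

```java
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;

// Shows what Socket.setSoTimeout actually does: the read unblocks with
// a SocketTimeoutException after the timeout, but the connection itself
// is untouched — nothing is closed and nothing leaves ESTABLISHED.
public class ReadTimeoutDemo {
    public static void main(String[] args) throws Exception {
        try (ServerSocket server = new ServerSocket(0);
             Socket client = new Socket("localhost", server.getLocalPort());
             Socket accepted = server.accept()) {
            accepted.setSoTimeout(200);            // milliseconds
            InputStream in = accepted.getInputStream();
            try {
                in.read();                         // peer never writes
                System.out.println("read returned (unexpected)");
            } catch (SocketTimeoutException e) {
                System.out.println("read timed out; socket still open: "
                        + !accepted.isClosed());
            }
        }
    }
}
```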
I was hopeful about this option, but when I set it in my
transport connector:
<transportConnector name="stomp"
uri="stomp://mmq1:61613?soTimeout=60000"/>
the timeout does not occur. I actually ran my test case about
15 hours ago, and I can see that the broker still has an
ESTABLISHED connection to the dead client and has a message
dispatched to it.
Am I misunderstanding what soTimeout is for? I can see in
org.apache.activemq.transport.tcp.TcpTransport.initialiseSocket
that setSoTimeout is getting called unconditionally. So what
I'm wondering is whether it is actually calling it with a 0
value despite the way I set up my transport connector. I
suppose setting this to 0 would explain why it apparently
never times out, whereas in our client case it eventually did
time out (because we were not setting the option at all
before).
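One possible explanation worth checking: if I remember the transport reference correctly, socket options meant for the connections a transportConnector accepts need a transport. prefix on the connector URI, and an unprefixed option may be silently ignored. That would mean something like the following (the prefix is my assumption here, so please verify against the docs):

```xml
<!-- Assumed syntax: the transport. prefix asks the connector to apply
     the option to each accepted client socket rather than to the
     connector itself. -->
<transportConnector name="stomp"
    uri="stomp://mmq1:61613?transport.soTimeout=60000"/>
```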
On 04/14/2010 05:15 AM, Gary Tully wrote:
The re-dispatch is triggered by the TCP connection dying;
netstat can help with the diagnosis here. Check the connection
state of the broker port after the client host is rebooted. If
the connection is still active, possibly in a TIME_WAIT state,
you may need to configure some additional timeout options on
the broker side.
On 13 April 2010 19:43, Josh Carlson
<jcarl...@e-dialog.com> wrote:
I am using client acknowledgements with a prefetch size of 1
and no message expiration policy. When a consumer subscribes
to a queue I can see that the message gets dispatched
correctly. If the process gets killed before retrieving and
acknowledging the message, I see the message getting
re-dispatched (correctly). I expected this same behaviour if
the host running the process gets rebooted or crashes.
However, after reboot I can see that the message is stuck in
the dispatched state to the consumer that is long gone. Is
there a way that I can get messages re-dispatched when a host
hosting consumer processes gets rebooted? How does it detect
the case when a process dies (even with SIGKILL)?
I did notice that if I increase my prefetch size and enqueue
another message after the reboot, ActiveMQ will re-dispatch
the original message. However, with a prefetch size equal to
one the message never seems to get re-dispatched.
--
http://blog.garytully.com
Open Source Integration
http://fusesource.com