Nope, you're not missing anything. There is a bug in there. Taking a look right now...
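
For anyone following the thread, here is a rough sketch of what a broker-side keep-alive check is meant to do. It is not ActiveMQ's code: `KeepAliveMonitor`, `on_frame`, `is_expired` and `FakeClock` are made-up names for illustration. The assumption is the MQTT rule that the server drops a client only after 1.5 times the keep-alive interval passes with no inbound message, and that a PINGREQ counts as a message. The quoted log shows ping responses being sent right up to the InactivityIOException, so one plausible reading (not confirmed here) is that pings are not resetting the read check.

```python
import time


class FakeClock:
    """Hypothetical manual clock so the example runs without sleeping."""

    def __init__(self):
        self.now = 0.0

    def __call__(self):
        return self.now

    def advance(self, secs):
        self.now += secs


class KeepAliveMonitor:
    """Illustrative keep-alive check; NOT ActiveMQ's MQTTInactivityMonitor."""

    GRACE = 1.5  # assumed from the MQTT spec: 1.5 x keep-alive before disconnect

    def __init__(self, keepalive_secs, clock=time.monotonic):
        self.keepalive_secs = keepalive_secs
        self.clock = clock
        self.last_read = clock()

    def on_frame(self, frame_type):
        # Every inbound frame counts as activity, PINGREQ included.
        self.last_read = self.clock()

    def is_expired(self):
        # MQTT 3.1.1 treats a keep-alive of 0 as "check disabled".
        if self.keepalive_secs == 0:
            return False
        idle = self.clock() - self.last_read
        return idle > self.keepalive_secs * self.GRACE


if __name__ == "__main__":
    clock = FakeClock()
    monitor = KeepAliveMonitor(keepalive_secs=5, clock=clock)

    # A client that pings every 4 seconds should never be dropped.
    for _ in range(10):
        clock.advance(4)
        monitor.on_frame("PINGREQ")
        print("expired after ping:", monitor.is_expired())

    # Once the client goes silent, the connection expires after 7.5 seconds.
    clock.advance(8)
    print("expired after silence:", monitor.is_expired())
```

With `keepalive=5` as in the client below, a correct check would tolerate up to 7.5 seconds of silence and would treat each ping as activity.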

On Tue, Sep 25, 2012 at 2:49 PM, Aliquip <aliq...@gmail.com> wrote:

> ActiveMQ (5.7 snapshot) and MQTT: connection times out while still connected
> to a topic for receiving
>
> I'm using mosquitto's Python client to connect to an MQTT topic. When using
> the mosquitto broker, everything works (but that broker doesn't allow me to
> plug in authentication against my database, so it's not a real option).
>
> When using ActiveMQ, the connection is dropped. The client does ping, and
> the broker does respond, but it doesn't seem to have an effect, as the
> connection still times out.
>
> Am I missing some configuration option?
>
>
> broker config (scala):
> ===================================================================
> import org.apache.activemq.broker.BrokerService
>
> val broker = new BrokerService()
> broker.setPlugins(Array(new UpoAuthenticationPlugin(admin_user, admin_pw)))
>
> broker.addConnector("mqtt+nio://127.0.0.1:1883")
> broker.addConnector("vm://localhost")
> broker.start()
> ===================================================================
>
> python client:
> ===================================================================
> import mosquitto
> import os
> import time
>
> client = mosquitto.Mosquitto("test-client")
> client.username_pw_set("user", "password")
>
> def on_connect(mosq, obj, rc):
>     if rc == 0:
>         print("Connected successfully.")
>
> client.on_connect = on_connect
>
> def on_disconnect(mosq, obj, rc):
>     print("Disconnected successfully.")
>
> client.on_disconnect = on_disconnect
>
> def on_publish(mosq, obj, mid):
>     print("Message "+str(mid)+" published.")
>
> client.on_publish = on_publish
>
> def on_message(mosq, obj, msg):
>     print("Message received on topic "+msg.topic+" with QoS "+str(msg.qos)+" and payload "+msg.payload)
>
> client.on_message = on_message
>
> def on_subscribe(mosq, obj, mid, qos_list):
>     print("Subscribe with mid "+str(mid)+" received.")
>
> client.on_subscribe = on_subscribe
>
> # keepalive is in seconds
> client.connect("127.0.0.1", keepalive=5)
>
> client.subscribe("groups/Nameless", 1)
> client.publish("groups/Nameless", "hello world", 1)
>
> while True:
>     # loop() processes network traffic and returns a status code
>     print("loop " + str(client.loop()))
>     time.sleep(2)
> ===================================================================
>
> ActiveMQ log
> ===================================================================
> [debug] o.a.a.t.m.MQTTProtocolConverter - MQTT Client connected.
> [debug] o.a.a.b.r.AbstractRegion - localhost adding consumer: ID:Jeroen-Laptop-60782-1348606334006-9:4:-1:1 for destination: topic://groups.Nameless
> [debug] o.a.a.b.r.AbstractRegion - localhost adding destination: topic://ActiveMQ.Advisory.Consumer.Topic.groups.Nameless
> [trace] o.a.a.t.PooledTaskRunner - Running task iteration 0 - Transport Connection to: tcp://127.0.0.1:61332
> [trace] o.a.a.t.PooledTaskRunner - Running task iteration 1 - Transport Connection to: tcp://127.0.0.1:61332
> [trace] o.a.a.t.PooledTaskRunner - Run task done: Transport Connection to: tcp://127.0.0.1:61332
> [debug] o.a.a.s.k.MessageDatabase - Checkpoint started.
> [debug] o.a.a.s.k.MessageDatabase - Checkpoint done.
> [debug] o.a.a.t.m.MQTTProtocolConverter - Sent Ping Response to
> [debug] o.a.a.b.r.Queue - queue://test expiring messages ..
> [debug] o.a.a.b.r.Queue - test toPageIn: 0, Inflight: 0, pagedInMessages.size 0, enqueueCount: 0, dequeueCount: 0
> [trace] o.a.a.b.r.c.AbstractStoreCursor - org.apache.activemq.broker.region.cursors.QueueStorePrefetch@a7fd9bb:test,batchResetNeeded=false,storeHasMessages=false,size=0,cacheEnabled=true,maxBatchSize:200 - fillBatch
> [trace] o.a.a.b.r.c.AbstractStoreCursor - org.apache.activemq.broker.region.cursors.QueueStorePrefetch@a7fd9bb:test,batchResetNeeded=false,storeHasMessages=false,size=0,cacheEnabled=true,maxBatchSize:200 - fillBatch
> [debug] o.a.a.b.r.Queue - queue://test expiring messages done.
> [trace] o.a.a.t.PooledTaskRunner - Running task iteration 0 - queue://test, subscriptions=0, memory=0%, size=0, in flight groups=null
> [debug] o.a.a.b.r.Queue - queue://test expiring messages ..
> [trace] o.a.a.t.PooledTaskRunner - Run task done: queue://test, subscriptions=0, memory=0%, size=0, in flight groups=null
> [debug] o.a.a.b.r.Queue - test toPageIn: 0, Inflight: 0, pagedInMessages.size 0, enqueueCount: 0, dequeueCount: 0
> [trace] o.a.a.b.r.c.AbstractStoreCursor - org.apache.activemq.broker.region.cursors.QueueStorePrefetch@a7fd9bb:test,batchResetNeeded=false,storeHasMessages=false,size=0,cacheEnabled=true,maxBatchSize:200 - fillBatch
> [trace] o.a.a.b.r.c.AbstractStoreCursor - org.apache.activemq.broker.region.cursors.QueueStorePrefetch@a7fd9bb:test,batchResetNeeded=false,storeHasMessages=false,size=0,cacheEnabled=true,maxBatchSize:200 - fillBatch
> [debug] o.a.a.b.r.Queue - queue://test expiring messages done.
> [trace] o.a.a.t.PooledTaskRunner - Running task iteration 0 - queue://test, subscriptions=0, memory=0%, size=0, in flight groups=null
> [trace] o.a.a.t.PooledTaskRunner - Run task done: queue://test, subscriptions=0, memory=0%, size=0, in flight groups=null
> [debug] o.a.a.s.k.MessageDatabase - Checkpoint started.
> [trace] o.a.a.s.k.MessageDatabase - Last update: 1:3698624, full gc candidates set: [1]
> [trace] o.a.a.s.k.MessageDatabase - gc candidates after first tx:1:3698624, []
> [trace] o.a.a.s.k.MessageDatabase - gc candidates: []
> [debug] o.a.a.s.k.MessageDatabase - Checkpoint done.
> [debug] o.a.a.t.m.MQTTProtocolConverter - Sent Ping Response to
> [debug] o.a.a.s.k.MessageDatabase - Checkpoint started.
> [debug] o.a.a.s.k.MessageDatabase - Checkpoint done.
> [debug] o.a.a.t.m.MQTTProtocolConverter - Sent Ping Response to
> [debug] o.a.a.s.k.MessageDatabase - Checkpoint started.
> [debug] o.a.a.s.k.MessageDatabase - Checkpoint done.
> [debug] o.a.a.t.m.MQTTProtocolConverter - Sent Ping Response to
> [debug] o.a.a.s.k.MessageDatabase - Checkpoint started.
> [debug] o.a.a.s.k.MessageDatabase - Checkpoint done.
> [debug] o.a.a.t.m.MQTTProtocolConverter - Sent Ping Response to
> [debug] o.a.a.s.k.MessageDatabase - Checkpoint started.
> [debug] o.a.a.s.k.MessageDatabase - Checkpoint done.
> [trace] o.a.a.t.m.MQTTInactivityMonitor - Message received since last read check, resetting flag:
> [debug] o.a.a.t.m.MQTTProtocolConverter - Sent Ping Response to
> [debug] o.a.a.s.k.MessageDatabase - Checkpoint started.
> [debug] o.a.a.s.k.MessageDatabase - Checkpoint done.
> [debug] o.a.a.t.m.MQTTProtocolConverter - Sent Ping Response to
> [debug] o.a.a.b.r.Queue - queue://test expiring messages ..
> [debug] o.a.a.b.r.Queue - test toPageIn: 0, Inflight: 0, pagedInMessages.size 0, enqueueCount: 0, dequeueCount: 0
> [trace] o.a.a.b.r.c.AbstractStoreCursor - org.apache.activemq.broker.region.cursors.QueueStorePrefetch@a7fd9bb:test,batchResetNeeded=false,storeHasMessages=false,size=0,cacheEnabled=true,maxBatchSize:200 - fillBatch
> [trace] o.a.a.b.r.c.AbstractStoreCursor - org.apache.activemq.broker.region.cursors.QueueStorePrefetch@a7fd9bb:test,batchResetNeeded=false,storeHasMessages=false,size=0,cacheEnabled=true,maxBatchSize:200 - fillBatch
> [debug] o.a.a.b.r.Queue - queue://test expiring messages done.
> [trace] o.a.a.t.PooledTaskRunner - Running task iteration 0 - queue://test, subscriptions=0, memory=0%, size=0, in flight groups=null
> [debug] o.a.a.b.r.Queue - queue://test expiring messages ..
> [trace] o.a.a.t.PooledTaskRunner - Run task done: queue://test, subscriptions=0, memory=0%, size=0, in flight groups=null
> [debug] o.a.a.b.r.Queue - test toPageIn: 0, Inflight: 0, pagedInMessages.size 0, enqueueCount: 0, dequeueCount: 0
> [trace] o.a.a.b.r.c.AbstractStoreCursor - org.apache.activemq.broker.region.cursors.QueueStorePrefetch@a7fd9bb:test,batchResetNeeded=false,storeHasMessages=false,size=0,cacheEnabled=true,maxBatchSize:200 - fillBatch
> [trace] o.a.a.b.r.c.AbstractStoreCursor - org.apache.activemq.broker.region.cursors.QueueStorePrefetch@a7fd9bb:test,batchResetNeeded=false,storeHasMessages=false,size=0,cacheEnabled=true,maxBatchSize:200 - fillBatch
> [debug] o.a.a.b.r.Queue - queue://test expiring messages done.
> [trace] o.a.a.t.PooledTaskRunner - Running task iteration 0 - queue://test, subscriptions=0, memory=0%, size=0, in flight groups=null
> [trace] o.a.a.t.PooledTaskRunner - Run task done: queue://test, subscriptions=0, memory=0%, size=0, in flight groups=null
> [debug] o.a.a.s.k.MessageDatabase - Checkpoint started.
> [trace] o.a.a.s.k.MessageDatabase - Last update: 1:3701042, full gc candidates set: [1]
> [trace] o.a.a.s.k.MessageDatabase - gc candidates after first tx:1:3701042, []
> [trace] o.a.a.s.k.MessageDatabase - gc candidates: []
> [debug] o.a.a.s.k.MessageDatabase - Checkpoint done.
> [debug] o.a.a.t.m.MQTTProtocolConverter - Sent Ping Response to
> [debug] o.a.a.s.k.MessageDatabase - Checkpoint started.
> [debug] o.a.a.s.k.MessageDatabase - Checkpoint done.
> [debug] o.a.a.t.m.MQTTProtocolConverter - Sent Ping Response to
> [debug] o.a.a.s.k.MessageDatabase - Checkpoint started.
> [debug] o.a.a.s.k.MessageDatabase - Checkpoint done.
> [debug] o.a.a.t.m.MQTTProtocolConverter - Sent Ping Response to
> [debug] o.a.a.s.k.MessageDatabase - Checkpoint started.
> [debug] o.a.a.s.k.MessageDatabase - Checkpoint done.
> [debug] o.a.a.t.m.MQTTProtocolConverter - Sent Ping Response to
> [debug] o.a.a.s.k.MessageDatabase - Checkpoint started.
> [debug] o.a.a.s.k.MessageDatabase - Checkpoint done.
> [debug] o.a.a.t.m.MQTTInactivityMonitor - 30001 ms elapsed since last read check.
> [debug] o.a.a.t.m.MQTTInactivityMonitor - No message received since last read check for tcp:///127.0.0.1:61332@1883! Throwing InactivityIOException.
> [trace] o.a.a.u.ThreadPoolUtils - Shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@6e3271eb with await termination: 0 millis
> [debug] o.a.a.u.ThreadPoolUtils - Shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@6e3271eb is shutdown: true and terminated: false took: 0.014 seconds.
> [debug] o.a.a.b.T.Transport - Transport Connection to: tcp://127.0.0.1:61332 failed: org.apache.activemq.transport.InactivityIOException: Channel was inactive for too (>5000) long: tcp://127.0.0.1:61332
> org.apache.activemq.transport.InactivityIOException: Channel was inactive for too (>5000) long: tcp://127.0.0.1:61332
>     at org.apache.activemq.transport.mqtt.MQTTInactivityMonitor$2.run(MQTTInactivityMonitor.java:133) [activemq-core-5.7-SNAPSHOT.jar:5.7-SNAPSHOT]
>     at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Unknown Source) [na:1.6.0_24]
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [na:1.6.0_24]
>     at java.lang.Thread.run(Unknown Source) [na:1.6.0_24]
> [debug] o.a.a.b.j.ManagementContext - Unregistering MBean org.apache.activemq:BrokerName=localhost,Type=Connection,ConnectorName=mqtt+nio_//localhost_1883,Connection=test-client
> [debug] o.a.a.b.j.ManagementContext - Unregistering MBean org.apache.activemq:BrokerName=localhost,Type=Connection,ConnectorName=mqtt+nio_//localhost_1883,ViewType=address,Name=tcp_//127.0.0.1_61332
> [trace] o.a.a.t.TaskRunnerFactory - Execute[ActiveMQ BrokerService[localhost] Task] runnable: org.apache.activemq.broker.TransportConnection$4@79bd7026
> [debug] o.a.a.b.TransportConnection - Stopping connection: tcp://127.0.0.1:61332
> [debug] o.a.a.t.t.TcpTransport - Stopping transport tcp:///127.0.0.1:61332@1883
> [debug] o.a.a.t.TaskRunnerFactory - Initialized TaskRunnerFactory[ActiveMQ Task] using ExecutorService: java.util.concurrent.ThreadPoolExecutor@40e0d3b
> [trace] o.a.a.t.TaskRunnerFactory - Execute[ActiveMQ Task] runnable: org.apache.activemq.transport.tcp.TcpTransport$1@36869e91
> [trace] o.a.a.t.TaskRunnerFactory - Created thread[ActiveMQ Task-1]: Thread[ActiveMQ Task-1,5,main]
> [trace] o.a.a.t.t.TcpTransport - Closing socket Socket[addr=/127.0.0.1,port=61332,localport=1883]
> [debug] o.a.a.t.t.TcpTransport - Closed socket Socket[unconnected]
> [debug] o.a.a.u.ThreadPoolUtils - Forcing shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@40e0d3b
> [trace] o.a.a.u.ThreadPoolUtils - Shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@40e0d3b is shutdown: true and terminated: false.
> [debug] o.a.a.b.TransportConnection - Stopped transport: null
> [trace] o.a.a.t.PooledTaskRunner - Shutdown timeout: Transport Connection to: null task: {}
> [debug] o.a.a.b.TransportConnection - Cleaning up connection resources: null
> [debug] o.a.a.b.TransportConnection - remove connection id: ID:Jeroen-Laptop-60782-1348606334006-9:4
> [debug] o.a.a.b.j.ManagementContext - Unregistering MBean org.apache.activemq:BrokerName=localhost,Type=Subscription,persistentMode=Non-Durable,destinationType=Topic,destinationName=groups.Nameless,clientId=test-client,consumerId=ID_Jeroen-Laptop-60782-1348606334006-9_4_-1_1
> [debug] o.a.a.b.r.AbstractRegion - localhost removing consumer: ID:Jeroen-Laptop-60782-1348606334006-9:4:-1:1 for destination: topic://groups.Nameless
> [debug] o.a.a.b.j.ManagementContext - Unregistering MBean org.apache.activemq:BrokerName=localhost,Type=Producer,destinationType=Dynamic,clientId=test-client,producerId=ID_Jeroen-Laptop-60782-1348606334006-9_4_-1_1
> [debug] o.a.a.b.TransportConnection - Connection Stopped: null
> [debug] o.a.a.s.k.MessageDatabase - Checkpoint started.
> [debug] o.a.a.s.k.MessageDatabase - Checkpoint done.
> [debug] o.a.a.b.r.Queue - queue://test expiring messages ..
> [debug] o.a.a.b.r.Queue - test toPageIn: 0, Inflight: 0, pagedInMessages.size 0, enqueueCount: 0, dequeueCount: 0
> [trace] o.a.a.b.r.c.AbstractStoreCursor - org.apache.activemq.broker.region.cursors.QueueStorePrefetch@a7fd9bb:test,batchResetNeeded=false,storeHasMessages=false,size=0,cacheEnabled=true,maxBatchSize:200 - fillBatch
> [trace] o.a.a.b.r.c.AbstractStoreCursor - org.apache.activemq.broker.region.cursors.QueueStorePrefetch@a7fd9bb:test,batchResetNeeded=false,storeHasMessages=false,size=0,cacheEnabled=true,maxBatchSize:200 - fillBatch
> [debug] o.a.a.b.r.Queue - queue://test expiring messages done.
> [trace] o.a.a.t.PooledTaskRunner - Running task iteration 0 - queue://test, subscriptions=0, memory=0%, size=0, in flight groups=null
> [debug] o.a.a.b.r.Queue - queue://test expiring messages ..
> [trace] o.a.a.t.PooledTaskRunner - Run task done: queue://test, subscriptions=0, memory=0%, size=0, in flight groups=null
> [debug] o.a.a.b.r.Queue - test toPageIn: 0, Inflight: 0, pagedInMessages.size 0, enqueueCount: 0, dequeueCount: 0
> [trace] o.a.a.b.r.c.AbstractStoreCursor - org.apache.activemq.broker.region.cursors.QueueStorePrefetch@a7fd9bb:test,batchResetNeeded=false,storeHasMessages=false,size=0,cacheEnabled=true,maxBatchSize:200 - fillBatch
> [trace] o.a.a.b.r.c.AbstractStoreCursor - org.apache.activemq.broker.region.cursors.QueueStorePrefetch@a7fd9bb:test,batchResetNeeded=false,storeHasMessages=false,size=0,cacheEnabled=true,maxBatchSize:200 - fillBatch
> [debug] o.a.a.b.r.Queue - queue://test expiring messages done.
> [trace] o.a.a.t.PooledTaskRunner - Running task iteration 0 - queue://test, subscriptions=0, memory=0%, size=0, in flight groups=null
> [trace] o.a.a.t.PooledTaskRunner - Run task done: queue://test, subscriptions=0, memory=0%, size=0, in flight groups=null
> ===================================================================
>
> --
> View this message in context:
> http://activemq.2283324.n4.nabble.com/ActiveMQ-5-7-snapshot-and-MQTT-connection-times-out-while-still-connected-to-a-topic-for-receiving-tp4656976.html
> Sent from the ActiveMQ - User mailing list archive at Nabble.com.

--
*Christian Posta*
http://www.christianposta.com/blog