ActiveMQ (5.7 snapshot) and MQTT: connection times out while still connected to a topic for receiving
I'm using mosquitto's Python client to subscribe to an MQTT topic. With the mosquitto broker everything works (but that broker doesn't let me plug in authentication against my database, so it's not a real option). With ActiveMQ, the connection gets dropped. The client does send pings and the broker does respond, but that doesn't seem to have any effect: the connection still times out. Am I missing some configuration option?

broker config (scala):
===================================================================
import org.apache.activemq.broker.BrokerService

val broker = new BrokerService()
// my own authentication plugin
broker.setPlugins(Array(new UpoAuthenticationPlugin(admin_user, admin_pw)))
broker.addConnector("mqtt+nio://127.0.0.1:1883")
broker.addConnector("vm://localhost")
broker.start()
===================================================================

python client:
===================================================================
import mosquitto
import time

client = mosquitto.Mosquitto("test-client")
client.username_pw_set("user", "password")

def on_connect(mosq, obj, rc):
    if rc == 0:
        print("Connected successfully.")
client.on_connect = on_connect

def on_disconnect(mosq, obj, rc):
    print("Disconnected successfully.")
client.on_disconnect = on_disconnect

def on_publish(mosq, obj, mid):
    print("Message "+str(mid)+" published.")
client.on_publish = on_publish

def on_message(mosq, obj, msg):
    print("Message received on topic "+msg.topic+" with QoS "+str(msg.qos)+" and payload "+msg.payload)
client.on_message = on_message

def on_subscribe(mosq, obj, mid, qos_list):
    print("Subscribe with mid "+str(mid)+" received.")
client.on_subscribe = on_subscribe

# keepalive of 5 seconds
client.connect("127.0.0.1", keepalive=5)
client.subscribe("groups/Nameless", 1)
client.publish("groups/Nameless", "hello world", 1)

while True:
    # process network traffic, then wait 2 seconds
    print("loop %s" % client.loop())
    time.sleep(2)
===================================================================

ActiveMQ log
===================================================================
[debug] o.a.a.t.m.MQTTProtocolConverter - MQTT Client connected.
[debug] o.a.a.b.r.AbstractRegion - localhost adding consumer: ID:Jeroen-Laptop-60782-1348606334006-9:4:-1:1 for destination: topic://groups.Nameless
[debug] o.a.a.b.r.AbstractRegion - localhost adding destination: topic://ActiveMQ.Advisory.Consumer.Topic.groups.Nameless
[trace] o.a.a.t.PooledTaskRunner - Running task iteration 0 - Transport Connection to: tcp://127.0.0.1:61332
[trace] o.a.a.t.PooledTaskRunner - Running task iteration 1 - Transport Connection to: tcp://127.0.0.1:61332
[trace] o.a.a.t.PooledTaskRunner - Run task done: Transport Connection to: tcp://127.0.0.1:61332
[debug] o.a.a.s.k.MessageDatabase - Checkpoint started.
[debug] o.a.a.s.k.MessageDatabase - Checkpoint done.
[debug] o.a.a.t.m.MQTTProtocolConverter - Sent Ping Response to
[debug] o.a.a.b.r.Queue - queue://test expiring messages ..
[debug] o.a.a.b.r.Queue - test toPageIn: 0, Inflight: 0, pagedInMessages.size 0, enqueueCount: 0, dequeueCount: 0
[trace] o.a.a.b.r.c.AbstractStoreCursor - org.apache.activemq.broker.region.cursors.QueueStorePrefetch@a7fd9bb:test,batchResetNeeded=false,storeHasMessages=false,size=0,cacheEnabled=true,maxBatchSize:200 - fillBatch
[trace] o.a.a.b.r.c.AbstractStoreCursor - org.apache.activemq.broker.region.cursors.QueueStorePrefetch@a7fd9bb:test,batchResetNeeded=false,storeHasMessages=false,size=0,cacheEnabled=true,maxBatchSize:200 - fillBatch
[debug] o.a.a.b.r.Queue - queue://test expiring messages done.
[trace] o.a.a.t.PooledTaskRunner - Running task iteration 0 - queue://test, subscriptions=0, memory=0%, size=0, in flight groups=null
[debug] o.a.a.b.r.Queue - queue://test expiring messages ..
[trace] o.a.a.t.PooledTaskRunner - Run task done: queue://test, subscriptions=0, memory=0%, size=0, in flight groups=null
[debug] o.a.a.b.r.Queue - test toPageIn: 0, Inflight: 0, pagedInMessages.size 0, enqueueCount: 0, dequeueCount: 0
[trace] o.a.a.b.r.c.AbstractStoreCursor - org.apache.activemq.broker.region.cursors.QueueStorePrefetch@a7fd9bb:test,batchResetNeeded=false,storeHasMessages=false,size=0,cacheEnabled=true,maxBatchSize:200 - fillBatch
[trace] o.a.a.b.r.c.AbstractStoreCursor - org.apache.activemq.broker.region.cursors.QueueStorePrefetch@a7fd9bb:test,batchResetNeeded=false,storeHasMessages=false,size=0,cacheEnabled=true,maxBatchSize:200 - fillBatch
[debug] o.a.a.b.r.Queue - queue://test expiring messages done.
[trace] o.a.a.t.PooledTaskRunner - Running task iteration 0 - queue://test, subscriptions=0, memory=0%, size=0, in flight groups=null
[trace] o.a.a.t.PooledTaskRunner - Run task done: queue://test, subscriptions=0, memory=0%, size=0, in flight groups=null
[debug] o.a.a.s.k.MessageDatabase - Checkpoint started.
[trace] o.a.a.s.k.MessageDatabase - Last update: 1:3698624, full gc candidates set: [1]
[trace] o.a.a.s.k.MessageDatabase - gc candidates after first tx:1:3698624, []
[trace] o.a.a.s.k.MessageDatabase - gc candidates: []
[debug] o.a.a.s.k.MessageDatabase - Checkpoint done.
[debug] o.a.a.t.m.MQTTProtocolConverter - Sent Ping Response to
[debug] o.a.a.s.k.MessageDatabase - Checkpoint started.
[debug] o.a.a.s.k.MessageDatabase - Checkpoint done.
[debug] o.a.a.t.m.MQTTProtocolConverter - Sent Ping Response to
[debug] o.a.a.s.k.MessageDatabase - Checkpoint started.
[debug] o.a.a.s.k.MessageDatabase - Checkpoint done.
[debug] o.a.a.t.m.MQTTProtocolConverter - Sent Ping Response to
[debug] o.a.a.s.k.MessageDatabase - Checkpoint started.
[debug] o.a.a.s.k.MessageDatabase - Checkpoint done.
[debug] o.a.a.t.m.MQTTProtocolConverter - Sent Ping Response to
[debug] o.a.a.s.k.MessageDatabase - Checkpoint started.
[debug] o.a.a.s.k.MessageDatabase - Checkpoint done.
[trace] o.a.a.t.m.MQTTInactivityMonitor - Message received since last read check, resetting flag:
[debug] o.a.a.t.m.MQTTProtocolConverter - Sent Ping Response to
[debug] o.a.a.s.k.MessageDatabase - Checkpoint started.
[debug] o.a.a.s.k.MessageDatabase - Checkpoint done.
[debug] o.a.a.t.m.MQTTProtocolConverter - Sent Ping Response to
[debug] o.a.a.b.r.Queue - queue://test expiring messages ..
[debug] o.a.a.b.r.Queue - test toPageIn: 0, Inflight: 0, pagedInMessages.size 0, enqueueCount: 0, dequeueCount: 0
[trace] o.a.a.b.r.c.AbstractStoreCursor - org.apache.activemq.broker.region.cursors.QueueStorePrefetch@a7fd9bb:test,batchResetNeeded=false,storeHasMessages=false,size=0,cacheEnabled=true,maxBatchSize:200 - fillBatch
[trace] o.a.a.b.r.c.AbstractStoreCursor - org.apache.activemq.broker.region.cursors.QueueStorePrefetch@a7fd9bb:test,batchResetNeeded=false,storeHasMessages=false,size=0,cacheEnabled=true,maxBatchSize:200 - fillBatch
[debug] o.a.a.b.r.Queue - queue://test expiring messages done.
[trace] o.a.a.t.PooledTaskRunner - Running task iteration 0 - queue://test, subscriptions=0, memory=0%, size=0, in flight groups=null
[debug] o.a.a.b.r.Queue - queue://test expiring messages ..
[trace] o.a.a.t.PooledTaskRunner - Run task done: queue://test, subscriptions=0, memory=0%, size=0, in flight groups=null
[debug] o.a.a.b.r.Queue - test toPageIn: 0, Inflight: 0, pagedInMessages.size 0, enqueueCount: 0, dequeueCount: 0
[trace] o.a.a.b.r.c.AbstractStoreCursor - org.apache.activemq.broker.region.cursors.QueueStorePrefetch@a7fd9bb:test,batchResetNeeded=false,storeHasMessages=false,size=0,cacheEnabled=true,maxBatchSize:200 - fillBatch
[trace] o.a.a.b.r.c.AbstractStoreCursor - org.apache.activemq.broker.region.cursors.QueueStorePrefetch@a7fd9bb:test,batchResetNeeded=false,storeHasMessages=false,size=0,cacheEnabled=true,maxBatchSize:200 - fillBatch
[debug] o.a.a.b.r.Queue - queue://test expiring messages done.
[trace] o.a.a.t.PooledTaskRunner - Running task iteration 0 - queue://test, subscriptions=0, memory=0%, size=0, in flight groups=null
[trace] o.a.a.t.PooledTaskRunner - Run task done: queue://test, subscriptions=0, memory=0%, size=0, in flight groups=null
[debug] o.a.a.s.k.MessageDatabase - Checkpoint started.
[trace] o.a.a.s.k.MessageDatabase - Last update: 1:3701042, full gc candidates set: [1]
[trace] o.a.a.s.k.MessageDatabase - gc candidates after first tx:1:3701042, []
[trace] o.a.a.s.k.MessageDatabase - gc candidates: []
[debug] o.a.a.s.k.MessageDatabase - Checkpoint done.
[debug] o.a.a.t.m.MQTTProtocolConverter - Sent Ping Response to
[debug] o.a.a.s.k.MessageDatabase - Checkpoint started.
[debug] o.a.a.s.k.MessageDatabase - Checkpoint done.
[debug] o.a.a.t.m.MQTTProtocolConverter - Sent Ping Response to
[debug] o.a.a.s.k.MessageDatabase - Checkpoint started.
[debug] o.a.a.s.k.MessageDatabase - Checkpoint done.
[debug] o.a.a.t.m.MQTTProtocolConverter - Sent Ping Response to
[debug] o.a.a.s.k.MessageDatabase - Checkpoint started.
[debug] o.a.a.s.k.MessageDatabase - Checkpoint done.
[debug] o.a.a.t.m.MQTTProtocolConverter - Sent Ping Response to
[debug] o.a.a.s.k.MessageDatabase - Checkpoint started.
[debug] o.a.a.s.k.MessageDatabase - Checkpoint done.
[debug] o.a.a.t.m.MQTTInactivityMonitor - 30001 ms elapsed since last read check.
[debug] o.a.a.t.m.MQTTInactivityMonitor - No message received since last read check for tcp:///127.0.0.1:61332@1883! Throwing InactivityIOException.
[trace] o.a.a.u.ThreadPoolUtils - Shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@6e3271eb with await termination: 0 millis
[debug] o.a.a.u.ThreadPoolUtils - Shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@6e3271eb is shutdown: true and terminated: false took: 0.014 seconds.
[debug] o.a.a.b.T.Transport - Transport Connection to: tcp://127.0.0.1:61332 failed: org.apache.activemq.transport.InactivityIOException: Channel was inactive for too (>5000) long: tcp://127.0.0.1:61332
org.apache.activemq.transport.InactivityIOException: Channel was inactive for too (>5000) long: tcp://127.0.0.1:61332
    at org.apache.activemq.transport.mqtt.MQTTInactivityMonitor$2.run(MQTTInactivityMonitor.java:133) [activemq-core-5.7-SNAPSHOT.jar:5.7-SNAPSHOT]
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Unknown Source) [na:1.6.0_24]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [na:1.6.0_24]
    at java.lang.Thread.run(Unknown Source) [na:1.6.0_24]
[debug] o.a.a.b.j.ManagementContext - Unregistering MBean org.apache.activemq:BrokerName=localhost,Type=Connection,ConnectorName=mqtt+nio_//localhost_1883,Connection=test-client
[debug] o.a.a.b.j.ManagementContext - Unregistering MBean org.apache.activemq:BrokerName=localhost,Type=Connection,ConnectorName=mqtt+nio_//localhost_1883,ViewType=address,Name=tcp_//127.0.0.1_61332
[trace] o.a.a.t.TaskRunnerFactory - Execute[ActiveMQ BrokerService[localhost] Task] runnable: org.apache.activemq.broker.TransportConnection$4@79bd7026
[debug] o.a.a.b.TransportConnection - Stopping connection: tcp://127.0.0.1:61332
[debug] o.a.a.t.t.TcpTransport - Stopping transport tcp:///127.0.0.1:61332@1883
[debug] o.a.a.t.TaskRunnerFactory - Initialized TaskRunnerFactory[ActiveMQ Task] using ExecutorService: java.util.concurrent.ThreadPoolExecutor@40e0d3b
[trace] o.a.a.t.TaskRunnerFactory - Execute[ActiveMQ Task] runnable: org.apache.activemq.transport.tcp.TcpTransport$1@36869e91
[trace] o.a.a.t.TaskRunnerFactory - Created thread[ActiveMQ Task-1]: Thread[ActiveMQ Task-1,5,main]
[trace] o.a.a.t.t.TcpTransport - Closing socket Socket[addr=/127.0.0.1,port=61332,localport=1883]
[debug] o.a.a.t.t.TcpTransport - Closed socket Socket[unconnected]
[debug] o.a.a.u.ThreadPoolUtils - Forcing shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@40e0d3b
[trace] o.a.a.u.ThreadPoolUtils - Shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@40e0d3b is shutdown: true and terminated: false.
[debug] o.a.a.b.TransportConnection - Stopped transport: null
[trace] o.a.a.t.PooledTaskRunner - Shutdown timeout: Transport Connection to: null task: {}
[debug] o.a.a.b.TransportConnection - Cleaning up connection resources: null
[debug] o.a.a.b.TransportConnection - remove connection id: ID:Jeroen-Laptop-60782-1348606334006-9:4
[debug] o.a.a.b.j.ManagementContext - Unregistering MBean org.apache.activemq:BrokerName=localhost,Type=Subscription,persistentMode=Non-Durable,destinationType=Topic,destinationName=groups.Nameless,clientId=test-client,consumerId=ID_Jeroen-Laptop-60782-1348606334006-9_4_-1_1
[debug] o.a.a.b.r.AbstractRegion - localhost removing consumer: ID:Jeroen-Laptop-60782-1348606334006-9:4:-1:1 for destination: topic://groups.Nameless
[debug] o.a.a.b.j.ManagementContext - Unregistering MBean org.apache.activemq:BrokerName=localhost,Type=Producer,destinationType=Dynamic,clientId=test-client,producerId=ID_Jeroen-Laptop-60782-1348606334006-9_4_-1_1
[debug] o.a.a.b.TransportConnection - Connection Stopped: null
[debug] o.a.a.s.k.MessageDatabase - Checkpoint started.
[debug] o.a.a.s.k.MessageDatabase - Checkpoint done.
[debug] o.a.a.b.r.Queue - queue://test expiring messages ..
[debug] o.a.a.b.r.Queue - test toPageIn: 0, Inflight: 0, pagedInMessages.size 0, enqueueCount: 0, dequeueCount: 0
[trace] o.a.a.b.r.c.AbstractStoreCursor - org.apache.activemq.broker.region.cursors.QueueStorePrefetch@a7fd9bb:test,batchResetNeeded=false,storeHasMessages=false,size=0,cacheEnabled=true,maxBatchSize:200 - fillBatch
[trace] o.a.a.b.r.c.AbstractStoreCursor - org.apache.activemq.broker.region.cursors.QueueStorePrefetch@a7fd9bb:test,batchResetNeeded=false,storeHasMessages=false,size=0,cacheEnabled=true,maxBatchSize:200 - fillBatch
[debug] o.a.a.b.r.Queue - queue://test expiring messages done.
[trace] o.a.a.t.PooledTaskRunner - Running task iteration 0 - queue://test, subscriptions=0, memory=0%, size=0, in flight groups=null
[debug] o.a.a.b.r.Queue - queue://test expiring messages ..
[trace] o.a.a.t.PooledTaskRunner - Run task done: queue://test, subscriptions=0, memory=0%, size=0, in flight groups=null
[debug] o.a.a.b.r.Queue - test toPageIn: 0, Inflight: 0, pagedInMessages.size 0, enqueueCount: 0, dequeueCount: 0
[trace] o.a.a.b.r.c.AbstractStoreCursor - org.apache.activemq.broker.region.cursors.QueueStorePrefetch@a7fd9bb:test,batchResetNeeded=false,storeHasMessages=false,size=0,cacheEnabled=true,maxBatchSize:200 - fillBatch
[trace] o.a.a.b.r.c.AbstractStoreCursor - org.apache.activemq.broker.region.cursors.QueueStorePrefetch@a7fd9bb:test,batchResetNeeded=false,storeHasMessages=false,size=0,cacheEnabled=true,maxBatchSize:200 - fillBatch
[debug] o.a.a.b.r.Queue - queue://test expiring messages done.
[trace] o.a.a.t.PooledTaskRunner - Running task iteration 0 - queue://test, subscriptions=0, memory=0%, size=0, in flight groups=null
[trace] o.a.a.t.PooledTaskRunner - Run task done: queue://test, subscriptions=0, memory=0%, size=0, in flight groups=null
===================================================================

--
View this message in context: http://activemq.2283324.n4.nabble.com/ActiveMQ-5-7-snapshot-and-MQTT-connection-times-out-while-still-connected-to-a-topic-for-receiving-tp4656976.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.
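
For readers of the log: the two MQTTInactivityMonitor messages ("Message received since last read check, resetting flag" and "No message received since last read check ... Throwing InactivityIOException") suggest a flag-based read check. The following is only a minimal Python sketch of that idea, assuming the flag is set on every incoming frame and cleared on each periodic check. The class and method names are hypothetical, and this is a toy model inferred from the log wording, not ActiveMQ's actual code.

```python
class ReadCheckMonitor(object):
    """Toy model of a flag-based read check (hypothetical names, not ActiveMQ code)."""

    def __init__(self):
        # Set whenever a frame arrives; cleared again by each read check.
        self.received_since_last_check = False

    def on_frame_received(self):
        # Assumption: any incoming frame (including a ping request) counts as activity.
        self.received_since_last_check = True

    def read_check(self):
        """Return True if the connection is still considered alive."""
        if self.received_since_last_check:
            # Models "Message received since last read check, resetting flag".
            self.received_since_last_check = False
            return True
        # Models "No message received since last read check".
        return False


monitor = ReadCheckMonitor()
monitor.on_frame_received()    # a frame arrives before the first check
first = monitor.read_check()   # flag was set, so the check passes and clears it
second = monitor.read_check()  # nothing arrived in between, so the check fails
print("%s %s" % (first, second))
```

In this model a ping keeps the connection alive only if it reaches `on_frame_received` before the next check runs.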