ActiveMQ (5.7 snapshot) and MQTT: connection times out while still connected
to a topic for receiving

I'm using mosquitto's Python client to connect to an MQTT topic. With the
mosquitto broker everything works, but that broker doesn't let me plug in
authentication against my database, so it's not a real option.

With ActiveMQ, the broker drops the connection. The client does send pings
and the broker does respond, but this doesn't seem to have any effect: the
connection still times out.

Am I missing some configuration option?
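
For reference, this is the behaviour I would expect from the broker's read check, as a minimal Python sketch. The class and method names are made up for illustration and are not ActiveMQ's; the 1.5x grace factor is my reading of the MQTT 3.1 keep-alive rule. The point is that any packet from the client, including a ping request, should count as activity.

```python
class InactivityMonitor(object):
    """Toy model of a keep-alive read check; NOT ActiveMQ's implementation."""

    def __init__(self, keepalive_s, grace=1.5):
        # Assumption: the broker allows 1.5 x keepalive of silence.
        self.timeout_s = keepalive_s * grace
        self.last_read = 0.0

    def on_packet(self, now):
        # Any packet from the client, PINGREQ included, counts as activity.
        self.last_read = now

    def expired(self, now):
        # True once the client has been silent for longer than the limit.
        return (now - self.last_read) > self.timeout_s


monitor = InactivityMonitor(keepalive_s=5)

# Simulate a client that pings every 5 seconds for 30 seconds.
for t in range(0, 31, 5):
    monitor.on_packet(float(t))

print(monitor.expired(32.0))  # 2 s after the last ping
print(monitor.expired(40.0))  # 10 s after the last ping
```

Under this model a client that keeps pinging within its keep-alive interval should never be timed out, which is not what I see in the log below.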


broker config (scala):
===================================================================
val broker = new BrokerService()
broker.setPlugins(Array(new UpoAuthenticationPlugin(admin_user, admin_pw)))

broker.addConnector("mqtt+nio://127.0.0.1:1883")
broker.addConnector("vm://localhost")
broker.start()
===================================================================

python client:
===================================================================
import mosquitto
import os
import time

client = mosquitto.Mosquitto("test-client")
client.username_pw_set("user", "password")

def on_connect(mosq, obj, rc):
    if rc == 0:
        print("Connected successfully.")

client.on_connect = on_connect

def on_disconnect(mosq, obj, rc):
    print("Disconnected successfully.")

client.on_disconnect = on_disconnect

def on_publish(mosq, obj, mid):
    print("Message "+str(mid)+" published.")

client.on_publish = on_publish

def on_message(mosq, obj, msg):
    print("Message received on topic "+msg.topic+" with QoS "+str(msg.qos)+" and payload "+msg.payload)

client.on_message = on_message

def on_subscribe(mosq, obj, mid, qos_list):
    print("Subscribe with mid "+str(mid)+" received.")

client.on_subscribe = on_subscribe

client.connect("127.0.0.1", keepalive=5)

client.subscribe("groups/Nameless", 1)
client.publish("groups/Nameless", "hello world", 1)

while True:
    print("loop %s" % client.loop())
    time.sleep(2)
===================================================================

ActiveMQ log
===================================================================
[debug] o.a.a.t.m.MQTTProtocolConverter - MQTT Client  connected.
[debug] o.a.a.b.r.AbstractRegion - localhost adding consumer:
ID:Jeroen-Laptop-60782-1348606334006-9:4:-1:1 for destination:
topic://groups.Nameless
[debug] o.a.a.b.r.AbstractRegion - localhost adding destination:
topic://ActiveMQ.Advisory.Consumer.Topic.groups.Nameless
[trace] o.a.a.t.PooledTaskRunner - Running task iteration 0 - Transport
Connection to: tcp://127.0.0.1:61332
[trace] o.a.a.t.PooledTaskRunner - Running task iteration 1 - Transport
Connection to: tcp://127.0.0.1:61332
[trace] o.a.a.t.PooledTaskRunner - Run task done: Transport Connection to:
tcp://127.0.0.1:61332
[debug] o.a.a.s.k.MessageDatabase - Checkpoint started.
[debug] o.a.a.s.k.MessageDatabase - Checkpoint done.
[debug] o.a.a.t.m.MQTTProtocolConverter - Sent Ping Response to
[debug] o.a.a.b.r.Queue - queue://test expiring messages ..
[debug] o.a.a.b.r.Queue - test toPageIn: 0, Inflight: 0,
pagedInMessages.size 0, enqueueCount: 0, dequeueCount: 0
[trace] o.a.a.b.r.c.AbstractStoreCursor -
org.apache.activemq.broker.region.cursors.QueueStorePrefetch@a7fd9bb:test,batchResetNeeded=false,storeHasMessages=false,size=0,cacheEnabled=true,maxBatchSize:200
- fillBatch
[trace] o.a.a.b.r.c.AbstractStoreCursor -
org.apache.activemq.broker.region.cursors.QueueStorePrefetch@a7fd9bb:test,batchResetNeeded=false,storeHasMessages=false,size=0,cacheEnabled=true,maxBatchSize:200
- fillBatch
[debug] o.a.a.b.r.Queue - queue://test expiring messages done.
[trace] o.a.a.t.PooledTaskRunner - Running task iteration 0 - queue://test,
subscriptions=0, memory=0%, size=0, in flight groups=null
[debug] o.a.a.b.r.Queue - queue://test expiring messages ..
[trace] o.a.a.t.PooledTaskRunner - Run task done: queue://test,
subscriptions=0, memory=0%, size=0, in flight groups=null
[debug] o.a.a.b.r.Queue - test toPageIn: 0, Inflight: 0,
pagedInMessages.size 0, enqueueCount: 0, dequeueCount: 0
[trace] o.a.a.b.r.c.AbstractStoreCursor -
org.apache.activemq.broker.region.cursors.QueueStorePrefetch@a7fd9bb:test,batchResetNeeded=false,storeHasMessages=false,size=0,cacheEnabled=true,maxBatchSize:200
- fillBatch
[trace] o.a.a.b.r.c.AbstractStoreCursor -
org.apache.activemq.broker.region.cursors.QueueStorePrefetch@a7fd9bb:test,batchResetNeeded=false,storeHasMessages=false,size=0,cacheEnabled=true,maxBatchSize:200
- fillBatch
[debug] o.a.a.b.r.Queue - queue://test expiring messages done.
[trace] o.a.a.t.PooledTaskRunner - Running task iteration 0 - queue://test,
subscriptions=0, memory=0%, size=0, in flight groups=null
[trace] o.a.a.t.PooledTaskRunner - Run task done: queue://test,
subscriptions=0, memory=0%, size=0, in flight groups=null
[debug] o.a.a.s.k.MessageDatabase - Checkpoint started.
[trace] o.a.a.s.k.MessageDatabase - Last update: 1:3698624, full gc
candidates set: [1]
[trace] o.a.a.s.k.MessageDatabase - gc candidates after first tx:1:3698624,
[]
[trace] o.a.a.s.k.MessageDatabase - gc candidates: []
[debug] o.a.a.s.k.MessageDatabase - Checkpoint done.
[debug] o.a.a.t.m.MQTTProtocolConverter - Sent Ping Response to
[debug] o.a.a.s.k.MessageDatabase - Checkpoint started.
[debug] o.a.a.s.k.MessageDatabase - Checkpoint done.
[debug] o.a.a.t.m.MQTTProtocolConverter - Sent Ping Response to
[debug] o.a.a.s.k.MessageDatabase - Checkpoint started.
[debug] o.a.a.s.k.MessageDatabase - Checkpoint done.
[debug] o.a.a.t.m.MQTTProtocolConverter - Sent Ping Response to
[debug] o.a.a.s.k.MessageDatabase - Checkpoint started.
[debug] o.a.a.s.k.MessageDatabase - Checkpoint done.
[debug] o.a.a.t.m.MQTTProtocolConverter - Sent Ping Response to
[debug] o.a.a.s.k.MessageDatabase - Checkpoint started.
[debug] o.a.a.s.k.MessageDatabase - Checkpoint done.
[trace] o.a.a.t.m.MQTTInactivityMonitor - Message received since last read
check, resetting flag:
[debug] o.a.a.t.m.MQTTProtocolConverter - Sent Ping Response to
[debug] o.a.a.s.k.MessageDatabase - Checkpoint started.
[debug] o.a.a.s.k.MessageDatabase - Checkpoint done.
[debug] o.a.a.t.m.MQTTProtocolConverter - Sent Ping Response to
[debug] o.a.a.b.r.Queue - queue://test expiring messages ..
[debug] o.a.a.b.r.Queue - test toPageIn: 0, Inflight: 0,
pagedInMessages.size 0, enqueueCount: 0, dequeueCount: 0
[trace] o.a.a.b.r.c.AbstractStoreCursor -
org.apache.activemq.broker.region.cursors.QueueStorePrefetch@a7fd9bb:test,batchResetNeeded=false,storeHasMessages=false,size=0,cacheEnabled=true,maxBatchSize:200
- fillBatch
[trace] o.a.a.b.r.c.AbstractStoreCursor -
org.apache.activemq.broker.region.cursors.QueueStorePrefetch@a7fd9bb:test,batchResetNeeded=false,storeHasMessages=false,size=0,cacheEnabled=true,maxBatchSize:200
- fillBatch
[debug] o.a.a.b.r.Queue - queue://test expiring messages done.
[trace] o.a.a.t.PooledTaskRunner - Running task iteration 0 - queue://test,
subscriptions=0, memory=0%, size=0, in flight groups=null
[debug] o.a.a.b.r.Queue - queue://test expiring messages ..
[trace] o.a.a.t.PooledTaskRunner - Run task done: queue://test,
subscriptions=0, memory=0%, size=0, in flight groups=null
[debug] o.a.a.b.r.Queue - test toPageIn: 0, Inflight: 0,
pagedInMessages.size 0, enqueueCount: 0, dequeueCount: 0
[trace] o.a.a.b.r.c.AbstractStoreCursor -
org.apache.activemq.broker.region.cursors.QueueStorePrefetch@a7fd9bb:test,batchResetNeeded=false,storeHasMessages=false,size=0,cacheEnabled=true,maxBatchSize:200
- fillBatch
[trace] o.a.a.b.r.c.AbstractStoreCursor -
org.apache.activemq.broker.region.cursors.QueueStorePrefetch@a7fd9bb:test,batchResetNeeded=false,storeHasMessages=false,size=0,cacheEnabled=true,maxBatchSize:200
- fillBatch
[debug] o.a.a.b.r.Queue - queue://test expiring messages done.
[trace] o.a.a.t.PooledTaskRunner - Running task iteration 0 - queue://test,
subscriptions=0, memory=0%, size=0, in flight groups=null
[trace] o.a.a.t.PooledTaskRunner - Run task done: queue://test,
subscriptions=0, memory=0%, size=0, in flight groups=null
[debug] o.a.a.s.k.MessageDatabase - Checkpoint started.
[trace] o.a.a.s.k.MessageDatabase - Last update: 1:3701042, full gc
candidates set: [1]
[trace] o.a.a.s.k.MessageDatabase - gc candidates after first tx:1:3701042,
[]
[trace] o.a.a.s.k.MessageDatabase - gc candidates: []
[debug] o.a.a.s.k.MessageDatabase - Checkpoint done.
[debug] o.a.a.t.m.MQTTProtocolConverter - Sent Ping Response to
[debug] o.a.a.s.k.MessageDatabase - Checkpoint started.
[debug] o.a.a.s.k.MessageDatabase - Checkpoint done.
[debug] o.a.a.t.m.MQTTProtocolConverter - Sent Ping Response to
[debug] o.a.a.s.k.MessageDatabase - Checkpoint started.
[debug] o.a.a.s.k.MessageDatabase - Checkpoint done.
[debug] o.a.a.t.m.MQTTProtocolConverter - Sent Ping Response to
[debug] o.a.a.s.k.MessageDatabase - Checkpoint started.
[debug] o.a.a.s.k.MessageDatabase - Checkpoint done.
[debug] o.a.a.t.m.MQTTProtocolConverter - Sent Ping Response to
[debug] o.a.a.s.k.MessageDatabase - Checkpoint started.
[debug] o.a.a.s.k.MessageDatabase - Checkpoint done.
[debug] o.a.a.t.m.MQTTInactivityMonitor - 30001 ms elapsed since last read
check.
[debug] o.a.a.t.m.MQTTInactivityMonitor - No message received since last
read check for tcp:///127.0.0.1:61332@1883! Throwing InactivityIOException.
[trace] o.a.a.u.ThreadPoolUtils - Shutdown of ExecutorService:
java.util.concurrent.ThreadPoolExecutor@6e3271eb with await termination: 0
millis
[debug] o.a.a.u.ThreadPoolUtils - Shutdown of ExecutorService:
java.util.concurrent.ThreadPoolExecutor@6e3271eb is shutdown: true and
terminated: false took: 0.014 seconds.
[debug] o.a.a.b.T.Transport - Transport Connection to: tcp://127.0.0.1:61332
failed: org.apache.activemq.transport.InactivityIOException: Channel was
inactive for too (>5000) long:
tcp://127.0.0.1:61332org.apache.activemq.transport.InactivityIOException:
Channel was inactive for too (>5000) long: tcp://127.0.0.1:61332
        at
org.apache.activemq.transport.mqtt.MQTTInactivityMonitor$2.run(MQTTInactivityMonitor.java:133)
[activemq-core-5.7-SNAPSHOT.jar:5.7-SNAPSHOT]
        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Unknown
Source) [na:1.6.0_24]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown
Source) [na:1.6.0_24]
        at java.lang.Thread.run(Unknown Source) [na:1.6.0_24]
[debug] o.a.a.b.j.ManagementContext - Unregistering MBean
org.apache.activemq:BrokerName=localhost,Type=Connection,ConnectorName=mqtt+nio_//localhost_1883,Connection=test-client
[debug] o.a.a.b.j.ManagementContext - Unregistering MBean
org.apache.activemq:BrokerName=localhost,Type=Connection,ConnectorName=mqtt+nio_//localhost_1883,ViewType=address,Name=tcp_//127.0.0.1_61332
[trace] o.a.a.t.TaskRunnerFactory - Execute[ActiveMQ
BrokerService[localhost] Task] runnable:
org.apache.activemq.broker.TransportConnection$4@79bd7026
[debug] o.a.a.b.TransportConnection - Stopping connection:
tcp://127.0.0.1:61332
[debug] o.a.a.t.t.TcpTransport - Stopping transport
tcp:///127.0.0.1:61332@1883
[debug] o.a.a.t.TaskRunnerFactory - Initialized TaskRunnerFactory[ActiveMQ
Task] using ExecutorService: java.util.concurrent.ThreadPoolExecutor@40e0d3b
[trace] o.a.a.t.TaskRunnerFactory - Execute[ActiveMQ Task] runnable:
org.apache.activemq.transport.tcp.TcpTransport$1@36869e91
[trace] o.a.a.t.TaskRunnerFactory - Created thread[ActiveMQ Task-1]:
Thread[ActiveMQ Task-1,5,main]
[trace] o.a.a.t.t.TcpTransport - Closing socket
Socket[addr=/127.0.0.1,port=61332,localport=1883]
[debug] o.a.a.t.t.TcpTransport - Closed socket Socket[unconnected]
[debug] o.a.a.u.ThreadPoolUtils - Forcing shutdown of ExecutorService:
java.util.concurrent.ThreadPoolExecutor@40e0d3b
[trace] o.a.a.u.ThreadPoolUtils - Shutdown of ExecutorService:
java.util.concurrent.ThreadPoolExecutor@40e0d3b is shutdown: true and
terminated: false.
[debug] o.a.a.b.TransportConnection - Stopped transport: null
[trace] o.a.a.t.PooledTaskRunner - Shutdown timeout: Transport Connection
to: null task: {}
[debug] o.a.a.b.TransportConnection - Cleaning up connection resources: null
[debug] o.a.a.b.TransportConnection - remove connection id:
ID:Jeroen-Laptop-60782-1348606334006-9:4
[debug] o.a.a.b.j.ManagementContext - Unregistering MBean
org.apache.activemq:BrokerName=localhost,Type=Subscription,persistentMode=Non-Durable,destinationType=Topic,destinationName=groups.Nameless,clientId=test-client,consumerId=ID_Jeroen-Laptop-60782-1348606334006-9_4_-1_1
[debug] o.a.a.b.r.AbstractRegion - localhost removing consumer:
ID:Jeroen-Laptop-60782-1348606334006-9:4:-1:1 for destination:
topic://groups.Nameless
[debug] o.a.a.b.j.ManagementContext - Unregistering MBean
org.apache.activemq:BrokerName=localhost,Type=Producer,destinationType=Dynamic,clientId=test-client,producerId=ID_Jeroen-Laptop-60782-1348606334006-9_4_-1_1
[debug] o.a.a.b.TransportConnection - Connection Stopped: null
[debug] o.a.a.s.k.MessageDatabase - Checkpoint started.
[debug] o.a.a.s.k.MessageDatabase - Checkpoint done.
[debug] o.a.a.b.r.Queue - queue://test expiring messages ..
[debug] o.a.a.b.r.Queue - test toPageIn: 0, Inflight: 0,
pagedInMessages.size 0, enqueueCount: 0, dequeueCount: 0
[trace] o.a.a.b.r.c.AbstractStoreCursor -
org.apache.activemq.broker.region.cursors.QueueStorePrefetch@a7fd9bb:test,batchResetNeeded=false,storeHasMessages=false,size=0,cacheEnabled=true,maxBatchSize:200
- fillBatch
[trace] o.a.a.b.r.c.AbstractStoreCursor -
org.apache.activemq.broker.region.cursors.QueueStorePrefetch@a7fd9bb:test,batchResetNeeded=false,storeHasMessages=false,size=0,cacheEnabled=true,maxBatchSize:200
- fillBatch
[debug] o.a.a.b.r.Queue - queue://test expiring messages done.
[trace] o.a.a.t.PooledTaskRunner - Running task iteration 0 - queue://test,
subscriptions=0, memory=0%, size=0, in flight groups=null
[debug] o.a.a.b.r.Queue - queue://test expiring messages ..
[trace] o.a.a.t.PooledTaskRunner - Run task done: queue://test,
subscriptions=0, memory=0%, size=0, in flight groups=null
[debug] o.a.a.b.r.Queue - test toPageIn: 0, Inflight: 0,
pagedInMessages.size 0, enqueueCount: 0, dequeueCount: 0
[trace] o.a.a.b.r.c.AbstractStoreCursor -
org.apache.activemq.broker.region.cursors.QueueStorePrefetch@a7fd9bb:test,batchResetNeeded=false,storeHasMessages=false,size=0,cacheEnabled=true,maxBatchSize:200
- fillBatch
[trace] o.a.a.b.r.c.AbstractStoreCursor -
org.apache.activemq.broker.region.cursors.QueueStorePrefetch@a7fd9bb:test,batchResetNeeded=false,storeHasMessages=false,size=0,cacheEnabled=true,maxBatchSize:200
- fillBatch
[debug] o.a.a.b.r.Queue - queue://test expiring messages done.
[trace] o.a.a.t.PooledTaskRunner - Running task iteration 0 - queue://test,
subscriptions=0, memory=0%, size=0, in flight groups=null
[trace] o.a.a.t.PooledTaskRunner - Run task done: queue://test,
subscriptions=0, memory=0%, size=0, in flight groups=null
===================================================================
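
Most of the log above is unrelated store and queue housekeeping. A small filter along these lines could reduce it to the MQTT transport entries. This is only a sketch: the function name is hypothetical, the sample is a few lines copied from the log, and it assumes every entry starts with a `[debug]` or `[trace]` tag while wrapped continuation lines do not.

```python
import re

# A few lines copied from the log above, including mail-wrapped entries.
LOG = """\
[debug] o.a.a.t.m.MQTTProtocolConverter - Sent Ping Response to
[debug] o.a.a.s.k.MessageDatabase - Checkpoint started.
[debug] o.a.a.s.k.MessageDatabase - Checkpoint done.
[trace] o.a.a.t.m.MQTTInactivityMonitor - Message received since last read
check, resetting flag:
[debug] o.a.a.t.m.MQTTProtocolConverter - Sent Ping Response to
[debug] o.a.a.t.m.MQTTInactivityMonitor - 30001 ms elapsed since last read
check.
"""

# An entry starts with a level tag followed by the abbreviated logger name.
ENTRY_START = re.compile(r"^\[(debug|trace)\] (\S+) - ")


def mqtt_entries(text):
    """Re-join wrapped lines and keep entries from the MQTT transport package."""
    entries = []  # list of [logger, message] pairs
    for line in text.splitlines():
        match = ENTRY_START.match(line)
        if match:
            entries.append([match.group(2), line])
        elif entries:
            # Continuation of the previous, wrapped entry.
            entries[-1][1] += " " + line.strip()
    return [msg for logger, msg in entries if logger.startswith("o.a.a.t.m.")]


found = mqtt_entries(LOG)
pings = sum(1 for entry in found if "Sent Ping Response" in entry)
for entry in found:
    print(entry)
```

Run against the full log, this leaves the ping responses and the MQTTInactivityMonitor entries next to each other, which makes it easier to see that ping responses are still being sent shortly before the read check fails.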



--
View this message in context: 
http://activemq.2283324.n4.nabble.com/ActiveMQ-5-7-snapshot-and-MQTT-connection-times-out-while-still-connected-to-a-topic-for-receiving-tp4656976.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.
