I am writing a custom SubscriptionRecoveryPolicy and one of the things I need
to do while recovering is create a temporary topic and consume messages from
it.
What is the right way to do this?
I don't see a straightforward API for it (such as the one for sending
messages: BrokerFilter.send).
I have tried going the standard JMS route with a ActiveMQConnectionFactory
to the vm://localhost
transport, but I keep running into all kinds of locking problems and
deadlocks that not only hold
up what I am trying to do, but stop ANY connections from succeeding from
then on. It seems that
creating a connection and trying to create a consumer inside another attempt
to create a consumer
doesn't work.
For the record, here is what I am trying. This is inside
SubscriptionRecoveryPolicy.recover:
ActiveMQConnectionFactory factory = new
ActiveMQConnectionFactory(this.broker.getVmConnectorURI());
factory.setUseAsyncSend(true);
factory.setWatchTopicAdvisories(false);
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
TemporaryTopic replaySource = session.createTemporaryTopic();
MessageConsumer replayConsumer =
session.createConsumer(replaySource);
This hangs on createConsumer:
"ActiveMQ Transport: ssl:///10.1.210.140:59574" - Thread t...@53
java.lang.Thread.State: WAITING on
java.util.concurrent.locks.abstractqueuedsynchronizer$conditionobj...@3fd48679
at sun.misc.Unsafe.park(Native Method)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
at
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
at
java.util.concurrent.ArrayBlockingQueue.take(ArrayBlockingQueue.java:317)
at
org.apache.activemq.transport.FutureResponse.getResult(FutureResponse.java:40)
at
org.apache.activemq.transport.ResponseCorrelator.request(ResponseCorrelator.java:87)
at
org.apache.activemq.ActiveMQConnection.syncSendPacket(ActiveMQConnection.java:1276)
at
org.apache.activemq.ActiveMQSession.syncSendPacket(ActiveMQSession.java:1874)
at
org.apache.activemq.ActiveMQMessageConsumer.<init>(ActiveMQMessageConsumer.java:254)
at
org.apache.activemq.ActiveMQSession.createConsumer(ActiveMQSession.java:1116)
at
org.apache.activemq.ActiveMQSession.createConsumer(ActiveMQSession.java:1060)
at
org.apache.activemq.ActiveMQSession.createConsumer(ActiveMQSession.java:973)
at
org.apache.activemq.ActiveMQSession.createConsumer(ActiveMQSession.java:946)
Note that if I don't set watchTopicAdvisories to false on the connection
factory, then
this hangs during Connection.start() in a different area, but one also
related to creating
a consumer.
Also, this hang seems to prevent any other new connections to activemq from
completing.
Anyway, I'm not attached to this methodology. I just want to create a temp
topic and
consume from it. I don't care how :)
So what's the right way to do this? There is an easy way to send messages
via the
broker object, but nothing for consumers that I can find. Any help is much
appreciated!
-adam
--
View this message in context:
http://activemq.2283324.n4.nabble.com/How-to-safely-consume-a-topic-from-INSIDE-the-broker-tp3068674p3068674.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.