Well, having "real" destinations come and go that quickly is definitely not
a typical use case.  Admittedly, I have not been following the conversation
that closely, but you do seem to have done your homework.  I assume
temporary destinations wouldn't suit your needs.  Have you considered
something like Akka?  If you have this kind of throughput, do you really
need persistence?  Is each message a precious commodity or is your
application just extremely chatty?  I'll try to dig through the
conversation...

On Sunday, February 8, 2015, Kevin Burton <[email protected]> wrote:

> I could probably make one..
>
> I think it would just be a broker with leveldb config, inactive GC of about
> 60 seconds, code which writes 1 message to each of about 5000 queues,
> consumes them, then closes all consumers.
>
> After once the GC kicks in it will take a long time for ActiveMQ to GC all
> the destinations.
>
> Once I get to a state where I have my code at least reliable/stable, I’ll
> try to code up something.
>
> I’m wondering if I’m in anti-pattern territory though.  If this is NOT a
> use case which ActiveMQ services , then I should really expect a fix.
>
> But it seems that the performance of GCing destinations should be higher
> than this.
>
> Seems you should be able to GC something like 100-500 destinations per
> second.
>
> On Sun, Feb 8, 2015 at 5:22 PM, James Carman <[email protected]
> <javascript:;>>
> wrote:
>
> > Do you have an example project which will exhibit the issue reliably?
> >
> > On Sunday, February 8, 2015, Kevin Burton <[email protected]
> <javascript:;>> wrote:
> >
> > > OK. Now I have proof that my theory was correct :)
> > >
> > > If you have a lot of inactive queues that need to be GCd.. these will
> > block
> > > consumers and producers and effectively shut down ActiveMQ during this
> > > process.
> > >
> > > In my usage, we see 30 minutes of activity where ActiveMQ is
> unresponsive
> > > and effectively dead :-(
> > >
> > > I spent all of last week trying to work around this by allowing
> ActiveMQ
> > to
> > > GC queues by releasing them more aggressively hoping to amortize this
> > > process but I don’t think that fixed the issue.
> > >
> > > This is a report of a ‘stack report’ tool I wrote 5 or so years ago
> which
> > > we use internally.  It runs jstack then builds a graph looking at the
> > lock
> > > IDs and then ranks them accordingly, sorting by hot spot descending.
> > >
> > > So here we have 13 threads, which have 12 inbound thread blocking them
> (3
> > > distinct stack traces)
> > >
> > > Note the addConsumer and addProducer… so no new consumers can be
> created
> > > while this process happens.
> > >
> > > Is this mutex lock strictly needed? Perhaps one strategy is to have one
> > > thread which does the removeDestination and then just an AtomicBoolean
> > > flagging this reference as “gone” and pending removal.
> > >
> > > the main thing I’m worried about is that I’m the only one seeing this
> > issue
> > > …
> > >
> > > I don’t really understand why this is happening though and why it’s so
> > > slow.
> > >
> > > I’m looking at my log right now and it has about 3200 queues that it’s
> > > GCing at the moment.
> > >
> > > It looks like it’s able to do about 10 per minute.  So obviously this
> is
> > > just going to take a long time.
> > >
> > > So I think my main idea is to move to an all memory ActiveMQ broker for
> > > now.  I can just get good at QUICKLY rebuilding the queue during total
> > > queue failure.  This is just a short term work around though..
> > >
> > > I’ll either have to figure out away to fix this, completely redesign my
> > app
> > > (which will be no fun), build my own queue server with special
> semantics,
> > > or implement some sort of snapshotting support and logging.
> > >
> > > If I were to build this myself doing logs and checkpoints, while
> keeping
> > > the whole thing in memory, I think that would be faster.  But of course
> > > that would take more time :-(
> > >
> > > -------------
> > > Threads: 13 , Unique waiting threads: 3 , Total waiting threads: 12
> > > -------------
> > >     java.lang.Thread.State: WAITING (on object monitor)
> > >         at java.lang.Object.wait(Native Method)
> > >         at java.lang.Object.wait(Object.java:503)
> > >         at
> > > org.fusesource.hawtdispatch.SettableFuture$class.await(Future.scala:71)
> > >         - locked <0xXXXXXXXXXXXXXXXX> (a
> > > org.fusesource.hawtdispatch.SettableFuture$mutex$)
> > >         at
> > > org.fusesource.hawtdispatch.Future$$anon$1.await(Future.scala:122)
> > >         at
> > org.fusesource.hawtdispatch.Future$class.apply(Future.scala:28)
> > >         at
> > > org.fusesource.hawtdispatch.Future$$anon$1.apply(Future.scala:122)
> > >         at
> > >
> > >
> >
> org.fusesource.hawtdispatch.package$RichExecutorTrait$class.sync(hawtdispatch.scala:106)
> > >         at
> > >
> > >
> >
> org.fusesource.hawtdispatch.package$RichExecutor.sync(hawtdispatch.scala:142)
> > >         at
> > >
> > >
> >
> org.apache.activemq.leveldb.DBManager.destroyQueueStore(DBManager.scala:769)
> > >         at
> > >
> > >
> >
> org.apache.activemq.leveldb.LevelDBStore$$anonfun$removeQueueMessageStore$1.apply(LevelDBStore.scala:588)
> > >         at
> > >
> > >
> >
> org.apache.activemq.leveldb.LevelDBStore$$anonfun$removeQueueMessageStore$1.apply(LevelDBStore.scala:587)
> > >         at scala.Option.foreach(Option.scala:245)
> > >         at
> > >
> > >
> >
> org.apache.activemq.leveldb.LevelDBStore.removeQueueMessageStore(LevelDBStore.scala:587)
> > >         - locked <0xXXXXXXXXXXXXXXXX> (a
> > > org.apache.activemq.store.leveldb.LevelDBPersistenceAdapter)
> > >         at
> > >
> > >
> >
> org.apache.activemq.broker.region.DestinationFactoryImpl.removeDestination(DestinationFactoryImpl.java:113)
> > >         at
> > >
> > >
> >
> org.apache.activemq.broker.region.AbstractRegion.dispose(AbstractRegion.java:592)
> > >         at
> > >
> > >
> >
> org.apache.activemq.broker.region.AbstractRegion.removeDestination(AbstractRegion.java:222)
> > >         at
> > >
> > >
> >
> org.apache.activemq.broker.jmx.ManagedQueueRegion.removeDestination(ManagedQueueRegion.java:62)
> > >         at
> > >
> > >
> >
> org.apache.activemq.broker.region.RegionBroker.removeDestination(RegionBroker.java:340)
> > >         at
> > >
> > >
> >
> org.apache.activemq.broker.BrokerFilter.removeDestination(BrokerFilter.java:172)
> > >         at
> > >
> > >
> >
> org.apache.activemq.broker.BrokerFilter.removeDestination(BrokerFilter.java:172)
> > >         at
> > >
> > >
> >
> org.apache.activemq.advisory.AdvisoryBroker.removeDestination(AdvisoryBroker.java:212)
> > >         at
> > >
> > >
> >
> org.apache.activemq.broker.BrokerFilter.removeDestination(BrokerFilter.java:172)
> > >         at
> > >
> > >
> >
> org.apache.activemq.broker.BrokerFilter.removeDestination(BrokerFilter.java:172)
> > >         at
> > >
> > >
> >
> org.apache.activemq.broker.MutableBrokerFilter.removeDestination(MutableBrokerFilter.java:177)
> > >         at
> > >
> > >
> >
> org.apache.activemq.broker.region.RegionBroker.purgeInactiveDestinations(RegionBroker.java:860)
> > >         at
> > >
> >
> org.apache.activemq.broker.region.RegionBroker$1.run(RegionBroker.java:109)
> > >         at
> > >
> > >
> >
> org.apache.activemq.thread.SchedulerTimerTask.run(SchedulerTimerTask.java:33)
> > >         at java.util.TimerThread.mainLoop(Timer.java:555)
> > >         at java.util.TimerThread.run(Timer.java:505)
> > >
> > >     Lockable ownable synchronizers:
> > >          - <0x00000005cdb4ac00>
> > >          - <0x00000005ce152b78>
> > >
> > >         5 waiting threads:
> > >         --------
> > >         java.lang.Thread.State: WAITING (parking)
> > >             at sun.misc.Unsafe.park(Native Method)
> > >             - parking to wait for  <0x00000005ce152b78> (a
> > > java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)
> > >             at
> > > java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
> > >             at
> > >
> > >
> >
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834)
> > >             at
> > >
> > >
> >
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireShared(AbstractQueuedSynchronizer.java:964)
> > >             at
> > >
> > >
> >
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireShared(AbstractQueuedSynchronizer.java:1282)
> > >             at
> > >
> > >
> >
> java.util.concurrent.locks.ReentrantReadWriteLock$ReadLock.lock(ReentrantReadWriteLock.java:731)
> > >             at
> > >
> > >
> >
> org.apache.activemq.broker.region.RegionBroker.addConsumer(RegionBroker.java:402)
> > >             at
> > >
> > >
> >
> org.apache.activemq.broker.jmx.ManagedRegionBroker.addConsumer(ManagedRegionBroker.java:244)
> > >             at
> > >
> org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:97)
> > >             at
> > >
> org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:97)
> > >             at
> > >
> > >
> >
> org.apache.activemq.advisory.AdvisoryBroker.addConsumer(AdvisoryBroker.java:101)
> > >             at
> > >
> org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:97)
> > >             at
> > >
> org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:97)
> > >             at
> > >
> > >
> >
> org.apache.activemq.broker.MutableBrokerFilter.addConsumer(MutableBrokerFilter.java:102)
> > >             at
> > >
> > >
> >
> org.apache.activemq.broker.TransportConnection.processAddConsumer(TransportConnection.java:618)
> > >             at
> > > org.apache.activemq.command.ConsumerInfo.visit(ConsumerInfo.java:349)
> > >             at
> > >
> > >
> >
> org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:294)
> > >             at
> > >
> > >
> >
> org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:148)
> > >             at
> > >
> > >
> >
> org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:50)
> > >             at
> > >
> > >
> >
> org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:113)
> > >             at
> > >
> > >
> >
> org.apache.activemq.transport.AbstractInactivityMonitor.onCommand(AbstractInactivityMonitor.java:270)
> > >             at
> > >
> > >
> >
> org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
> > >             at
> > >
> >
> org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:214)
> > >             at
> > >
> org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:196)
> > >             at java.lang.Thread.run(Thread.java:745)
> > >
> > >
> > >         1 waiting threads:
> > >         --------
> > >         java.lang.Thread.State: WAITING (parking)
> > >             at sun.misc.Unsafe.park(Native Method)
> > >             - parking to wait for  <0x00000005ce152b78> (a
> > > java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)
> > >             at
> > > java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
> > >             at
> > >
> > >
> >
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834)
> > >             at
> > >
> > >
> >
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireShared(AbstractQueuedSynchronizer.java:964)
> > >             at
> > >
> > >
> >
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireShared(AbstractQueuedSynchronizer.java:1282)
> > >             at
> > >
> > >
> >
> java.util.concurrent.locks.ReentrantReadWriteLock$ReadLock.lock(ReentrantReadWriteLock.java:731)
> > >             at
> > >
> > >
> >
> org.apache.activemq.broker.region.RegionBroker.addProducer(RegionBroker.java:371)
> > >             at
> > >
> > >
> >
> org.apache.activemq.broker.jmx.ManagedRegionBroker.addProducer(ManagedRegionBroker.java:267)
> > >             at
> > >
> >
> org.apache.activemq.broker.BrokerFilter.addProducer(BrokerFilter.java:102)
> > >             at
> > >
> >
> org.apache.activemq.broker.BrokerFilter.addProducer(BrokerFilter.java:102)
> > >             at
> > >
> > >
> >
> org.apache.activemq.advisory.AdvisoryBroker.addProducer(AdvisoryBroker.java:172)
> > >             at
> > >
> > >
> >
> org.apache.activemq.broker.CompositeDestinationBroker.addProducer(CompositeDestinationBroker.java:56)
> > >             at
> > >
> >
> org.apache.activemq.broker.BrokerFilter.addProducer(BrokerFilter.java:102)
> > >             at
> > >
> > >
> >
> org.apache.activemq.broker.MutableBrokerFilter.addProducer(MutableBrokerFilter.java:107)
> > >             at
> > >
> > >
> >
> org.apache.activemq.broker.TransportConnection.processAddProducer(TransportConnection.java:565)
> > >             at
> > > org.apache.activemq.command.ProducerInfo.visit(ProducerInfo.java:108)
> > >             at
> > >
> > >
> >
> org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:294)
> > >             at
> > >
> > >
> >
> org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:148)
> > >             at
> > >
> > >
> >
> org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:50)
> > >             at
> > >
> > >
> >
> org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:113)
> > >             at
> > >
> > >
> >
> org.apache.activemq.transport.AbstractInactivityMonitor.onCommand(AbstractInactivityMonitor.java:270)
> > >             at
> > >
> > >
> >
> org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
> > >             at
> > >
> >
> org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:214)
> > >             at
> > >
> org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:196)
> > >             at java.lang.Thread.run(Thread.java:745)
> > >
> > >
> > >         6 waiting threads:
> > >         --------
> > >         java.lang.Thread.State: WAITING (parking)
> > >             at sun.misc.Unsafe.park(Native Method)
> > >             - parking to wait for  <0x00000005ce152b78> (a
> > > java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)
> > >             at
> > > java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
> > >             at
> > >
> > >
> >
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834)
> > >             at
> > >
> > >
> >
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireShared(AbstractQueuedSynchronizer.java:964)
> > >             at
> > >
> > >
> >
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireShared(AbstractQueuedSynchronizer.java:1282)
> > >             at
> > >
> > >
> >
> java.util.concurrent.locks.ReentrantReadWriteLock$ReadLock.lock(ReentrantReadWriteLock.java:731)
> > >             at
> > >
> > >
> >
> org.apache.activemq.broker.region.RegionBroker.removeConsumer(RegionBroker.java:413)
> > >             at
> > >
> > >
> >
> org.apache.activemq.broker.jmx.ManagedRegionBroker.removeConsumer(ManagedRegionBroker.java:262)
> > >             at
> > >
> > >
> >
> org.apache.activemq.broker.BrokerFilter.removeConsumer(BrokerFilter.java:132)
> > >             at
> > >
> > >
> >
> org.apache.activemq.broker.BrokerFilter.removeConsumer(BrokerFilter.java:132)
> > >             at
> > >
> > >
> >
> org.apache.activemq.advisory.AdvisoryBroker.removeConsumer(AdvisoryBroker.java:263)
> > >             at
> > >
> > >
> >
> org.apache.activemq.broker.BrokerFilter.removeConsumer(BrokerFilter.java:132)
> > >             at
> > >
> > >
> >
> org.apache.activemq.broker.BrokerFilter.removeConsumer(BrokerFilter.java:132)
> > >             at
> > >
> > >
> >
> org.apache.activemq.broker.MutableBrokerFilter.removeConsumer(MutableBrokerFilter.java:137)
> > >             at
> > >
> > >
> >
> org.apache.activemq.broker.TransportConnection.processRemoveConsumer(TransportConnection.java:650)
> > >             at
> > >
> > >
> >
> org.apache.activemq.broker.TransportConnection.processRemoveSession(TransportConnection.java:689)
> > >             at
> > >
> > >
> >
> org.apache.activemq.broker.TransportConnection.processRemoveConnection(TransportConnection.java:801)
> > >             - locked <0xXXXXXXXXXXXXXXXX> (a
> > > org.apache.activemq.broker.jmx.ManagedTransportConnection)
> > >             at
> > >
> > >
> >
> org.apache.activemq.broker.TransportConnection.doStop(TransportConnection.java:1138)
> > >             at
> > >
> > >
> >
> org.apache.activemq.broker.TransportConnection$4.run(TransportConnection.java:1068)
> > >             at
> > >
> > >
> >
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> > >             at
> > >
> > >
> >
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> > >             at java.lang.Thread.run(Thread.java:745)
> > >
> > >
> > >
> > > --
> > >
> > > Founder/CEO Spinn3r.com
> > > Location: *San Francisco, CA*
> > > blog: http://burtonator.wordpress.com
> > > … or check out my Google+ profile
> > > <https://plus.google.com/102718274791889610666/posts>
> > > <http://spinn3r.com>
> > >
> >
>
>
>
> --
>
> Founder/CEO Spinn3r.com
> Location: *San Francisco, CA*
> blog: http://burtonator.wordpress.com
> … or check out my Google+ profile
> <https://plus.google.com/102718274791889610666/posts>
> <http://spinn3r.com>
>

Reply via email to