@Pamod, any idea on this?
Thanks

On Mon, Jul 6, 2015 at 4:30 PM, Hasitha Hiranya <[email protected]> wrote:

> Hi,
>
> Identified an issue regarding message purging when the last subscriber
> for a topic is disconnected from the broker cluster.
>
> @org.wso2.andes.kernel.OrphanedMessageHandler
>
> private void removeMessagesOfDestinationForNode(String destination,
>                                                 String ownerName,
>                                                 boolean isTopic) throws AndesException {
>
>     try {
>
>         Long startMessageID = null;
>         Long endMessageID = null;
>
>         //Will first retrieve the last unassigned slot id
>         String nodeID =
>                 ClusterResourceHolder.getInstance().getClusterManager().getMyNodeID();
>         //Will get the storage queue name
>         *String storageQueue =
>                 AndesUtils.getStorageQueueForDestination(destination, nodeID, isTopic);*
>         *//We get the relevant slot from the coordinator if running on cluster mode*
>         *Slot unassignedSlot =
>                 MessagingEngine.getInstance().getSlotCoordinator().getSlot(storageQueue);*
>         *//We need to get the starting message ID to inform the DB to start slicing
>         //the message from there*
>         *//This step would be done in order to ensure that tombstones will not be
>         //fetched during the querying operation*
>         *startMessageID = unassignedSlot.getStartMessageId();*
>         endMessageID = unassignedSlot.getEndMessageId();
>
>         // This is a class used by AndesSubscriptionManager. Andes Subscription
>         // Manager is behind Disruptor layer.
>         // Hence the call should be made to MessagingEngine NOT Andes.
>         // Calling Andes methods from here will lead to probable deadlocks if
>         // Futures are used.
>         // NOTE: purge call should be made to MessagingEngine not Andes
>         if (0 < endMessageID) {
>             //If the slot id is 0, which means for the given storage queue
>             //there're no unassigned slots which means
>             //we don't need to purge messages in this case
>             //The purpose of purge operation is to make sure that unassigned
>             //slots will be removed if no subs exists
>             MessagingEngine.getInstance().purgeMessages(destination, ownerName,
>                     isTopic, startMessageID);
>         }
>     } catch (ConnectionException e) {
>         String mesage = "Error while establishing a connection with the thrift server";
>         log.error(mesage);
>         throw new AndesException(mesage, e);
>     }
>
> }
>
> Why do we need a start message ID here?
> What about purging the whole internal queue (related to the topic)?
>
> *MessagingEngine.getInstance().purgeMessages(destination, ownerName, isTopic, 0);*
>
> Thanks
> --
> Hasitha Abeykoon
> Senior Software Engineer; WSO2, Inc.; http://wso2.com
> cell: +94 719363063
> blog: abeykoon.blogspot.com <http://abeykoon.blogspot.com>

--
Hasitha Abeykoon
Senior Software Engineer; WSO2, Inc.; http://wso2.com
cell: +94 719363063
blog: abeykoon.blogspot.com <http://abeykoon.blogspot.com>
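
To make the question concrete, here is a minimal sketch of the difference between the two calls. It is not Andes code: PurgeSketch, store and purgeFrom are hypothetical names, the storage queue is modelled as an in-memory sorted map, and it assumes that purging with a start message ID only removes messages with IDs at or above that ID.

```java
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical stand-in for one storage queue; not part of Andes.
class PurgeSketch {
    // Message ID -> payload, kept sorted by message ID.
    private final NavigableMap<Long, String> messages = new TreeMap<>();

    void store(long messageId, String payload) {
        messages.put(messageId, payload);
    }

    // Assumed semantics: remove every message whose ID is >= startMessageId
    // and return how many were removed.
    int purgeFrom(long startMessageId) {
        NavigableMap<Long, String> tail = messages.tailMap(startMessageId, true);
        int removed = tail.size();
        tail.clear(); // tailMap is a view, so this also clears the backing map
        return removed;
    }

    int size() {
        return messages.size();
    }

    public static void main(String[] args) {
        PurgeSketch queue = new PurgeSketch();
        queue.store(100L, "m1");
        queue.store(200L, "m2");
        queue.store(300L, "m3");
        queue.store(400L, "m4");

        // Purging from the unassigned slot's start ID leaves older messages behind.
        System.out.println("removed from 300: " + queue.purgeFrom(300L));
        System.out.println("left in queue: " + queue.size());

        // Purging from 0 clears the whole queue.
        System.out.println("removed from 0: " + queue.purgeFrom(0L));
        System.out.println("left in queue: " + queue.size());
    }
}
```

Under that assumption, anything stored below the unassigned slot's start ID survives the first call, which is the case the question about passing 0 is getting at.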
_______________________________________________
Dev mailing list
[email protected]
http://wso2.org/cgi-bin/mailman/listinfo/dev
