Hi,
Identified an issue regarding message purging when last subscriber for a
topic is disconnected from broker cluster.
@org.wso2.andes.kernel.OrphanedMessageHandler
private void removeMessagesOfDestinationForNode(String destination,
String ownerName,
boolean isTopic) throws AndesException {
try {
Long startMessageID = null;
Long endMessageID = null;
//Will first retrieve the last unassigned slot id
String nodeID =
ClusterResourceHolder.getInstance().getClusterManager().getMyNodeID();
//Will get the storage queue name
* String storageQueue =
AndesUtils.getStorageQueueForDestination(destination, nodeID, isTopic);*
* //We get the relevant slot from the coordinator if running on
cluster mode*
* Slot unassignedSlot =
MessagingEngine.getInstance().getSlotCoordinator().getSlot(storageQueue);*
* //We need to get the starting message ID to inform the DB to
start slicing the message from there*
* //This step would be done in order to ensure that tombstones
will not be fetched during the querying*
* //operation*
* startMessageID = unassignedSlot.getStartMessageId();*
endMessageID = unassignedSlot.getEndMessageId();
// This is a class used by AndesSubscriptionManager. Andes
Subscription Manager is behind Disruptor layer.
// Hence the call should be made to MessagingEngine NOT Andes.
// Calling Andes methods from here will lead to probable
deadlocks if Futures are used.
// NOTE: purge call should be made to MessagingEngine not Andes
if (0 < endMessageID) {
//If the slot id is 0, which means for the given storage
queue there're no unassigned slots which means
//we don't need to purge messages in this case
//The purpose of purge operation is to make sure that
unassigned slots will be removed if no subs exists
MessagingEngine.getInstance().purgeMessages(destination,
ownerName, isTopic, startMessageID);
}
} catch (ConnectionException e) {
String mesage = "Error while establishing a connection with the
thrift server";
log.error(mesage);
throw new AndesException(mesage, e);
}
}
Why we need a start message id here ?
What about purging the whole internal queue (related to topic) ?
*MessagingEngine.getInstance().purgeMessages(destination, ownerName,
isTopic, 0);*
Thanks
--
*Hasitha Abeykoon*
Senior Software Engineer; WSO2, Inc.; http://wso2.com
*cell:* *+94 719363063*
*blog: **abeykoon.blogspot.com* <http://abeykoon.blogspot.com>
_______________________________________________
Dev mailing list
[email protected]
http://wso2.org/cgi-bin/mailman/listinfo/dev