I think we can simplify this one and do the delete without even fetching the message IDs.
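To illustrate the simplification suggested here, the following is a minimal Java sketch, not the Andes implementation. It assumes an RDBMS-style store in which a purge can remove a queue's metadata and content inside the store itself, so the caller never receives a list of message IDs and needs no start ID. `PurgeSketch`, `store`, and `purgeQueue` are hypothetical names, and the two maps merely stand in for the metadata and content tables.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

/** Hypothetical in-memory stand-in for the message store; not the Andes API. */
class PurgeSketch {
    // message id -> storage queue name (stands in for the metadata table)
    private final Map<Long, String> metadata = new HashMap<>();
    // message id -> payload (stands in for the content table)
    private final Map<Long, byte[]> content = new HashMap<>();

    /** Records one message in both tables. */
    void store(long messageId, String storageQueue, byte[] payload) {
        metadata.put(messageId, storageQueue);
        content.put(messageId, payload);
    }

    /**
     * Purges every message of one storage queue in a single pass.
     * The IDs are resolved inside the store and are never handed back
     * to the caller. Returns the number of messages removed.
     */
    int purgeQueue(String storageQueue) {
        int removed = 0;
        Iterator<Map.Entry<Long, String>> it = metadata.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, String> entry = it.next();
            if (entry.getValue().equals(storageQueue)) {
                content.remove(entry.getKey()); // drop the content row
                it.remove();                    // drop the metadata row
                removed++;
            }
        }
        return removed;
    }

    int metadataCount() {
        return metadata.size();
    }

    int contentCount() {
        return content.size();
    }
}
```

With a real RDBMS the same effect would presumably come from a delete keyed on the queue name rather than from a loop in application code; the loop here only models what the store would do internally.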
On Mon, Jul 6, 2015 at 5:05 PM, Pamod Sylvester <pa...@wso2.com> wrote:
> Hi Hasitha,
>
> With reference to the discussion in [1], this was done to prevent the
> Tombstone overwhelming exception when Cassandra was used as the message
> store.
>
> AFAIR the process of purging was as follows:
>
> 1) From the MessageMetaData CF, retrieve the corresponding message ids
> 2) Delete the corresponding messages from MessageMetaData and then delete
> them from MessageContent.
>
> When transacting > 1,000,000 messages, disconnection of the
> subscription will go through the above-mentioned process.
>
> The issue is,
>
> a) Message content is not partitioned based on the queue name as Message
> Meta data is, so we cannot delete the content row-wise, which restricted us
> to retrieving ids from Message Meta data and removing them from content.
> b) When retrieving message meta data (let's say we used a range query to
> select all the ids) there will be tombstones selected with that query
> that will hit the limit, causing the Tombstone overwhelming exception.
>
> As a solution, when removeMessagesOfDestinationForNode() is called we
> get the current fresh slot, and the idea of getting the start id of the
> slot is that it assures that the message ids selected from the message
> meta data are greater than the value defined in the start id (since
> messages with ids < start id are the messages that have already been
> delivered, which caused the tombstones).
>
> Also, I believe we could change the flow now since we have discontinued
> Cassandra. In an RDBMS you could select all ids without any issue.
>
> Hope this explains your query.
>
> [1] [Dev][MB] TombstoneOverwhelmingException When Purge Operation is
> Triggered When Subscription Disconnection/Deletion
>
> Thanks,
> Pamod
>
> On Mon, Jul 6, 2015 at 7:00 AM, Hasitha Hiranya <hasit...@wso2.com> wrote:
>
>> Hi,
>>
>> Identified an issue regarding message purging when last subscriber for a
>> topic is disconnected from broker cluster.
>>
>> @org.wso2.andes.kernel.OrphanedMessageHandler
>>
>>
>>     private void removeMessagesOfDestinationForNode(String destination,
>>                                                     String ownerName,
>>                                                     boolean isTopic) throws AndesException {
>>
>>         try {
>>
>>             Long startMessageID = null;
>>             Long endMessageID = null;
>>
>>             //Will first retrieve the last unassigned slot id
>>             String nodeID = ClusterResourceHolder.getInstance().getClusterManager().getMyNodeID();
>>             //Will get the storage queue name
>>             *String storageQueue = AndesUtils.getStorageQueueForDestination(destination, nodeID, isTopic);*
>>             *//We get the relevant slot from the coordinator if running on cluster mode*
>>             *Slot unassignedSlot = MessagingEngine.getInstance().getSlotCoordinator().getSlot(storageQueue);*
>>             *//We need to get the starting message ID to inform the DB to start slicing the message from there*
>>             *//This step would be done in order to ensure that tombstones will not be fetched during the querying*
>>             *//operation*
>>             *startMessageID = unassignedSlot.getStartMessageId();*
>>             endMessageID = unassignedSlot.getEndMessageId();
>>
>>             // This is a class used by AndesSubscriptionManager. Andes Subscription Manager is behind Disruptor layer.
>>             // Hence the call should be made to MessagingEngine NOT Andes.
>>             // Calling Andes methods from here will lead to probable deadlocks if Futures are used.
>>             // NOTE: purge call should be made to MessagingEngine not Andes
>>             if (0 < endMessageID) {
>>                 //If the slot id is 0, which means for the given storage queue there're no unassigned slots which means
>>                 //we don't need to purge messages in this case
>>                 //The purpose of purge operation is to make sure that unassigned slots will be removed if no subs exists
>>                 MessagingEngine.getInstance().purgeMessages(destination, ownerName, isTopic, startMessageID);
>>             }
>>         } catch (ConnectionException e) {
>>             String mesage = "Error while establishing a connection with the thrift server";
>>             log.error(mesage);
>>             throw new AndesException(mesage, e);
>>         }
>>
>>     }
>>
>> Why we need a start message id here ?
>> What about purging the whole internal queue (related to topic) ?
>>
>> *MessagingEngine.getInstance().purgeMessages(destination, ownerName, isTopic, 0);*
>>
>> Thanks
>> --
>> *Hasitha Abeykoon*
>> Senior Software Engineer; WSO2, Inc.; http://wso2.com
>> *cell:* *+94 719363063*
>> *blog: **abeykoon.blogspot.com* <http://abeykoon.blogspot.com>
>>
>>
>
>
> --
> *Pamod Sylvester *
>
> *WSO2 Inc.; http://wso2.com <http://wso2.com>*
> cell: +94 77 7779495
>

--
Ramith Jayasinghe
Technical Lead
WSO2 Inc., http://wso2.com
lean.enterprise.middleware

E: ram...@wso2.com
P: +94 777542851
_______________________________________________
Dev mailing list
Dev@wso2.org
http://wso2.org/cgi-bin/mailman/listinfo/dev