I think we can simplify this one. and do a delete without even getting
message ids.


On Mon, Jul 6, 2015 at 5:05 PM, Pamod Sylvester <pa...@wso2.com> wrote:

> Hi Hasitha,
>
> With reference to the discussion in [1], this was done to prevent
> Tombstone overwhelming exception when Cassandra was used as the message
> store.
>
> AFAIR The process of purging was as follows,
>
> 1) From MessageMetaData CF, retrieve the corresponding message ids
> 2) Delete the corresponding messages from MessageMetaData and then delete
> them from the MessageContent.
>
> When transacting > 1,000,000 messages and disconnection of the
> subscription will go through the above mentioned process.
>
> The issue is,
>
> a) Message content is not partitioned based on the queue name as Message
> Meta data. so we cannot delete the content row wise, which restricted us to
> retrieve ids from Message Meta data and remove them from content.
> b) When retrieving message meta data (let's say we used a range query to
> select all the ids) there will be tombstones that will be selected with
> that query that will hit the limit, causing the Tombstone overwhelming
> exception.
>
> As a solution, when  removeMessagesOfDestinationForNode() is called we
> get the current fresh slot, and the idea of getting the start id of the
> slot is that it assures that the message ids that should be selected from
> the message meta data should be greater than the value defined in the start
> id (since messages with ids < start id are the messages that have being
> delivered already that has caused tombstones).
>
> Also, I believe we could change the flow now since we discontinue
> Cassandra. Since in RDBMS you could select all ids without any issue.
>
> Hope this explains your query.
>
> [1] [Dev][MB] TombstoneOverwhelmingException When Purge Operation is
> Triggered When Subscription Disconnection/Deletion
>
> Thanks,
> Pamod
>
> On Mon, Jul 6, 2015 at 7:00 AM, Hasitha Hiranya <hasit...@wso2.com> wrote:
>
>> Hi,
>>
>> Identified an issue regarding message purging when last subscriber for a
>> topic is disconnected from broker cluster.
>>
>> @org.wso2.andes.kernel.OrphanedMessageHandler
>>
>>
>> private void removeMessagesOfDestinationForNode(String destination,
>>                                                     String ownerName,
>> boolean isTopic) throws AndesException {
>>
>>         try {
>>
>>             Long startMessageID = null;
>>             Long endMessageID = null;
>>
>>             //Will first retrieve the last unassigned slot id
>>             String nodeID =
>> ClusterResourceHolder.getInstance().getClusterManager().getMyNodeID();
>>             //Will get the storage queue name
>> *            String storageQueue =
>> AndesUtils.getStorageQueueForDestination(destination, nodeID, isTopic);*
>> *            //We get the relevant slot from the coordinator if running
>> on cluster mode*
>> *            Slot unassignedSlot =
>> MessagingEngine.getInstance().getSlotCoordinator().getSlot(storageQueue);*
>> *            //We need to get the starting message ID to inform the DB to
>> start slicing the message from there*
>> *            //This step would be done in order to ensure that tombstones
>> will not be fetched during the querying*
>> *            //operation*
>> *            startMessageID = unassignedSlot.getStartMessageId();*
>>             endMessageID = unassignedSlot.getEndMessageId();
>>
>>             // This is a class used by AndesSubscriptionManager. Andes
>> Subscription Manager is behind Disruptor layer.
>>             // Hence the call should be made to MessagingEngine NOT Andes.
>>             // Calling Andes methods from here will lead to probable
>> deadlocks if Futures are used.
>>             // NOTE: purge call should be made to MessagingEngine not
>> Andes
>>             if (0 < endMessageID) {
>>                 //If the slot id is 0, which means for the given storage
>> queue there're no unassigned slots which means
>>                 //we don't need to purge messages in this case
>>                 //The purpose of purge operation is to make sure that
>> unassigned slots will be removed if no subs exists
>>                 MessagingEngine.getInstance().purgeMessages(destination,
>> ownerName, isTopic, startMessageID);
>>             }
>>         } catch (ConnectionException e) {
>>             String mesage = "Error while establishing a connection with
>> the thrift server";
>>             log.error(mesage);
>>             throw new AndesException(mesage, e);
>>         }
>>
>>     }
>>
>> Why we need a start message id here ?
>> What about purging the whole internal queue (related to topic) ?
>>
>> *MessagingEngine.getInstance().purgeMessages(destination, ownerName,
>> isTopic, 0);*
>>
>> Thanks
>> --
>> *Hasitha Abeykoon*
>> Senior Software Engineer; WSO2, Inc.; http://wso2.com
>> *cell:* *+94 719363063*
>> *blog: **abeykoon.blogspot.com* <http://abeykoon.blogspot.com>
>>
>>
>
>
> --
> *Pamod Sylvester *
>
> *WSO2 Inc.; http://wso2.com <http://wso2.com>*
> cell: +94 77 7779495
>



-- 
Ramith Jayasinghe
Technical Lead
WSO2 Inc., http://wso2.com
lean.enterprise.middleware

E: ram...@wso2.com
P: +94 777542851
_______________________________________________
Dev mailing list
Dev@wso2.org
http://wso2.org/cgi-bin/mailman/listinfo/dev

Reply via email to