Re: [Dev] Issue in removing messages of non durable topics

2015-07-06 Thread Hasitha Hiranya
@Pamod,

Any idea on this?

Thanks

On Mon, Jul 6, 2015 at 4:30 PM, Hasitha Hiranya hasit...@wso2.com wrote:

 Hi,

 I identified an issue with message purging when the last subscriber for a
 topic disconnects from the broker cluster.

 @org.wso2.andes.kernel.OrphanedMessageHandler


 private void removeMessagesOfDestinationForNode(String destination,
                                                 String ownerName,
                                                 boolean isTopic) throws AndesException {
     try {
         Long startMessageID = null;
         Long endMessageID = null;

         // Will first retrieve the last unassigned slot id
         String nodeID =
                 ClusterResourceHolder.getInstance().getClusterManager().getMyNodeID();
         // Will get the storage queue name
         String storageQueue =
                 AndesUtils.getStorageQueueForDestination(destination, nodeID, isTopic);
         // We get the relevant slot from the coordinator if running on cluster mode
         Slot unassignedSlot =
                 MessagingEngine.getInstance().getSlotCoordinator().getSlot(storageQueue);
         // We need to get the starting message ID to inform the DB to start
         // slicing the message from there. This step would be done in order to
         // ensure that tombstones will not be fetched during the querying operation
         startMessageID = unassignedSlot.getStartMessageId();
         endMessageID = unassignedSlot.getEndMessageId();

         // This is a class used by AndesSubscriptionManager. Andes Subscription
         // Manager is behind Disruptor layer.
         // Hence the call should be made to MessagingEngine NOT Andes.
         // Calling Andes methods from here will lead to probable deadlocks if
         // Futures are used.
         // NOTE: purge call should be made to MessagingEngine not Andes
         if (0 < endMessageID) {
             // If the slot id is 0, which means for the given storage queue
             // there're no unassigned slots which means we don't need to purge
             // messages in this case.
             // The purpose of purge operation is to make sure that unassigned
             // slots will be removed if no subs exists
             MessagingEngine.getInstance().purgeMessages(destination,
                     ownerName, isTopic, startMessageID);
         }
     } catch (ConnectionException e) {
         String mesage = "Error while establishing a connection with the thrift server";
         log.error(mesage);
         throw new AndesException(mesage, e);
     }
 }

 Why do we need a start message ID here?
 What about purging the whole internal queue (related to the topic)?

 MessagingEngine.getInstance().purgeMessages(destination, ownerName, isTopic, 0);

 Thanks
 --
 *Hasitha Abeykoon*
 Senior Software Engineer; WSO2, Inc.; http://wso2.com
 *cell:* *+94 719363063*
 *blog: **abeykoon.blogspot.com* http://abeykoon.blogspot.com




___
Dev mailing list
Dev@wso2.org
http://wso2.org/cgi-bin/mailman/listinfo/dev


Re: [Dev] Issue in removing messages of non durable topics

2015-07-06 Thread Pamod Sylvester
Hi Hasitha,

With reference to the discussion in [1], this was done to prevent Tombstone
overwhelming exception when Cassandra was used as the message store.

AFAIR The process of purging was as follows,

1) From MessageMetaData CF, retrieve the corresponding message ids
2) Delete the corresponding messages from MessageMetaData and then delete
them from the MessageContent.
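
The two steps above can be sketched roughly as follows. This is only an
illustration using in-memory maps in place of the MessageMetaData and
MessageContent column families; the class, field and method names are
hypothetical and are not Andes or Cassandra APIs.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** In-memory stand-in for the two stores; not the Andes or Cassandra API. */
class TwoStepPurgeSketch {
    // Metadata is partitioned by queue name: queue -> (message id -> metadata).
    final Map<String, TreeMap<Long, String>> metadataByQueue = new HashMap<>();
    // Content is keyed by message id only, so it cannot be dropped per queue.
    final Map<Long, byte[]> contentById = new HashMap<>();

    void store(String queue, long messageId, String metadata, byte[] content) {
        metadataByQueue.computeIfAbsent(queue, q -> new TreeMap<>()).put(messageId, metadata);
        contentById.put(messageId, content);
    }

    /** Purges one queue and returns the number of messages removed. */
    int purge(String queue) {
        TreeMap<Long, String> metadata = metadataByQueue.get(queue);
        if (metadata == null) {
            return 0;
        }
        // Step 1: read the message ids from the metadata partition.
        List<Long> ids = new ArrayList<>(metadata.keySet());
        // Step 2: delete the metadata, then the content for each id.
        metadataByQueue.remove(queue);
        for (Long id : ids) {
            contentById.remove(id);
        }
        return ids.size();
    }

    public static void main(String[] args) {
        TwoStepPurgeSketch store = new TwoStepPurgeSketch();
        store.store("topicA", 1L, "m1", new byte[] {1});
        store.store("topicA", 2L, "m2", new byte[] {2});
        store.store("topicB", 3L, "m3", new byte[] {3});
        System.out.println("removed: " + store.purge("topicA"));
        System.out.println("content left for ids: " + store.contentById.keySet());
    }
}
```

The point of the sketch is that step 2 depends on step 1: because content is
keyed only by message id, every id has to be read from metadata first.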

When more than 1,000,000 messages have been transacted, disconnecting the
subscription goes through the process described above.

The issue is,

a) Message content is not partitioned by queue name the way message
metadata is, so we cannot delete the content row-wise. We are therefore
forced to retrieve the ids from message metadata and then remove them from
content.
b) When retrieving message metadata (say, with a range query that selects
all the ids), the query also reads tombstones, and once they exceed the
limit it fails with the Tombstone overwhelming exception.

As a solution, when removeMessagesOfDestinationForNode() is called we get
the current fresh slot and take its start id. This ensures that only
message ids greater than the start id are selected from message metadata,
since messages with ids < start id have already been delivered and are the
ones that left tombstones.
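
To illustrate why starting the range at the slot's start id helps, here is a
small sketch. It assumes a sorted in-memory map in place of the metadata row,
a made-up tombstone limit, and an inclusive lower bound; none of the names
below come from Andes or Cassandra.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Models a range scan that fails once it crosses too many tombstones. */
class SlotStartSliceSketch {
    // Hypothetical limit, standing in for Cassandra's tombstone failure threshold.
    static final int TOMBSTONE_LIMIT = 3;

    // message id -> live flag; false marks a tombstone left by a delivered message.
    final TreeMap<Long, Boolean> metadata = new TreeMap<>();

    /** Returns live ids >= fromId (inclusive is an assumption of this sketch). */
    List<Long> liveIdsFrom(long fromId) {
        List<Long> live = new ArrayList<>();
        int tombstones = 0;
        for (Map.Entry<Long, Boolean> entry : metadata.tailMap(fromId, true).entrySet()) {
            if (entry.getValue()) {
                live.add(entry.getKey());
            } else if (++tombstones > TOMBSTONE_LIMIT) {
                throw new IllegalStateException("too many tombstones scanned");
            }
        }
        return live;
    }

    public static void main(String[] args) {
        SlotStartSliceSketch store = new SlotStartSliceSketch();
        for (long id = 1; id <= 5; id++) {
            store.metadata.put(id, false); // already delivered, now tombstones
        }
        for (long id = 6; id <= 8; id++) {
            store.metadata.put(id, true);  // still in the unassigned slot
        }
        // Scanning from the slot start id skips every tombstone below it.
        System.out.println(store.liveIdsFrom(6L));
        try {
            store.liveIdsFrom(0L);         // a full range scan hits the limit
        } catch (IllegalStateException e) {
            System.out.println("full scan failed: " + e.getMessage());
        }
    }
}
```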

Also, I believe we could change the flow now that we have discontinued
Cassandra, since in an RDBMS you can select all ids without any issue.

Hope this explains your query.

[1] [Dev][MB] TombstoneOverwhelmingException When Purge Operation is
Triggered When Subscription Disconnection/Deletion

Thanks,
Pamod

-- 
*Pamod Sylvester *

*WSO2 Inc.; http://wso2.com http://wso2.com*
cell: +94 77 7779495


Re: [Dev] Issue in removing messages of non durable topics

2015-07-06 Thread Ramith Jayasinghe
I think we can simplify this one and do a delete without even fetching the
message ids.


-- 
Ramith Jayasinghe
Technical Lead
WSO2 Inc., http://wso2.com
lean.enterprise.middleware

E: ram...@wso2.com
P: +94 777542851

Re: [Dev] Issue in removing messages of non durable topics

2015-07-06 Thread Pamod Sylvester
+1. If we can partition the content by queue name, we could delete it
directly.
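
A rough sketch of what that could look like, assuming content is stored in
one partition per queue. The map layout and all names here are hypothetical
and only illustrate the idea, not an actual Andes schema.

```java
import java.util.HashMap;
import java.util.Map;

/** Content partitioned by queue name, so a purge needs no message ids. */
class PartitionedContentSketch {
    // queue name -> (message id -> content)
    final Map<String, Map<Long, byte[]>> contentByQueue = new HashMap<>();

    void store(String queue, long messageId, byte[] content) {
        contentByQueue.computeIfAbsent(queue, q -> new HashMap<>()).put(messageId, content);
    }

    /** Drops the whole partition and returns how many messages it held. */
    int purge(String queue) {
        Map<Long, byte[]> removed = contentByQueue.remove(queue);
        return removed == null ? 0 : removed.size();
    }

    public static void main(String[] args) {
        PartitionedContentSketch store = new PartitionedContentSketch();
        store.store("topicA", 1L, new byte[] {1});
        store.store("topicA", 2L, new byte[] {2});
        store.store("topicB", 3L, new byte[] {3});
        System.out.println("removed: " + store.purge("topicA"));
        System.out.println("queues left: " + store.contentByQueue.keySet());
    }
}
```

With this layout the purge is a single partition delete, and no range query
over message ids is needed beforehand.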
