Author: tabish
Date: Sat Sep 10 19:48:54 2011
New Revision: 1167582
URL: http://svn.apache.org/viewvc?rev=1167582&view=rev
Log:
fix for https://issues.apache.org/jira/browse/AMQ-3493
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java?rev=1167582&r1=1167581&r2=1167582&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java Sat Sep 10 19:48:54 2011
@@ -113,7 +113,11 @@ public class StompSubscription {
}
}
- unconsumedMessage.clear();
+ if (!unconsumedMessage.isEmpty()) {
+ MessageAck ack = new MessageAck(unconsumedMessage.getLast(), MessageAck.STANDARD_ACK_TYPE, unconsumedMessage.size());
+ protocolConverter.getStompTransport().sendToActiveMQ(ack);
+ unconsumedMessage.clear();
+ }
}
synchronized MessageAck onStompMessageAck(String messageId, TransactionId transactionId) {
@@ -129,7 +133,11 @@ public class StompSubscription {
ack.setConsumerId(consumerInfo.getConsumerId());
if (ackMode == CLIENT_ACK) {
- ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
+ if (transactionId == null) {
+ ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
+ } else {
+ ack.setAckType(MessageAck.DELIVERED_ACK_TYPE);
+ }
int count = 0;
for (Iterator<?> iter = dispatchedMessage.entrySet().iterator(); iter.hasNext();) {
@@ -138,20 +146,16 @@ public class StompSubscription {
MessageId id = (MessageId)entry.getKey();
MessageDispatch msg = (MessageDispatch)entry.getValue();
- if (ack.getFirstMessageId() == null) {
- ack.setFirstMessageId(id);
- }
-
if (transactionId != null) {
if (!unconsumedMessage.contains(msg)) {
unconsumedMessage.add(msg);
+ count++;
}
} else {
iter.remove();
+ count++;
}
- count++;
-
if (id.equals(msgId)) {
ack.setLastMessageId(id);
break;
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java?rev=1167582&r1=1167581&r2=1167582&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java Sat Sep 10 19:48:54 2011
@@ -36,6 +36,7 @@ import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.TextMessage;
+import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import org.apache.activemq.ActiveMQConnectionFactory;
@@ -43,6 +44,7 @@ import org.apache.activemq.CombinationTe
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.BrokerViewMBean;
+import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.slf4j.Logger;
@@ -55,7 +57,6 @@ public class StompTest extends Combinati
protected String confUri = "xbean:org/apache/activemq/transport/stomp/stomp-auth-broker.xml";
protected String jmsUri = "vm://localhost";
-
private BrokerService broker;
private StompConnection stompConnection = new StompConnection();
private Connection connection;
@@ -1398,6 +1399,8 @@ public class StompTest extends Combinati
stompConnection.ack(frame5, "tx3");
stompConnection.commit("tx3");
+ waitForFrameToTakeEffect();
+
stompDisconnect();
}
@@ -1464,7 +1467,6 @@ public class StompTest extends Combinati
TextMessage message = (TextMessage)consumer.receive(5000);
assertNotNull(message);
assertEquals("system", message.getStringProperty(Stomp.Headers.Message.USERID));
-
}
public void testJMSXUserIDIsSetInStompMessage() throws Exception {
@@ -1493,10 +1495,8 @@ public class StompTest extends Combinati
headers.put(Stomp.Headers.Message.SUBSCRIPTION, "Thisisnotallowed");
headers.put(Stomp.Headers.Message.USERID, "Thisisnotallowed");
-
stompConnection.connect("system", "manager");
-
stompConnection.send("/queue/" + getQueueName(), "msg", null, headers);
stompConnection.subscribe("/queue/" + getQueueName());
@@ -1511,7 +1511,6 @@ public class StompTest extends Combinati
assertNull(mess_headers.get(Stomp.Headers.Message.REDELIVERED));
assertNull(mess_headers.get(Stomp.Headers.Message.SUBSCRIPTION));
assertEquals("system", mess_headers.get(Stomp.Headers.Message.USERID));
-
}
public void testExpire() throws Exception {
@@ -1559,30 +1558,28 @@ public class StompTest extends Combinati
assertNotNull(stompMessage);
assertNull(stompMessage.getHeaders().get(Stomp.Headers.Message.PERSISTENT));
}
-
+
public void testReceiptNewQueue() throws Exception {
-
+
String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
-
+
frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + 1234 + "\n" + "id:8fee4b8-4e5c9f66-4703-e936-3" + "\n" + "receipt:8fee4b8-4e5c9f66-4703-e936-2" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
-
+
StompFrame receipt = stompConnection.receive();
assertTrue(receipt.getAction().startsWith("RECEIPT"));
assertEquals("8fee4b8-4e5c9f66-4703-e936-2", receipt.getHeaders().get("receipt-id"));
-
frame = "SEND\n destination:/queue/" + getQueueName() + 123 + "\ncontent-length:0" + " \n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
-
frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + 123 + "\n" + "id:8fee4b8-4e5c9f66-4703-e936-2" + "\n" + "receipt:8fee4b8-4e5c9f66-4703-e936-1" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
-
+
receipt = stompConnection.receive();
assertTrue(receipt.getAction().startsWith("RECEIPT"));
assertEquals("8fee4b8-4e5c9f66-4703-e936-1", receipt.getHeaders().get("receipt-id"));
@@ -1598,6 +1595,51 @@ public class StompTest extends Combinati
stompConnection.sendFrame(frame);
}
+ public void testTransactedClientAckBrokerStats() throws Exception {
+ String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+
+ frame = stompConnection.receiveFrame();
+ assertTrue(frame.startsWith("CONNECTED"));
+
+ sendMessage(getName());
+ sendMessage(getName());
+
+ stompConnection.begin("tx1");
+
+ frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "ack:client\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+
+ StompFrame message = stompConnection.receive();
+ assertTrue(message.getAction().equals("MESSAGE"));
+ stompConnection.ack(message, "tx1");
+
+ message = stompConnection.receive();
+ assertTrue(message.getAction().equals("MESSAGE"));
+ stompConnection.ack(message, "tx1");
+
+ stompConnection.commit("tx1");
+
+ frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+
+ waitForFrameToTakeEffect();
+
+ QueueViewMBean queueView = getProxyToQueue(getQueueName());
+ assertEquals(2, queueView.getDispatchCount());
+ assertEquals(2, queueView.getDequeueCount());
+ assertEquals(0, queueView.getQueueSize());
+ }
+
+ private QueueViewMBean getProxyToQueue(String name) throws MalformedObjectNameException, JMSException {
+ ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq"
+ + ":Type=Queue,Destination=" + name
+ + ",BrokerName=localhost");
+ QueueViewMBean proxy = (QueueViewMBean) broker.getManagementContext()
+ .newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true);
+ return proxy;
+ }
+
protected void assertClients(int expected) throws Exception {
org.apache.activemq.broker.Connection[] clients = broker.getBroker().getClients();
int actual = clients.length;