Github user arunmahadevan commented on a diff in the pull request:
https://github.com/apache/storm/pull/2639#discussion_r183531789
--- Diff:
external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsSpout.java ---
@@ -339,26 +339,26 @@ public void nextTuple() {
*/
@Override
public void ack(Object msgId) {
-
Message msg = this.pendingMessages.remove(msgId);
- JmsMessageID oldest = this.toCommit.first();
- if (msgId.equals(oldest)) {
- if (msg != null) {
- try {
- LOG.debug("Committing...");
- msg.acknowledge();
- LOG.debug("JMS Message acked: " + msgId);
- this.toCommit.remove(msgId);
- } catch (JMSException e) {
- LOG.warn("Error acknowldging JMS message: " + msgId,
e);
+ if (!toCommit.isEmpty()) {
+ JmsMessageID oldest = this.toCommit.first();
+ if (msgId.equals(oldest)) {
+ if (msg != null) {
+ try {
+ LOG.debug("Committing...");
+ msg.acknowledge();
--- End diff --
This piece of code was already there. I am guessing its based on the JMS
acknowledgement mode.
See - https://docs.oracle.com/cd/E19798-01/821-1841/bncfw/index.html
In `Session.CLIENT_ACKNOWLEDGE` - Acknowledging a consumed message
automatically acknowledges the receipt of all messages that have been consumed
by its session, so this logic seems fine.
The spout is ignoring Auto acknowledgement mode, but I am not sure about
the other modes like `DUPS_OK_ACKNOWLEDGE` or `SESSION_TRANSACTED` work. cc
@ptgoetz who might have more context around this.
---