Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2234#discussion_r128847982
  
    --- Diff: external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsSpout.java ---
    @@ -30,65 +35,110 @@
     import javax.jms.MessageListener;
     import javax.jms.Session;
     
    -import org.apache.storm.topology.base.BaseRichSpout;
    -import org.apache.storm.utils.Utils;
    -import org.slf4j.Logger;
    -import org.slf4j.LoggerFactory;
    -
    +import org.apache.storm.Config;
     import org.apache.storm.jms.JmsProvider;
     import org.apache.storm.jms.JmsTupleProducer;
     import org.apache.storm.spout.SpoutOutputCollector;
     import org.apache.storm.task.TopologyContext;
     import org.apache.storm.topology.OutputFieldsDeclarer;
    +import org.apache.storm.topology.base.BaseRichSpout;
     import org.apache.storm.tuple.Values;
    +import org.apache.storm.utils.Utils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +
     
     /**
    - * A Storm <code>Spout</code> implementation that listens to a JMS topic 
or queue
    - * and outputs tuples based on the messages it receives.
    - * <p>
    - * <code>JmsSpout</code> instances rely on <code>JmsProducer</code> 
implementations
    - * to obtain the JMS <code>ConnectionFactory</code> and 
<code>Destination</code> objects
    - * necessary to connect to a JMS topic/queue.
    - * <p>
    - * When a <code>JmsSpout</code> receives a JMS message, it delegates to an
    - * internal <code>JmsTupleProducer</code> instance to create a Storm tuple 
from the
    - * incoming message.
    - * <p>
    - * Typically, developers will supply a custom 
<code>JmsTupleProducer</code> implementation
    - * appropriate for the expected message content.
    + * A Storm <code>Spout</code> implementation that listens to a JMS topic or
    + * queue and outputs tuples based on the messages it receives.
    + *
    + * <p><code>JmsSpout</code> instances rely on <code>JmsProducer</code>
    + * implementations to obtain the JMS
    + * <code>ConnectionFactory</code> and <code>Destination</code> objects 
necessary
    + * to connect to a JMS topic/queue.
    + *
    + * <p>When a <code>JmsSpout</code> receives a JMS message, it delegates to 
an
    + * internal <code>JmsTupleProducer</code> instance to create a Storm tuple 
from
    + * the incoming message.
    + *
    + * <p>Typically, developers will supply a custom 
<code>JmsTupleProducer</code>
    + * implementation appropriate for the expected message content.
      */
     @SuppressWarnings("serial")
     public class JmsSpout extends BaseRichSpout implements MessageListener {
    +
    +    /** The logger object instance for this class. */
         private static final Logger LOG = 
LoggerFactory.getLogger(JmsSpout.class);
     
    -    // JMS options
    +    /** The logger of the recovery task. */
    +    private static final Logger RECOVERY_TASK_LOG =
    +            LoggerFactory.getLogger(RecoveryTask.class);
    +
    +    /** Time to sleep between queue polling attempts. */
    +    private static final int POLL_INTERVAL = 50;
    +
    +    /**
    +     * The default value the {@link Config#TOPOLOGY_MESSAGE_TIMEOUT_SECS}.
    +     */
    +    private static final int DEFAULT_MESSAGE_TIMEOUT = 30;
    +
    +    /** Time to wait before queuing the first recovery task (in seconds). 
*/
    +    private static final int RECOVERY_DELAY = 10;
    +
    +    /**
    +     * The acknowledgment mode used for this instance.
    +     *
    +     * @see Session
    +     */
         private int jmsAcknowledgeMode = Session.AUTO_ACKNOWLEDGE;
     
    +    /** Indicates whether or not this spout should run as a singleton. */
         private boolean distributed = true;
     
    +    /** Used to generate tuples from incoming messages. */
         private JmsTupleProducer tupleProducer;
     
    +    /** Encapsulates jms related classes needed to communicate with the 
mq. */
         private JmsProvider jmsProvider;
     
    +    /** Stores incoming messages for later sending. */
         private LinkedBlockingQueue<Message> queue;
    +
    +    /** Contains all message ids of messages that were not yet acked. */
         private TreeSet<JmsMessageID> toCommit;
    +
    +    /** Maps between message ids of not-yet acked messages, and the 
messages. */
         private HashMap<JmsMessageID, Message> pendingMessages;
    +
    +    /** Counter of handled messages. */
         private long messageSequence = 0;
     
    +    /** The collector used to emit tuples. */
         private SpoutOutputCollector collector;
     
    +    /** Connection to the jms queue. */
         private transient Connection connection;
    +
    +    /** The active jms session. */
         private transient Session session;
     
    +    /** Indicates whether or not a message failed to be processed. */
         private boolean hasFailures = false;
    -    public final Serializable recoveryMutex = "RECOVERY_MUTEX";
    +
    +    /** Used to safely recover failed JMS sessions across instances. */
    +    private final Serializable recoveryMutex = "RECOVERY_MUTEX";
    +
    +    /** Schedules recovery tasks periodically. */
         private Timer recoveryTimer = null;
    +
    +    /** To to wait between recovery attempts. */
    --- End diff --
    
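    Nit: typo in the Javadoc above: "To to wait" should read "Time to wait". Also, could you rename recoveryPeriod to recoveryPeriodMs so the unit (milliseconds) is clear from the name?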


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to