Github user arunmahadevan commented on a diff in the pull request:
https://github.com/apache/storm/pull/2639#discussion_r187496258
--- Diff:
external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsSpout.java ---
@@ -262,42 +189,26 @@ public void onMessage(Message msg) {
* topic/queue.
*/
@Override
- public void open(Map<String, Object> conf,
- TopologyContext context,
- SpoutOutputCollector collector) {
+ public void open(final Map<String, Object> conf,
+ final TopologyContext context,
+ final SpoutOutputCollector spoutOutputCollector) {
- if (this.jmsProvider == null) {
- throw new IllegalStateException("JMS provider has not been
set.");
- }
- if (this.tupleProducer == null) {
- throw new IllegalStateException("JMS Tuple Producer has not
been set.");
+ if (jmsProvider == null) {
+ throw new IllegalStateException(
+ "JMS provider has not been set.");
}
- // TODO get the default value from storm instead of hard coding 30
secs
- Long topologyTimeout =
- ((Number)
conf.getOrDefault(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS,
DEFAULT_MESSAGE_TIMEOUT_SECS)).longValue();
- if ((TimeUnit.SECONDS.toMillis(topologyTimeout)) >
this.recoveryPeriodMs) {
- LOG.warn("*** WARNING *** : "
- + "Recovery period (" + this.recoveryPeriodMs + "
ms.) is less then the configured "
- + "'topology.message.timeout.secs' of " +
topologyTimeout
- + " secs. This could lead to a message replay
flood!");
+ if (tupleProducer == null) {
+ throw new IllegalStateException(
+ "JMS Tuple Producer has not been set.");
}
- this.queue = new LinkedBlockingQueue<Message>();
- this.toCommit = new TreeSet<JmsMessageID>();
- this.pendingMessages = new HashMap<JmsMessageID, Message>();
- this.collector = collector;
+ collector = spoutOutputCollector;
try {
- ConnectionFactory cf = this.jmsProvider.connectionFactory();
- Destination dest = this.jmsProvider.destination();
- this.connection = cf.createConnection();
- this.session = connection.createSession(false,
this.jmsAcknowledgeMode);
- MessageConsumer consumer = session.createConsumer(dest);
- consumer.setMessageListener(this);
- this.connection.start();
- if (this.isDurableSubscription() && this.recoveryPeriodMs > 0)
{
- this.recoveryTimer = new Timer();
- this.recoveryTimer.scheduleAtFixedRate(new RecoveryTask(),
RECOVERY_DELAY_MS, this.recoveryPeriodMs);
- }
-
+ ConnectionFactory cf = jmsProvider.connectionFactory();
+ Destination dest = jmsProvider.destination();
+ connection = cf.createConnection();
+ session = messageHandler.createSession(connection);
--- End diff --
We can throw an exception if `setIndividualAck` is invoked while the ACK
mode is still one of the standard ones.
---