Below is the source for MessageConsumer.java
package ss.integration.util;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicConnection;
import javax.jms.Topic;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.jms.MessageListener;
import javax.jms.JMSException;
import ss.util.logging.Logger;
import ss.util.SystemException;
import ss.util.JNDILookup;
import ss.integration.util.ExListener;
public class MessageConsumer
{
private TopicConnectionFactory _topicConnectionFactory;
private TopicConnection _topicConnection;
private Topic _topic;
private TopicSession _session;
private TopicSubscriber _subscriber;
private String _strClientID;
private String _strTopicConnectionFactory;
private String _strTopic;
private String _strSelector;
private String _strContext;
private String _strURL;
private int _reconnectRetry;
private long _reconnectTime;
private boolean _firstTimeConnection;
// flag to indicated that a shutdown has been requested
private boolean _shutdownRequested;
private static ExListener _exListener;
private static MessageListener _msgListener;
public MessageConsumer(String clientID_,
String topicConnectionFactory_,
String topic_,
String selector_,
String context_,
String url_) throws SystemException
{
_strClientID = clientID_;
_strTopicConnectionFactory = topicConnectionFactory_;
_strTopic = topic_;
_strSelector = selector_;
_strContext = context_;
_strURL = url_;
_exListener = null;
_msgListener = null;
_reconnectRetry = 0; // reconnect is currently not used.
_reconnectTime = 1 * 1000;
_firstTimeConnection = true;
_shutdownRequested = false;
}
public void init() throws SystemException
{
try
{
Logger.trace("MessageConsumer, setting the topicConnection,
client: " + _strClientID);
_topicConnectionFactory =
(TopicConnectionFactory)JNDILookup.lookup(_strTopicConnectionFactory,
_strContext, _strURL);
_topicConnection =
_topicConnectionFactory.createTopicConnection();
_topicConnection.setClientID(_strClientID);
_topic = (Topic)JNDILookup.lookup(_strTopic, _strContext,
_strURL);
_session = _topicConnection.createTopicSession(true, 0);
_subscriber = _session.createDurableSubscriber(_topic,
_strClientID,
_strSelector,
true);
Logger.trace("_topic=" + _topic + " _strClientID=" + _strClientID +
" _strSelector = " + _strSelector);
Logger.trace("_subscriber=" + _subscriber);
if (_exListener == null)
{
_exListener = new ExListener(this);
}
}
catch (JMSException je)
{
Logger.trace("MessageConsumer init() failed: JMSException");
throw new SystemException(je);
}
catch (SystemException se)
{
Logger.trace("MessageConsumer init() failed: SystemException");
throw new SystemException(se);
}
catch (Exception e)
{
Logger.trace("MessageConsumer init() failed: Exception");
throw new SystemException(e);
}
}
public void start() throws SystemException
{
synchronized (this)
{
while (!_shutdownRequested)
{
try
{
startConnection();
Logger.trace("Wait()..." );
wait();
Logger.trace("Waiting thread awaken" );
}
catch (Exception e)
{
Logger.error("Error in MessageConsumer.start: " +
e.getMessage());
throw new SystemException(e);
}
finally
{
close();
}
}
}
}
private void signalShutdown()
{
Logger.trace("Shutting down...");
_shutdownRequested = true;
notifyWaitingThread();
}
// to signal a waiting thread (waiting inside start() method)
public synchronized void notifyWaitingThread()
{
unregisterListener();
Logger.trace("Notifying/signaling a waiting thread...");
notify();
}
public void setListener (MessageListener listener_)
{
_msgListener = listener_;
}
private void registerMsgListener() throws SystemException
{
if (_msgListener != null)
{
try
{
Logger.trace("Registering Message Listener...");
// install asynchronous listener
_subscriber.setMessageListener(_msgListener);
}
catch (JMSException je)
{
throw new SystemException(je);
}
catch (Exception e)
{
throw new SystemException(e);
}
}
else
{
Logger.warning("MessageListener is null");
}
}
private void unregisterListener()
{
try
{
Logger.trace("Unregistering Message Listener...");
_subscriber.setMessageListener(null);
}
catch (Exception e)
{
Logger.error(e);
}
}
private void registerExceptionListener() throws SystemException
{
if (_exListener != null)
{
try
{
Logger.trace("Registering JMS Exception Listener...");
// install asynchronous listener
_topicConnection.setExceptionListener(_exListener);
}
catch (JMSException je)
{
throw new SystemException(je);
}
catch (Exception e)
{
throw new SystemException(e);
}
}
else
{
Logger.warning("ExceptionListener is null");
}
}
public void doCommit () throws SystemException
{
try
{
_session.commit();
}
catch (JMSException je)
{
throw new SystemException(je);
}
catch (Exception e)
{
throw new SystemException(e);
}
}
public void doRollback () throws SystemException
{
try
{
Logger.trace("doRollback() Rolling-back message...");
_session.rollback(); // Be careful
when using this. Sometimes the rollback doesn't work.
Logger.trace("doRollback() Message rolled-back");
signalShutdown();
}
catch (JMSException je)
{
Logger.trace("doRollback() JMS Exception: " + je.getMessage());
throw new SystemException(je);
}
catch (Exception e)
{
Logger.trace("doRollback() Exception :" + e.getMessage());
throw new SystemException(e);
}
}
public void close() throws SystemException
{
try
{
if (_subscriber != null)
{
Logger.trace("closing subscriber...");
_subscriber.close();
_subscriber = null;
Logger.trace("subscriber closed");
}
if (_session != null)
{
Logger.trace("closing session...");
_session.close();
_session = null;
Logger.trace("session closed");
}
if (_topicConnection != null)
{
Logger.trace("closing topic connection...");
_topicConnection.close();
_topicConnection = null;
Logger.trace("topicConnection closed");
}
_topicConnectionFactory = null;
_topic = null;
}
catch (Exception e)
{
Logger.trace("Exception in close()");
}
}
private void startConnection() throws SystemException
{
boolean success = false;
for (int i=0; i<_reconnectRetry || _firstTimeConnection; i++)
{
_firstTimeConnection = false;
try
{
Logger.trace("Closing all connections... ");
close();
if (_exListener != null)
{
wait(_reconnectTime);
}
Logger.trace("Starting all connections... (" + (i+1) + "/" +
_reconnectRetry + ")");
init();
Logger.trace("Init successfull");
registerMsgListener();
Logger.trace("RegisterMsgListener successfull");
registerExceptionListener();
Logger.trace("RegisterExceptionListener successfull");
Logger.trace("Starting TopicConnection...");
_topicConnection.start();
Logger.trace("TopicConnection started...");
success = true;
break;
}
catch (Exception e)
{
Logger.error("startConnection() failed");
Logger.error(e);
}
}
if (!success)
{
close();
Logger.error("Unable to connect to the JMS Server");
throw new SystemException("Unable to connect to the JMS
Server");
}
}
}
James.Strachan wrote:
>
> The exception is being generated from the code in this package :
> ss.integration.util
>
> I've no idea what that is - got the source?
>
> On 22/01/2008, activemqnewbie <[EMAIL PROTECTED]> wrote:
>>
>> We are getting a NullPointerException and the consumer shuts off.
>> After receiving a certain number of messages, the consumer
>> automatically closes, throwing a NullPointerException.
>> Activemq 5
>> Oracle10g
>>
>> Below is the stack trace.
>>
>> <ERROR><2008/01/22 09:52:56.470><23459640><> ExceptionListener receives a
>> JMS exception
>>
>> <TRACE><2008/01/22 09:52:56.470><23459640><> Unregistering Message
>> Listener...
>>
>> <TRACE><2008/01/22 09:52:56.470><23459640><> Notifying/signaling a
>> waiting
>> thread...
>>
>> <TRACE><2008/01/22 09:52:56.470><20688146><> Waiting thread awaken
>>
>> <TRACE><2008/01/22 09:52:56.470><20688146><> closing subscriber...
>>
>> <TRACE><2008/01/22 09:52:56.470><20688146><> subscriber closed
>>
>> <TRACE><2008/01/22 09:52:56.470><20688146><> closing session...
>>
>> <TRACE><2008/01/22 09:52:56.470><20688146><> session closed
>>
>> <TRACE><2008/01/22 09:52:56.470><20688146><> closing topic connection...
>>
>> <TRACE><2008/01/22 09:52:56.470><20688146><> Exception in close()
>>
>> <TRACE><2008/01/22 09:52:56.470><20688146><> closing topic connection...
>>
>> <TRACE><2008/01/22 09:52:56.470><20688146><> Exception in close()
>>
>> <ERROR><2008/01/22 09:52:56.470><20688146><> Unable to connect to the JMS
>> Server
>>
>> <ERROR><2008/01/22 09:52:56.470><23459640><> ExceptionListener receives a
>> JMS exception
>>
>> <ERROR><2008/01/22 09:52:56.470><20688146><> Error in
>> MessageConsumer.start:
>> Unable to connect to the JMS Server
>>
>> <TRACE><2008/01/22 09:52:56.470><20688146><> closing topic connection...
>>
>> <TRACE><2008/01/22 09:52:56.470><20688146><> Exception in close()
>>
>> <TRACE><2008/01/22 09:52:56.470><23459640><> Unregistering Message
>> Listener...
>> <ERROR><2008/01/22 09:52:56.470><20688146><> Error in main: Unable to
>> connect to the JMS Server
>>
>>
>> <ERROR><2008/01/22 09:52:56.470><20688146><> StackTrace:
>>
>> <ERROR><2008/01/22 09:52:56.470><20688146><>
>> Error Code: -1
>> k12.util.SystemException: Unable to connect to the JMS Server
>> at
>> ss.integration.util.MessageConsumer.startConnection(MessageConsumer.java:368)
>> at
>> ss.integration.util.MessageConsumer.start(MessageConsumer.java:124)
>> at
>> ss.integration.agent.sams.SInputStarter.start(SInputStarter.java:113)
>> at
>> ss.integration.agent.sams.SInputStarter.main(SInputStarter.java:45)
>>
>> <ERROR><2008/01/22 09:52:56.470><23459640><>
>> java.lang.NullPointerException
>> at
>> ss.integration.util.MessageConsumer.unregisterListener(MessageConsumer.java:205)
>> at
>> ss.integration.util.MessageConsumer.notifyWaitingThread(MessageConsumer.java:159)
>> at ss.integration.util.ExListener.onException(ExListener.java:26)
>> at
>> org.apache.activemq.ActiveMQConnection$3.run(ActiveMQConnection.java:1648)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:650)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:675)
>> at java.lang.Thread.run(Thread.java:595)
>>
>>
>>
>> <TRACE><2008/01/22 09:52:56.470><23459640><> Notifying/signaling a
>> waiting
>> thread...
>>
>>
>>
>> Thanks for the help,
>> Vij
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://www.nabble.com/Activemq.xml-configuration-settings-directory-tp14957812s2354p15020252.html
>> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>>
>>
>
>
> --
> James
> -------
> http://macstrac.blogspot.com/
>
> Open Source Integration
> http://open.iona.com
>
>
--
View this message in context:
http://www.nabble.com/Activemq.xml-configuration-settings-directory-tp14957812s2354p15023059.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.