Author: jstrachan
Date: Thu May 24 03:18:53 2007
New Revision: 541256
URL: http://svn.apache.org/viewvc?view=rev&rev=541256
Log:
added fix for AMQ-1253 to log a warning if users forget to start the
connection within a small timeout period (500ms by default)
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/CreateConsumerButDontStartConnectionWarningTest.java
(with props)
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/JmsTopicSendReceiveTest.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java?view=diff&rev=541256&r1=541255&r2=541256
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
Thu May 24 03:18:53 2007
@@ -140,7 +140,9 @@
private int closeTimeout = 15000;
private boolean useSyncSend=false;
private boolean watchTopicAdvisories=true;
-
+ private long warnAboutUnstartedConnectionTimeout = 500L;
+
+
private final Transport transport;
private final IdGenerator clientIdGenerator;
private final JMSStatsImpl factoryStats;
@@ -176,6 +178,7 @@
// Assume that protocol is the latest. Change to the actual protocol
// version when a WireFormatInfo is received.
private AtomicInteger protocolVersion=new
AtomicInteger(CommandTypes.PROTOCOL_VERSION);
+ private long timeCreated;
/**
* Construct an <code>ActiveMQConnection</code>
@@ -207,6 +210,7 @@
this.stats = new JMSConnectionStatsImpl(sessions, this instanceof
XAConnection);
this.factoryStats.addConnection(this);
+ this.timeCreated = System.currentTimeMillis();
}
@@ -1505,6 +1509,28 @@
this.optimizeAcknowledge=optimizeAcknowledge;
}
+ public long getWarnAboutUnstartedConnectionTimeout() {
+ return warnAboutUnstartedConnectionTimeout;
+ }
+
+ /**
+ * Enables the timemout from a session creation to when a warning is
generated
+ * if the connection is not properly started via [EMAIL PROTECTED]
#start()}. It is a very
+ * common gotcha to forget to
+ * <a
href="http://activemq.apache.org/i-am-not-receiving-any-messages-what-is-wrong.html">start
the connection</a>
+ * so this option makes the default case to create a warning if the user
forgets.
+ * To disable the warning just set the value to < 0 (say -1).
+ */
+ public void setWarnAboutUnstartedConnectionTimeout(long
warnAboutUnstartedConnectionTimeout) {
+ this.warnAboutUnstartedConnectionTimeout =
warnAboutUnstartedConnectionTimeout;
+ }
+
+ /**
+ * Returns the time this connection was created
+ */
+ public long getTimeCreated() {
+ return timeCreated;
+ }
private void waitForBrokerInfo() throws JMSException {
try {
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java?view=diff&rev=541256&r1=541255&r2=541256
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java
Thu May 24 03:18:53 2007
@@ -28,6 +28,8 @@
import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.util.JMSExceptionSupport;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
/**
* A utility class used by the Session for dispatching messages asynchronously
to consumers
@@ -36,11 +38,14 @@
* @see javax.jms.Session
*/
public class ActiveMQSessionExecutor implements Task {
-
+ private static final transient Log log =
LogFactory.getLog(ActiveMQSessionExecutor.class);
+
private ActiveMQSession session;
private MessageDispatchChannel messageQueue = new MessageDispatchChannel();
private boolean dispatchedBySessionPool;
private TaskRunner taskRunner;
+ private boolean startedOrWarnedThatNotStarted;
+ private long warnAboutUnstartedConnectionTime = 500L;
ActiveMQSessionExecutor(ActiveMQSession session) {
this.session = session;
@@ -53,6 +58,24 @@
void execute(MessageDispatch message) throws InterruptedException {
+ if (!startedOrWarnedThatNotStarted) {
+
+ ActiveMQConnection connection = session.connection;
+ long aboutUnstartedConnectionTimeout =
connection.getWarnAboutUnstartedConnectionTimeout();
+ if (connection.isStarted() || aboutUnstartedConnectionTimeout <
0L) {
+ startedOrWarnedThatNotStarted = true;
+ }
+ else {
+ long elapsedTime = System.currentTimeMillis() -
connection.getTimeCreated();
+
+ // lets only warn when a significant amount of time has passed
just in case its normal operation
+ if (elapsedTime > aboutUnstartedConnectionTimeout) {
+ log.warn("Received a message on a connection which is not
yet started. Have you forgotten to call Connection.start()? Connection: " +
connection + " Received: " + message);
+ startedOrWarnedThatNotStarted = true;
+ }
+ }
+ }
+
if (!session.isSessionAsyncDispatch() && !dispatchedBySessionPool){
dispatch(message);
}else {
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/CreateConsumerButDontStartConnectionWarningTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/CreateConsumerButDontStartConnectionWarningTest.java?view=auto&rev=541256
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/CreateConsumerButDontStartConnectionWarningTest.java
(added)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/CreateConsumerButDontStartConnectionWarningTest.java
Thu May 24 03:18:53 2007
@@ -0,0 +1,45 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import javax.jms.JMSException;
+
+/**
+ * @version $Revision: 1.1 $
+ */
+public class CreateConsumerButDontStartConnectionWarningTest extends
JmsQueueSendReceiveTest {
+ private static final transient Log log =
LogFactory.getLog(CreateConsumerButDontStartConnectionWarningTest.class);
+
+ @Override
+ protected void startConnection() throws JMSException {
+ // don't start the connection
+ }
+
+ @Override
+ protected void assertMessagesAreReceived() throws JMSException {
+ try {
+ Thread.sleep(1000);
+ }
+ catch (InterruptedException e) {
+ log.warn("Caught: " + e, e);
+ }
+ }
+}
Propchange:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/CreateConsumerButDontStartConnectionWarningTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/JmsTopicSendReceiveTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/JmsTopicSendReceiveTest.java?view=diff&rev=541256&r1=541255&r2=541256
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/JmsTopicSendReceiveTest.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/JmsTopicSendReceiveTest.java
Thu May 24 03:18:53 2007
@@ -68,9 +68,13 @@
log.info("Created producer destination: " + producerDestination + "
of type: " + producerDestination.getClass());
consumer = createConsumer();
consumer.setMessageListener(this);
- connection.start();
+ startConnection();
log.info("Created connection: " + connection);
+ }
+
+ protected void startConnection() throws JMSException {
+ connection.start();
}
protected void tearDown() throws Exception {