Author: tabish
Date: Sun Aug 17 11:57:34 2008
New Revision: 686632
URL: http://svn.apache.org/viewvc?rev=686632&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQCPP-189
Adding timed and un-timed requests to the Response correlator code via the
Future response object, and refactored FutureResponse to use a Latch to wait
for the response.
This fixes the first part of adding Producer Flow Control: clients will now
block waiting for the broker when using sync-style sends. The async transport
will keep running for now, consuming all client memory if allowed to run long
enough.
Modified:
activemq/activemq-cpp/trunk/src/examples/consumers/SimpleAsyncConsumer.cpp
activemq/activemq-cpp/trunk/src/examples/producers/SimpleProducer.cpp
activemq/activemq-cpp/trunk/src/main/activemq/connector/ProducerInfo.h
activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.cpp
activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.h
activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireFormatNegotiator.cpp
activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireFormatNegotiator.h
activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireProducerInfo.h
activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnectionNegotiator.cpp
activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnectionNegotiator.h
activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnector.cpp
activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnector.h
activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompProducerInfo.h
activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConstants.cpp
activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConstants.h
activemq/activemq-cpp/trunk/src/main/activemq/transport/IOTransport.cpp
activemq/activemq-cpp/trunk/src/main/activemq/transport/IOTransport.h
activemq/activemq-cpp/trunk/src/main/activemq/transport/MockTransport.cpp
activemq/activemq-cpp/trunk/src/main/activemq/transport/MockTransport.h
activemq/activemq-cpp/trunk/src/main/activemq/transport/Transport.h
activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportFilter.h
activemq/activemq-cpp/trunk/src/main/activemq/transport/filters/LoggingTransport.cpp
activemq/activemq-cpp/trunk/src/main/activemq/transport/filters/LoggingTransport.h
activemq/activemq-cpp/trunk/src/main/activemq/transport/filters/ResponseCorrelator.cpp
activemq/activemq-cpp/trunk/src/main/activemq/transport/filters/ResponseCorrelator.h
activemq/activemq-cpp/trunk/src/main/activemq/transport/filters/ResponseCorrelatorFactory.cpp
activemq/activemq-cpp/trunk/src/test/activemq/transport/filters/ResponseCorrelatorTest.cpp
activemq/activemq-cpp/trunk/src/test/activemq/transport/filters/ResponseCorrelatorTest.h
Modified:
activemq/activemq-cpp/trunk/src/examples/consumers/SimpleAsyncConsumer.cpp
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/examples/consumers/SimpleAsyncConsumer.cpp?rev=686632&r1=686631&r2=686632&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/examples/consumers/SimpleAsyncConsumer.cpp
(original)
+++ activemq/activemq-cpp/trunk/src/examples/consumers/SimpleAsyncConsumer.cpp
Sun Aug 17 11:57:34 2008
@@ -211,11 +211,11 @@
//
std::string brokerURI =
"tcp://127.0.0.1:61616"
- "?wireFormat=openwire"
- "&transport.useAsyncSend=true"
+ "?wireFormat=openwire";
+// "&transport.useAsyncSend=true"
// "&transport.commandTracingEnabled=true"
// "&transport.tcpTracingEnabled=true";
- "&wireFormat.tightEncodingEnabled=true";
+// "&wireFormat.tightEncodingEnabled=true";
//============================================================
// This is the Destination Name and URI options. Use this to
Modified: activemq/activemq-cpp/trunk/src/examples/producers/SimpleProducer.cpp
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/examples/producers/SimpleProducer.cpp?rev=686632&r1=686631&r2=686632&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/examples/producers/SimpleProducer.cpp
(original)
+++ activemq/activemq-cpp/trunk/src/examples/producers/SimpleProducer.cpp Sun
Aug 17 11:57:34 2008
@@ -190,15 +190,16 @@
std::string brokerURI =
"tcp://127.0.0.1:61616"
"?wireFormat=openwire"
- "&transport.useAsyncSend=true"
+ "&soKeepAlive=true";
+// "&transport.useAsyncSend=true"
// "&transport.commandTracingEnabled=true"
// "&transport.tcpTracingEnabled=true";
- "&wireFormat.tightEncodingEnabled=true";
+// "&wireFormat.tightEncodingEnabled=true";
//============================================================
// Total number of messages for this producer to send.
//============================================================
- unsigned int numMessages = 2000;
+ unsigned int numMessages = 50000;
//============================================================
// This is the Destination Name and URI options. Use this to
Modified: activemq/activemq-cpp/trunk/src/main/activemq/connector/ProducerInfo.h
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/connector/ProducerInfo.h?rev=686632&r1=686631&r2=686632&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/connector/ProducerInfo.h
(original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/connector/ProducerInfo.h Sun
Aug 17 11:57:34 2008
@@ -87,6 +87,21 @@
*/
virtual bool isDisableMessageId() const = 0;
+ /**
+ * Gets the set send timeout for messages from this producer, a value
of
+ * zero indicates that the Producer should wait forever for a response
from
+ * the broker on the send.
+ * @return default time to wait for broker response to a message being
sent.
+ */
+ virtual unsigned int getSendTimeout() const = 0;
+
+ /**
+ * Sets the time to wait for the broker to respond to a message being
sent
+ * @param timeout - The time to wait for the broker to acknowledge
that a
+ * message has been received.
+ */
+ virtual void setSendTimeout( unsigned int timeout ) = 0;
+
};
}}
Modified:
activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.cpp
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.cpp?rev=686632&r1=686631&r2=686632&view=diff
==============================================================================
---
activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.cpp
(original)
+++
activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.cpp
Sun Aug 17 11:57:34 2008
@@ -655,12 +655,14 @@
producer = new OpenWireProducerInfo( this );
producer->setSessionInfo( session );
+ producer->setSendTimeout( this->getSendTimeout() );
producerInfo = new commands::ProducerInfo();
producer->setProducerInfo( producerInfo );
commands::ProducerId* producerId = new commands::ProducerId();
producerInfo->setProducerId( producerId );
+ producerInfo->setWindowSize( this->getProducerWindowSize( ));
producerId->setConnectionId( session->getConnectionId() );
producerId->setSessionId( session->getSessionId() );
@@ -853,7 +855,7 @@
}
// Send the message to the broker.
- Response* response = syncRequest( amqMessage );
+ Response* response = syncRequest( amqMessage,
producerInfo->getSendTimeout() );
// The broker did not return an error - this is good.
// Just discard the response.
@@ -1013,6 +1015,7 @@
transaction->setTransactionInfo( info );
return transaction;
+
} catch( ConnectorException& ex ){
try{ transport->close(); } catch( ... ){}
ex.setMark(__FILE__,__LINE__);
@@ -1054,6 +1057,7 @@
info->setType( (int)TRANSACTION_STATE_COMMITONEPHASE );
oneway( info );
+
} catch( ConnectorException& ex ){
try{ transport->close(); } catch( ... ){}
ex.setMark(__FILE__,__LINE__);
@@ -1095,6 +1099,7 @@
info->setType( (int)TRANSACTION_STATE_ROLLBACK );
oneway( info );
+
} catch( ConnectorException& ex ){
try{ transport->close(); } catch( ... ){}
ex.setMark(__FILE__,__LINE__);
@@ -1183,7 +1188,7 @@
rsi->setClientId( connectionInfo.getClientId() );
// Send the message to the broker.
- Response* response = syncRequest(rsi);
+ Response* response = syncRequest( rsi );
// The broker did not return an error - this is good.
// Just discard the response.
@@ -1225,7 +1230,6 @@
consumer->getConsumerInfo()->getDestination()->cloneDataStructure() );
messagePull.setTimeout( timeout );
- // TODO - This should be Async
this->oneway( &messagePull );
}
}
@@ -1422,12 +1426,18 @@
}
////////////////////////////////////////////////////////////////////////////////
-Response* OpenWireConnector::syncRequest( Command* command )
+Response* OpenWireConnector::syncRequest( Command* command, unsigned int
timeout )
throw ( ConnectorException ) {
try {
- Response* response = transport->request( command );
+ Response* response = NULL;
+
+ if( timeout == 0 ) {
+ response = transport->request( command );
+ } else {
+ response = transport->request( command, timeout );
+ }
commands::ExceptionResponse* exceptionResponse =
dynamic_cast<commands::ExceptionResponse*>( response );
Modified:
activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.h
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.h?rev=686632&r1=686631&r2=686632&view=diff
==============================================================================
---
activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.h
(original)
+++
activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.h
Sun Aug 17 11:57:34 2008
@@ -34,6 +34,7 @@
#include <cms/TextMessage.h>
#include <cms/MapMessage.h>
+#include <decaf/lang/Integer.h>
#include <decaf/lang/exceptions/InvalidStateException.h>
#include <decaf/lang/exceptions/IllegalArgumentException.h>
@@ -370,7 +371,7 @@
*/
virtual ProducerInfo* createProducer(
const cms::Destination* destination,
- SessionInfo* session)
+ SessionInfo* session )
throw ( ConnectorException );
/**
@@ -605,6 +606,24 @@
private:
+ // Gets the configured producer window size to use when creating new
+ // producers to control how much memory is used. The value is read from
+ // the connector properties under the PARAM_PRODUCERWINDOWSIZE key and
+ // falls back to "0" when the property is not set.
+ // NOTE(review): the parsed value is returned as unsigned int; presumably
+ // Integer::parseInt yields a signed int, so a negative setting would wrap
+ // to a very large window - confirm whether that should be rejected.
+ virtual unsigned int getProducerWindowSize() const {
+ return decaf::lang::Integer::parseInt(
+ properties.getProperty(
+ core::ActiveMQConstants::toString(
+ core::ActiveMQConstants::PARAM_PRODUCERWINDOWSIZE ),
"0" ) );
+ }
+
+ // Gets the time to wait for a producer send to complete, meaning the
+ // time to wait for a response. Zero indicates to wait forever. The value
+ // is read from the connector properties under the PARAM_SENDTIMEOUT key
+ // and falls back to "0" when the property is not set.
+ // NOTE(review): the parsed value is returned as unsigned int; presumably
+ // Integer::parseInt yields a signed int, so a negative setting would wrap
+ // to a very long timeout - confirm whether that should be rejected.
+ virtual unsigned int getSendTimeout() const {
+ return decaf::lang::Integer::parseInt(
+ properties.getProperty(
+ core::ActiveMQConstants::toString(
+ core::ActiveMQConstants::PARAM_SENDTIMEOUT ), "0" ) );
+ }
+
// Check for Connected State and Throw an exception if not.
void enforceConnected() throw ( ConnectorException );
@@ -620,11 +639,12 @@
* Sends a synchronous request and returns the response from the
broker.
* Converts any error responses into an exception.
* @param command The request command.
+ * @param timeout The time to wait for a response, default is zero or
infinite.
* @returns The response sent from the broker.
* @throws ConnectorException thrown if an error response was received
* from the broker, or if any other error occurred.
*/
- transport::Response* syncRequest( transport::Command* command )
+ transport::Response* syncRequest( transport::Command* command,
unsigned int timeout = 0 )
throw (ConnectorException);
/**
Modified:
activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireFormatNegotiator.cpp
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireFormatNegotiator.cpp?rev=686632&r1=686631&r2=686632&view=diff
==============================================================================
---
activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireFormatNegotiator.cpp
(original)
+++
activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireFormatNegotiator.cpp
Sun Aug 17 11:57:34 2008
@@ -109,6 +109,35 @@
}
////////////////////////////////////////////////////////////////////////////////
+// Timed variant of request(): waits for the wire format negotiation latch
+// (bounded by negotiationTimeout), then forwards the command together with
+// the caller's response timeout to the next transport.
+Response* OpenWireFormatNegotiator::request( Command* command, unsigned int timeout )
+    throw( CommandIOException, UnsupportedOperationException ) {
+
+    try{
+
+        if( closed || next == NULL ){
+            throw CommandIOException(
+                __FILE__, __LINE__,
+                "OpenWireFormatNegotiator::request - transport already closed" );
+        }
+
+        // The negotiation wait uses negotiationTimeout, not the per-request
+        // timeout supplied by the caller.
+        if( !readyCountDownLatch.await( negotiationTimeout ) ) {
+            throw CommandIOException(
+                __FILE__,
+                __LINE__,
+                // Adjacent literals are concatenated, so the separator is
+                // needed to keep the method name apart from the message.
+                "OpenWireFormatNegotiator::request - "
+                "Wire format negotiation timeout: peer did not "
+                "send his wire format." );
+        }
+
+        return next->request( command, timeout );
+    }
+    AMQ_CATCH_RETHROW( UnsupportedOperationException )
+    AMQ_CATCH_RETHROW( CommandIOException )
+    AMQ_CATCH_EXCEPTION_CONVERT( exceptions::ActiveMQException, CommandIOException )
+    AMQ_CATCHALL_THROW( CommandIOException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
void OpenWireFormatNegotiator::onCommand( Command* command ) {
DataStructure* dataStructure =
Modified:
activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireFormatNegotiator.h
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireFormatNegotiator.h?rev=686632&r1=686631&r2=686632&view=diff
==============================================================================
---
activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireFormatNegotiator.h
(original)
+++
activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireFormatNegotiator.h
Sun Aug 17 11:57:34 2008
@@ -101,6 +101,19 @@
decaf::lang::exceptions::UnsupportedOperationException );
/**
+ * Sends the given request to the server and waits for the response.
+ * First waits for the WireFormatInfo exchange to happen so that we
+ * know how to encode outbound data.
+ * @param command The request to send.
+ * @param timeout The time to wait for the response.
+ * @return the response from the server.
+ * @throws CommandIOException if an error occurs with the request.
+ */
+ virtual transport::Response* request( transport::Command* command,
unsigned int timeout )
+ throw( transport::CommandIOException,
+ decaf::lang::exceptions::UnsupportedOperationException );
+
+ /**
* This is called in the context of the nested transport's
* reading thread. In the case of a response object,
* updates the request map and notifies those waiting on the
Modified:
activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireProducerInfo.h
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireProducerInfo.h?rev=686632&r1=686631&r2=686632&view=diff
==============================================================================
---
activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireProducerInfo.h
(original)
+++
activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireProducerInfo.h
Sun Aug 17 11:57:34 2008
@@ -39,6 +39,9 @@
// Session that this producer is attached to - we do not own this
const SessionInfo* session;
+ // Send timeout, how long to wait for a response before failing.
+ unsigned int sendTimeout;
+
public:
OpenWireProducerInfo( Connector* connector ) :
@@ -47,11 +50,12 @@
this->disableMessageIds = false;
this->producerInfo = NULL;
this->session = NULL;
+ this->sendTimeout = 0;
}
- virtual ~OpenWireProducerInfo() {
+ virtual ~OpenWireProducerInfo() {
this->close();
- delete producerInfo;
+ delete producerInfo;
}
/**
@@ -158,6 +162,25 @@
return this->disableMessageIds;
}
+ /**
+ * Gets the set send timeout for messages from this producer, a value
of
+ * zero indicates that the Producer should wait forever for a response
from
+ * the broker on the send.
+ * @return default time to wait for broker response to a message being
sent.
+ */
+ virtual unsigned int getSendTimeout() const {
+ return this->sendTimeout;
+ }
+
+ /**
+ * Sets the time to wait for the broker to respond to a message being
sent
+ * @param timeout - The time to wait for the broker to acknowledge
that a
+ * message has been received.
+ */
+ virtual void setSendTimeout( unsigned int timeout ) {
+ this->sendTimeout = timeout;
+ }
+
};
}}}
Modified:
activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnectionNegotiator.cpp
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnectionNegotiator.cpp?rev=686632&r1=686631&r2=686632&view=diff
==============================================================================
---
activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnectionNegotiator.cpp
(original)
+++
activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnectionNegotiator.cpp
Sun Aug 17 11:57:34 2008
@@ -127,6 +127,57 @@
}
////////////////////////////////////////////////////////////////////////////////
+// Timed variant of request(). Once connected, requests pass straight through
+// to the next transport with the caller's timeout. Before that, only a
+// ConnectCommand is accepted: it is sent one-way, the call waits on the ready
+// latch for up to negotiationTimeout, and connectedCmd is returned.
+Response* StompConnectionNegotiator::request( Command* command, unsigned int timeout )
+    throw( CommandIOException, decaf::lang::exceptions::UnsupportedOperationException ) {
+
+    try{
+
+        if( closed || next == NULL ){
+            throw CommandIOException(
+                __FILE__, __LINE__,
+                "StompConnectionNegotiator::request - transport already closed" );
+        }
+
+        // Once connected we just pass through all requests.
+        if( connected ) {
+            return next->request( command, timeout );
+        } else {
+
+            ConnectCommand* connect = dynamic_cast<ConnectCommand*>( command );
+
+            if( connect == NULL ) {
+                throw CommandIOException(
+                    __FILE__,
+                    __LINE__,
+                    // Adjacent literals are concatenated; the separator keeps
+                    // the method name apart from the message text.
+                    "StompConnectionNegotiator::request - "
+                    "Invalid Command Received: only a connect command "
+                    "can be sent before connected." );
+            }
+
+            // Send the connect request
+            next->oneway( command );
+
+            // The handshake wait uses negotiationTimeout, not the caller's
+            // per-request timeout.
+            if( !readyCountDownLatch.await( negotiationTimeout ) ) {
+                throw CommandIOException(
+                    __FILE__,
+                    __LINE__,
+                    "StompConnectionNegotiator::request - "
+                    "Connection Negotiate timeout: peer did not "
+                    "send a connected command." );
+            }
+
+            // return the connected command
+            return connectedCmd;
+        }
+    }
+    AMQ_CATCH_RETHROW( UnsupportedOperationException )
+    AMQ_CATCH_RETHROW( CommandIOException )
+    AMQ_CATCH_EXCEPTION_CONVERT( ActiveMQException, CommandIOException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, CommandIOException )
+    AMQ_CATCHALL_THROW( CommandIOException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
void StompConnectionNegotiator::onCommand( Command* command ) {
ConnectedCommand* response =
Modified:
activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnectionNegotiator.h
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnectionNegotiator.h?rev=686632&r1=686631&r2=686632&view=diff
==============================================================================
---
activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnectionNegotiator.h
(original)
+++
activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnectionNegotiator.h
Sun Aug 17 11:57:34 2008
@@ -177,6 +177,19 @@
decaf::lang::exceptions::UnsupportedOperationException );
/**
+ * Sends the given request to the server and waits for the response.
+ * Until the connection is established only a connect command is
+ * accepted; it is sent and the connected command is returned.
+ * @param command The request to send.
+ * @param timeout The time to wait for a response.
+ * @return the response from the server.
+ * @throws CommandIOException if an error occurs with the request.
+ */
+ virtual transport::Response* request( transport::Command* command,
unsigned int timeout )
+ throw( transport::CommandIOException,
+ decaf::lang::exceptions::UnsupportedOperationException );
+
+ /**
* This is called in the context of the nested transport's
* reading thread. In the case of a response object,
* updates the request map and notifies those waiting on the
Modified:
activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnector.cpp
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnector.cpp?rev=686632&r1=686631&r2=686632&view=diff
==============================================================================
---
activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnector.cpp
(original)
+++
activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnector.cpp
Sun Aug 17 11:57:34 2008
@@ -345,6 +345,7 @@
producer->setDestination( destination );
producer->setProducerId( producerIds.getNextSequenceId() );
producer->setSessionInfo( session );
+ producer->setSendTimeout( 0 );
return producer;
}
Modified:
activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnector.h
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnector.h?rev=686632&r1=686631&r2=686632&view=diff
==============================================================================
---
activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnector.h
(original)
+++
activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnector.h
Sun Aug 17 11:57:34 2008
@@ -283,6 +283,7 @@
* Create a Consumer for the given Session
* @param destination Destination to Subscribe to.
* @param session Session Information.
+ * @param sendTimeout The time to wait for an Ack from the Broker,
zero = forever.
* @return Producer Information
* @throws ConnectorException
*/
@@ -363,7 +364,7 @@
virtual void acknowledge( const SessionInfo* session,
const ConsumerInfo* consumer,
const cms::Message* message,
- AckType ackType = ACK_TYPE_CONSUMED)
+ AckType ackType = ACK_TYPE_CONSUMED )
throw ( ConnectorException );
/**
Modified:
activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompProducerInfo.h
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompProducerInfo.h?rev=686632&r1=686631&r2=686632&view=diff
==============================================================================
---
activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompProducerInfo.h
(original)
+++
activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompProducerInfo.h
Sun Aug 17 11:57:34 2008
@@ -41,6 +41,9 @@
// Session that this producer is attached to - we do not own this
const SessionInfo* session;
+ // Send timeout, how long to wait for a response before failing.
+ unsigned int sendTimeout;
+
public:
StompProducerInfo() : ProducerInfo() {
@@ -49,6 +52,7 @@
this->disableMessageIds = false;
this->session = NULL;
this->destination = NULL;
+ this->sendTimeout = 0;
}
StompProducerInfo( Connector* connector ) :
@@ -58,6 +62,7 @@
this->disableMessageIds = false;
this->session = NULL;
this->destination = NULL;
+ this->sendTimeout = 0;
}
virtual ~StompProducerInfo(void) {
@@ -139,6 +144,25 @@
return this->disableMessageIds;
}
+ /**
+ * Gets the set send timeout for messages from this producer, a value
of
+ * zero indicates that the Producer should wait forever for a response
from
+ * the broker on the send.
+ * @return default time to wait for broker response to a message being
sent.
+ */
+ virtual unsigned int getSendTimeout() const {
+ return this->sendTimeout;
+ }
+
+ /**
+ * Sets the time to wait for the broker to respond to a message being
sent
+ * @param timeout - The time to wait for the broker to acknowledge
that a
+ * message has been received.
+ */
+ virtual void setSendTimeout( unsigned int timeout ) {
+ this->sendTimeout = timeout;
+ }
+
};
}}}
Modified:
activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConstants.cpp
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConstants.cpp?rev=686632&r1=686631&r2=686632&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConstants.cpp
(original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConstants.cpp
Sun Aug 17 11:57:34 2008
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
+
#include "ActiveMQConstants.h"
#include <stdio.h>
@@ -26,16 +26,16 @@
string ActiveMQConstants::StaticInitializer::destOptions[NUM_OPTIONS];
string ActiveMQConstants::StaticInitializer::uriParams[NUM_PARAMS];
-map< std::string, ActiveMQConstants::DestinationOption >
+map< std::string, ActiveMQConstants::DestinationOption >
ActiveMQConstants::StaticInitializer::destOptionMap;
-map< std::string, ActiveMQConstants::URIParam >
+map< std::string, ActiveMQConstants::URIParam >
ActiveMQConstants::StaticInitializer::uriParamsMap;
ActiveMQConstants::StaticInitializer ActiveMQConstants::staticInits;
////////////////////////////////////////////////////////////////////////////////
ActiveMQConstants::StaticInitializer::StaticInitializer(){
-
+
destOptions[CONSUMER_PREFECTCHSIZE] = "consumer.prefetchSize";
destOptions[CUNSUMER_MAXPENDINGMSGLIMIT] =
"consumer.maximumPendingMessageLimit";
destOptions[CONSUMER_NOLOCAL] = "consumer.noLocal";
@@ -44,10 +44,12 @@
destOptions[CONSUMER_SELECTOR] = "consumer.selector";
destOptions[CONSUMER_EXCLUSIVE] = "consumer.exclusive";
destOptions[CONSUMER_PRIORITY] = "consumer.priority";
-
+
+ // Property names looked up for the send timeout and producer window size.
+ uriParams[PARAM_SENDTIMEOUT] = "connection.sendTimeout";
+ uriParams[PARAM_PRODUCERWINDOWSIZE] = "connection.producerWindowSize";
uriParams[PARAM_USERNAME] = "username";
uriParams[PARAM_PASSWORD] = "password";
- uriParams[PARAM_CLIENTID] = "client-id";
+ uriParams[PARAM_CLIENTID] = "client-id";
for( int ix=0; ix<NUM_OPTIONS; ++ix ){
destOptionMap[destOptions[ix]] = (DestinationOption)ix;
Modified: activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConstants.h
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConstants.h?rev=686632&r1=686631&r2=686632&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConstants.h
(original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConstants.h Sun
Aug 17 11:57:34 2008
@@ -54,6 +54,8 @@
*/
enum URIParam
{
+ PARAM_SENDTIMEOUT,
+ PARAM_PRODUCERWINDOWSIZE,
PARAM_USERNAME,
PARAM_PASSWORD,
PARAM_CLIENTID,
Modified:
activemq/activemq-cpp/trunk/src/main/activemq/transport/IOTransport.cpp
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/IOTransport.cpp?rev=686632&r1=686631&r2=686632&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/IOTransport.cpp
(original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/IOTransport.cpp Sun
Aug 17 11:57:34 2008
@@ -221,3 +221,12 @@
__FILE__, __LINE__,
"IOTransport::request() - unsupported operation" );
}
+
+////////////////////////////////////////////////////////////////////////////////
+Response* IOTransport::request( Command* command AMQCPP_UNUSED, unsigned int
timeout AMQCPP_UNUSED )
+ throw( CommandIOException,
decaf::lang::exceptions::UnsupportedOperationException ){
+
+ throw decaf::lang::exceptions::UnsupportedOperationException(
+ __FILE__, __LINE__,
+ "IOTransport::request() - unsupported operation" );
+}
Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/IOTransport.h
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/IOTransport.h?rev=686632&r1=686631&r2=686632&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/IOTransport.h
(original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/IOTransport.h Sun
Aug 17 11:57:34 2008
@@ -159,6 +159,16 @@
throw( CommandIOException,
decaf::lang::exceptions::UnsupportedOperationException );
/**
+ * Not supported by this class - throws an exception.
+ * @param command the command to be sent.
+ * @param timeout the time to wait for a response.
+ * @returns the response to the command sent.
+ * @throws UnsupportedOperationException.
+ */
+ virtual Response* request( Command* command, unsigned int timeout )
+ throw( CommandIOException,
decaf::lang::exceptions::UnsupportedOperationException );
+
+ /**
* Assigns the command listener for non-response commands.
* @param listener the listener.
*/
Modified:
activemq/activemq-cpp/trunk/src/main/activemq/transport/MockTransport.cpp
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/MockTransport.cpp?rev=686632&r1=686631&r2=686632&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/MockTransport.cpp
(original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/MockTransport.cpp
Sun Aug 17 11:57:34 2008
@@ -57,7 +57,7 @@
}
////////////////////////////////////////////////////////////////////////////////
-unsigned int MockTransport::getNextCommandId()
+unsigned int MockTransport::getNextCommandId()
throw ( activemq::exceptions::ActiveMQException ) {
try{
@@ -127,3 +127,18 @@
AMQ_CATCH_EXCEPTION_CONVERT( Exception, CommandIOException )
AMQ_CATCHALL_THROW( CommandIOException )
}
+
+////////////////////////////////////////////////////////////////////////////////
+// Timed variant of request(). The timeout is unused here; the call is
+// forwarded to the untimed request() and its response is handed back.
+Response* MockTransport::request( Command* command, unsigned int timeout AMQCPP_UNUSED )
+    throw( CommandIOException,
+           decaf::lang::exceptions::UnsupportedOperationException)
+{
+    try{
+        // Return the response: previously the result was discarded and the
+        // success path flowed off the end of this non-void function.
+        return this->request( command );
+    }
+    AMQ_CATCH_RETHROW( CommandIOException )
+    AMQ_CATCH_RETHROW( UnsupportedOperationException )
+    AMQ_CATCH_EXCEPTION_CONVERT( ActiveMQException, CommandIOException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, CommandIOException )
+    AMQ_CATCHALL_THROW( CommandIOException )
+}
Modified:
activemq/activemq-cpp/trunk/src/main/activemq/transport/MockTransport.h
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/MockTransport.h?rev=686632&r1=686631&r2=686632&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/MockTransport.h
(original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/MockTransport.h Sun
Aug 17 11:57:34 2008
@@ -213,6 +213,11 @@
throw( CommandIOException,
decaf::lang::exceptions::UnsupportedOperationException);
+
+ virtual Response* request( Command* command, unsigned int timeout )
+ throw( CommandIOException,
+ decaf::lang::exceptions::UnsupportedOperationException);
+
virtual void setCommandListener( CommandListener* listener ){
this->commandListener = listener;
}
Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/Transport.h
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/Transport.h?rev=686632&r1=686631&r2=686632&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/Transport.h
(original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/Transport.h Sun Aug
17 11:57:34 2008
@@ -81,6 +81,20 @@
decaf::lang::exceptions::UnsupportedOperationException ) =
0;
/**
+ * Sends the given command to the broker and then waits for the
+ * response.
+ * @param command - The command to be sent.
+ * @param timeout - The time to wait for this response.
+ * @return the response from the broker.
+ * @throws CommandIOException if an exception occurs during the
+ * read of the command.
+ * @throws UnsupportedOperationException if this method is not
+ * implemented by this transport.
+ */
+ virtual Response* request( Command* command, unsigned int timeout )
+ throw( CommandIOException,
+ decaf::lang::exceptions::UnsupportedOperationException ) =
0;
+
+ /**
* Assigns the command listener for non-response commands.
* @param listener the listener.
*/
Modified:
activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportFilter.h
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportFilter.h?rev=686632&r1=686631&r2=686632&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportFilter.h
(original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportFilter.h
Sun Aug 17 11:57:34 2008
@@ -143,6 +143,19 @@
}
/**
+ * Sends the given request and waits for its response by handing
+ * the call, unchanged, to the wrapped next transport.
+ * @param command - The command that is sent as a request
+ * @param timeout - The time to wait for a response.
+ * @return the response returned by the next transport.
+ * @throws CommandIOException
+ * @throws UnsupportedOperationException.
+ */
+ virtual Response* request( Command* command, unsigned int timeout )
+ throw( CommandIOException,
decaf::lang::exceptions::UnsupportedOperationException ){
+
+ return next->request( command, timeout );
+ }
+
+ /**
* Assigns the command listener for non-response commands.
* @param listener the listener.
*/
Modified:
activemq/activemq-cpp/trunk/src/main/activemq/transport/filters/LoggingTransport.cpp
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/filters/LoggingTransport.cpp?rev=686632&r1=686631&r2=686632&view=diff
==============================================================================
---
activemq/activemq-cpp/trunk/src/main/activemq/transport/filters/LoggingTransport.cpp
(original)
+++
activemq/activemq-cpp/trunk/src/main/activemq/transport/filters/LoggingTransport.cpp
Sun Aug 17 11:57:34 2008
@@ -93,3 +93,28 @@
AMQ_CATCH_EXCEPTION_CONVERT( Exception, CommandIOException )
AMQ_CATCHALL_THROW( CommandIOException )
}
+
+////////////////////////////////////////////////////////////////////////////////
+Response* LoggingTransport::request( Command* command, unsigned int timeout )
+ throw(CommandIOException,
decaf::lang::exceptions::UnsupportedOperationException) {
+
+ // Timed request: forwards the call to TransportFilter::request and then
+ // logs the command, the timeout value and the response.
+ try {
+
+ // Delegate to the base class.
+ Response* response = TransportFilter::request( command, timeout );
+
+ // The log text is assembled only after the base-class call has
+ // returned, so a request that throws produces no log entry here.
+ ostringstream ostream;
+ ostream << "*** SENDING REQUEST COMMAND: Timeout = " << timeout << "
***" << endl;
+ ostream << command->toString() << endl;
+ ostream << "*** RECEIVED RESPONSE COMMAND ***" << endl;
+ // A NULL response is written as the literal text "NULL".
+ ostream << ( response == NULL? "NULL" : response->toString() );
+
+ LOGDECAF_INFO( logger, ostream.str() );
+
+ // The response is passed back to the caller untouched.
+ return response;
+ }
+ // The two declared exception types are handled first; anything else is
+ // reported as a CommandIOException (per the AMQ_CATCH_* macros).
+ AMQ_CATCH_RETHROW( CommandIOException )
+ AMQ_CATCH_RETHROW( UnsupportedOperationException )
+ AMQ_CATCH_EXCEPTION_CONVERT( Exception, CommandIOException )
+ AMQ_CATCHALL_THROW( CommandIOException )
+}
Modified:
activemq/activemq-cpp/trunk/src/main/activemq/transport/filters/LoggingTransport.h
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/filters/LoggingTransport.h?rev=686632&r1=686631&r2=686632&view=diff
==============================================================================
---
activemq/activemq-cpp/trunk/src/main/activemq/transport/filters/LoggingTransport.h
(original)
+++
activemq/activemq-cpp/trunk/src/main/activemq/transport/filters/LoggingTransport.h
Sun Aug 17 11:57:34 2008
@@ -73,6 +73,16 @@
throw( CommandIOException,
decaf::lang::exceptions::UnsupportedOperationException);
+ /**
+ * Sends the request through TransportFilter::request, logs the
+ * command, the timeout and the response, and returns the response.
+ * @param command the command that is sent as a request
+ * @param timeout the time to wait for a response.
+ * @return the response returned by TransportFilter::request.
+ * @throws CommandIOException
+ * @throws UnsupportedOperationException.
+ */
+ virtual Response* request( Command* command, unsigned int timeout )
+ throw( CommandIOException,
+ decaf::lang::exceptions::UnsupportedOperationException);
+
};
}}}
Modified:
activemq/activemq-cpp/trunk/src/main/activemq/transport/filters/ResponseCorrelator.cpp
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/filters/ResponseCorrelator.cpp?rev=686632&r1=686631&r2=686632&view=diff
==============================================================================
---
activemq/activemq-cpp/trunk/src/main/activemq/transport/filters/ResponseCorrelator.cpp
(original)
+++
activemq/activemq-cpp/trunk/src/main/activemq/transport/filters/ResponseCorrelator.cpp
Sun Aug 17 11:57:34 2008
@@ -46,13 +46,9 @@
////////////////////////////////////////////////////////////////////////////////
ResponseCorrelator::ResponseCorrelator( Transport* next, bool own )
:
- TransportFilter( next, own )
-{
- nextCommandId = 0;
-
- // Default max response wait time to 3 seconds.
- maxResponseWaitTime = 3000;
+ TransportFilter( next, own ) {
+ nextCommandId = 0;
// Start in the closed state.
closed = true;
}
@@ -68,16 +64,6 @@
}
////////////////////////////////////////////////////////////////////////////////
-unsigned long ResponseCorrelator::getMaxResponseWaitTime() const{
- return maxResponseWaitTime;
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ResponseCorrelator::setMaxResponseWaitTime( const unsigned long
milliseconds ){
- maxResponseWaitTime = milliseconds;
-}
-
-////////////////////////////////////////////////////////////////////////////////
void ResponseCorrelator::oneway( Command* command )
throw( CommandIOException,
decaf::lang::exceptions::UnsupportedOperationException ) {
@@ -123,7 +109,64 @@
next->oneway( command );
// Get the response.
- response = futureResponse->getResponse( maxResponseWaitTime );
+ response = futureResponse->getResponse();
+
+ // Perform cleanup on the map.
+ synchronized( &mapMutex ){
+
+ // We've done our waiting - get this thing out
+ // of the map.
+ requestMap.erase( command->getCommandId() );
+
+ // Destroy the futureResponse. It is safe to
+ // do this now because the other thread only
+ // accesses the futureResponse within a lock on
+ // the map.
+ delete futureResponse;
+ futureResponse = NULL;
+ }
+
+ if( response == NULL ){
+
+ throw CommandIOException( __FILE__, __LINE__,
+ "No valid response received for command: %s, check broker.",
+ command->toString().c_str() );
+ }
+
+ return response;
+ }
+ AMQ_CATCH_RETHROW( UnsupportedOperationException )
+ AMQ_CATCH_RETHROW( CommandIOException )
+ AMQ_CATCH_EXCEPTION_CONVERT( ActiveMQException, CommandIOException )
+ AMQ_CATCH_EXCEPTION_CONVERT( Exception, CommandIOException )
+ AMQ_CATCHALL_THROW( CommandIOException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Response* ResponseCorrelator::request( Command* command, unsigned int timeout )
+ throw( CommandIOException,
decaf::lang::exceptions::UnsupportedOperationException ) {
+
+ try{
+ command->setCommandId( getNextCommandId() );
+ command->setResponseRequired( true );
+
+ // Add a future response object to the map indexed by this
+ // command id.
+ FutureResponse* futureResponse = new FutureResponse();
+
+ synchronized( &mapMutex ){
+ requestMap[command->getCommandId()] = futureResponse;
+ }
+
+ // Wait to be notified of the response via the futureResponse
+ // object.
+ Response* response = NULL;
+
+ // Send the request.
+ next->oneway( command );
+
+ // Get the response.
+ response = futureResponse->getResponse( timeout );
// Perform cleanup on the map.
synchronized( &mapMutex ){
@@ -235,6 +278,14 @@
try{
+ // Wake-up any outstanding requests.
+ synchronized( &mapMutex ){
+ std::map<unsigned int, FutureResponse*>::iterator iter =
requestMap.begin();
+ for( ; iter != requestMap.end(); ++iter ){
+ iter->second->setResponse( NULL );
+ }
+ }
+
if( !closed && next != NULL ){
next->close();
}
@@ -245,3 +296,20 @@
AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
AMQ_CATCHALL_THROW( ActiveMQException )
}
+
+////////////////////////////////////////////////////////////////////////////////
+void ResponseCorrelator::onTransportException(
+    Transport* source AMQCPP_UNUSED,
+    const decaf::lang::Exception& ex ) {
+
+    // Complete every request still registered in the map with a NULL
+    // response, so that a caller waiting without a timeout is not left
+    // blocked forever once the transport has reported an error.
+    synchronized( &mapMutex ){
+        typedef std::map<unsigned int, FutureResponse*>::iterator PendingIterator;
+
+        PendingIterator pending = requestMap.begin();
+        while( pending != requestMap.end() ){
+            FutureResponse* outstanding = pending->second;
+            outstanding->setResponse( NULL );
+            ++pending;
+        }
+    }
+
+    // Hand the exception on through fire() once the waiters are released.
+    fire( ex );
+}
Modified:
activemq/activemq-cpp/trunk/src/main/activemq/transport/filters/ResponseCorrelator.h
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/filters/ResponseCorrelator.h?rev=686632&r1=686631&r2=686632&view=diff
==============================================================================
---
activemq/activemq-cpp/trunk/src/main/activemq/transport/filters/ResponseCorrelator.h
(original)
+++
activemq/activemq-cpp/trunk/src/main/activemq/transport/filters/ResponseCorrelator.h
Sun Aug 17 11:57:34 2008
@@ -51,11 +51,6 @@
std::map<unsigned int, FutureResponse*> requestMap;
/**
- * Maximum amount of time in milliseconds to wait for a response.
- */
- unsigned long maxResponseWaitTime;
-
- /**
* Sync object for accessing the next command id variable.
*/
decaf::util::concurrent::Mutex commandIdMutex;
@@ -89,18 +84,6 @@
virtual ~ResponseCorrelator();
/**
- * Gets the maximum wait time for a response in milliseconds.
- * @return max time that a response can take
- */
- virtual unsigned long getMaxResponseWaitTime() const;
-
- /**
- * Sets the maximum wait time for a response in milliseconds.
- * @param milliseconds the max time that a response can take.
- */
- virtual void setMaxResponseWaitTime( const unsigned long milliseconds
);
-
- /**
* Sends a one-way command. Does not wait for any response from the
* broker.
* @param command the command to be sent.
@@ -124,6 +107,17 @@
decaf::lang::exceptions::UnsupportedOperationException );
/**
+ * Sends the given request to the server and waits for the response.
+ * The command is assigned the next command id and flagged as
+ * requiring a response, a FutureResponse is registered for it in
+ * the request map, and the command is sent with next->oneway().
+ * @param command The request to send.
+ * @param timeout The time to wait for a response.
+ * @return the response from the server.
+ * @throws CommandIOException if an error occurs with the request.
+ * @throws UnsupportedOperationException listed in the exception
+ * specification; presumably propagated from the next transport
+ * (TODO confirm).
+ */
+ virtual Response* request( Command* command, unsigned int timeout )
+ throw( CommandIOException,
+ decaf::lang::exceptions::UnsupportedOperationException );
+
+ /**
* This is called in the context of the nested transport's
* reading thread. In the case of a response object,
* updates the request map and notifies those waiting on the
@@ -152,6 +146,14 @@
*/
virtual void close() throw( cms::CMSException );
+ /**
+ * Event handler for an exception from a command transport.
+ * Sets a NULL response on every request still held in the request
+ * map, so that requests sent without a timeout do not wait forever,
+ * and then passes the exception to fire().
+ * @param source The source of the exception (not used by the
+ * implementation).
+ * @param ex The exception.
+ */
+ virtual void onTransportException( Transport* source,
+ const decaf::lang::Exception& ex );
+
};
}}}
Modified:
activemq/activemq-cpp/trunk/src/main/activemq/transport/filters/ResponseCorrelatorFactory.cpp
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/filters/ResponseCorrelatorFactory.cpp?rev=686632&r1=686631&r2=686632&view=diff
==============================================================================
---
activemq/activemq-cpp/trunk/src/main/activemq/transport/filters/ResponseCorrelatorFactory.cpp
(original)
+++
activemq/activemq-cpp/trunk/src/main/activemq/transport/filters/ResponseCorrelatorFactory.cpp
Sun Aug 17 11:57:34 2008
@@ -47,12 +47,6 @@
try {
ResponseCorrelator* transport = new ResponseCorrelator( next, own );
-
- transport->setMaxResponseWaitTime(
- (unsigned long)Long::parseLong(
- properties.getProperty(
- "transport.ResponseCorrelator.maxResponseWaitTime", "3000"
) ) );
-
return transport;
}
AMQ_CATCH_RETHROW( ActiveMQException )
Modified:
activemq/activemq-cpp/trunk/src/test/activemq/transport/filters/ResponseCorrelatorTest.cpp
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/test/activemq/transport/filters/ResponseCorrelatorTest.cpp?rev=686632&r1=686631&r2=686632&view=diff
==============================================================================
---
activemq/activemq-cpp/trunk/src/test/activemq/transport/filters/ResponseCorrelatorTest.cpp
(original)
+++
activemq/activemq-cpp/trunk/src/test/activemq/transport/filters/ResponseCorrelatorTest.cpp
Sun Aug 17 11:57:34 2008
@@ -133,7 +133,7 @@
// Send one request.
MyCommand cmd;
try{
- correlator.request( &cmd );
+ correlator.request( &cmd, 500 );
CPPUNIT_ASSERT(false);
}catch( CommandIOException& ex ){
// Expected.
Modified:
activemq/activemq-cpp/trunk/src/test/activemq/transport/filters/ResponseCorrelatorTest.h
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/test/activemq/transport/filters/ResponseCorrelatorTest.h?rev=686632&r1=686631&r2=686632&view=diff
==============================================================================
---
activemq/activemq-cpp/trunk/src/test/activemq/transport/filters/ResponseCorrelatorTest.h
(original)
+++
activemq/activemq-cpp/trunk/src/test/activemq/transport/filters/ResponseCorrelatorTest.h
Sun Aug 17 11:57:34 2008
@@ -165,6 +165,16 @@
"stuff" );
}
+ // Timed request is not provided by this test transport: both
+ // arguments are ignored and UnsupportedOperationException is
+ // always thrown.
+ virtual Response* request( Command* command AMQCPP_UNUSED,
+ unsigned int timeout AMQCPP_UNUSED )
+ throw(CommandIOException,
decaf::lang::exceptions::UnsupportedOperationException)
+ {
+ throw decaf::lang::exceptions::UnsupportedOperationException(
+ __FILE__,
+ __LINE__,
+ "stuff" );
+ }
+
virtual void setCommandListener( CommandListener* listener ){
this->listener = listener;
}