Just so you know I haven't forgotten about you :) ...
I have replicated the problem and I believe it is an issue with the broker.
I'm going to dig into it more later.
On 11/30/06, sgliu <[EMAIL PROTECTED]> wrote:
Sorry,according to your indication,message doesn't disappear.
I hope:
send first,a few time later,I receive nothing. (a few time later,message
will disappear itself.)
follow source code:
#include <activemq/concurrent/Thread.h>
#include <activemq/concurrent/Runnable.h>
#include <activemq/core/ActiveMQConnectionFactory.h>
#include <activemq/util/Integer.h>
#include <cms/Connection.h>
#include <cms/Session.h>
#include <cms/TextMessage.h>
#include <cms/ExceptionListener.h>
#include <cms/MessageListener.h>
#include <stdlib.h>
using namespace activemq::core;
using namespace activemq::util;
using namespace activemq::concurrent;
using namespace cms;
using namespace std;
#include <sys/timeb.h>
unsigned long getcurt()
{
struct timeb t;
ftime (&t);
unsigned long timeStamp = (t.time * 1000LL) + t.millitm;
return timeStamp;
}
class HelloWorldProducer : public Runnable {
private:
Connection* connection;
Session* session;
Topic* destination;
MessageProducer* producer;
int numMessages;
public:
HelloWorldProducer( int numMessages ){
connection = NULL;
session = NULL;
destination = NULL;
producer = NULL;
this->numMessages = numMessages;
}
virtual ~HelloWorldProducer(){
cleanup();
}
virtual void run() {
try {
string user,passwd,sID;
user="default";
passwd="";
sID="lsgID";
// Create a ConnectionFactory
ActiveMQConnectionFactory* connectionFactory = new
ActiveMQConnectionFactory("tcp://localhost:61613",user,passwd,sID);
// Create a Connection
connection =
connectionFactory->createConnection(user,passwd,sID);
connection->start();
string sss=connection->getClientId();
cout << sss << endl;
session = connection->createSession( Session::AUTO_ACKNOWLEDGE
);
destination = session->createTopic( "mytopic" );
producer = session->createProducer( destination );
producer->setDeliveryMode( DeliveryMode::PERSISTANT );
unsigned long ttt=getcurt();
producer->setTimeToLive( 10000);
// Create the Thread Id String
string threadIdStr = Integer::toString( Thread::getId() );
// Create a messages
string text = (string)"Hello world! from thread " +
threadIdStr;
for( int ix=0; ix<numMessages; ++ix ){
TextMessage* message = session->createTextMessage(
text );
// message->setCMSTimeStamp(ttt);
// message->setCMSExpiration(ttt +
10000); //消息到期时间
// string messageID="messageID";
// message->setCMSMessageId(messageID);
//消息ID
// producer->setTimeToLive(10000);
// Tell the producer to send the message
printf( "Sent message from thread %s\n",
threadIdStr.c_str() );
producer->send( message );
delete message;
}
}catch ( CMSException& e ) {
e.printStackTrace();
}
}
private:
void cleanup(){
// Destroy resources.
try{
if( destination != NULL ) delete destination;
}catch ( CMSException& e ) {}
destination = NULL;
try{
if( producer != NULL ) delete producer;
}catch ( CMSException& e ) {}
producer = NULL;
// Close open resources.
try{
if( session != NULL ) session->close();
if( connection != NULL ) connection->close();
}catch ( CMSException& e ) {}
try{
if( session != NULL ) delete session;
}catch ( CMSException& e ) {}
session = NULL;
try{
if( connection != NULL ) delete connection;
}catch ( CMSException& e ) {}
connection = NULL;
}
};
class HelloWorldConsumer : public ExceptionListener,
public MessageListener,
public Runnable {
private:
Connection* connection;
Session* session;
Topic* destination;
MessageConsumer* consumer;
long waitMillis;
public:
HelloWorldConsumer( long waitMillis ){
connection = NULL;
session = NULL;
destination = NULL;
consumer = NULL;
this->waitMillis = waitMillis;
}
virtual ~HelloWorldConsumer(){
cleanup();
}
virtual void run() {
try {
string user,passwd,sID;
user="default";
passwd="";
sID="lsgID";
// Create a ConnectionFactory
ActiveMQConnectionFactory* connectionFactory =
new ActiveMQConnectionFactory(
"tcp://localhost:61613",user,passwd,sID);
// Create a Connection
connection =
connectionFactory->createConnection();//user,passwd,sID);
delete connectionFactory;
connection->start();
connection->setExceptionListener(this);
// Create a Session
session = connection->createSession( Session::AUTO_ACKNOWLEDGE
);
// Create the destination (Topic or Queue)
destination = session->createTopic(
"mytopic?consumer.retroactive=true"
);
consumer = session->createDurableConsumer(
destination , user ,
"",false);
consumer->setMessageListener( this );
// Sleep while asynchronous messages come in.
Thread::sleep( waitMillis );
} catch (CMSException& e) {
e.printStackTrace();
}
}
virtual void onMessage( const Message* message ){
try
{
const TextMessage* textMessage =
dynamic_cast< const TextMessage* >( message );
string text = textMessage->getText();
printf( "Received: %s\n", text.c_str() );
} catch (CMSException& e) {
e.printStackTrace();
}
}
virtual void onException( const CMSException& ex ) {
printf("JMS Exception occured. Shutting down client.\n");
}
private:
void cleanup(){
// Destroy resources.
try{
if( destination != NULL ) delete destination;
}catch (CMSException& e) {}
destination = NULL;
try{
if( consumer != NULL ) delete consumer;
}catch (CMSException& e) {}
consumer = NULL;
// Close open resources.
try{
if( session != NULL ) session->close();
if( connection != NULL ) connection->close();
}catch (CMSException& e) {}
try{
if( session != NULL ) delete session;
}catch (CMSException& e) {}
session = NULL;
try{
if( connection != NULL ) delete connection;
}catch (CMSException& e) {}
connection = NULL;
}
};
void Produce()
{
HelloWorldProducer producer( 2 );
Thread producerThread( &producer );
producerThread.start();
producerThread.join();
}
void Consumer()
{
HelloWorldConsumer consumer( 5000 );
Thread consumerThread( &consumer );
consumerThread.start();
consumerThread.join();
}
int main(int argc, char* argv[])
{
// Produce();
cout << "Produce End." << endl;
Consumer();
}
----------------------------------------
I made follow two experiments:
1. In HelloWorldProducer,add follow code,but 11 seconds later, I receive
message yet.
producer->setTimeToLive( 10000);
2. In HelloWorldProducer,add follow code,but 11 seconds later, I receive
message yet.
unsigned long ttt=getcurt();
producer->setTimeToLive( ttt + 10000);
Each time, at first,I have commented out the line:Consumer() in main
function,and run program.
Second,wait 20 seconds.
Third,I have commented out the line:Produce() in main function,and run
program.
Both two experiment,I have received message.
That message don't disappear itself.
Why?
Please help me.
--
View this message in context:
http://www.nabble.com/Message%27s-live-time-tf2706004.html#a7617317
Sent from the ActiveMQ - User mailing list archive at Nabble.com.