christyc wrote:
Anyway, we have multiple users as publishers and are trying to test the
persistence capability by not having any consumers consume messages. We
have some requirements that 1 million messages must be persisted and in our
current test, we generate up to 400k messages and then get OutOfMemoryError
: Java heap space coming from ActiveMQ. From what I see on the website,
having topics that are durable should be persisted to the database and
spooled to disk.
We are doing a very similar test and getting better results, using both
the journal and postgres db as persistent backend (find the broker
configuration enclosed).
We now have a serious problem: the broker does not recover from postgres
failures. When that happens, amq quickly lets one of the journal
files grow from about 20 MB to 60 MB and then blocks forever. Neither restarting
postgres nor restarting the broker allows amq to recover.
To replicate the problem:
- Linux, java 1.5
- ActiveMQ 4.0.1
- activemq.xml, as enclosed
- postgres 7.4.13
Then run the enclosed producer. After it has run for some minutes, stop
postgres; amq will then be unable to recover, whether you restart
postgres or restart amq.
tito.
<!-- START SNIPPET: example -->
<!-- ActiveMQ broker configuration: one broker with a journaled JDBC persistence adapter that stores messages through the "postgres-ds" datasource bean defined further down in this file. -->
<beans xmlns="http://activemq.org/config/1.0">
<!-- Allows us to use system properties as variables in this configuration file -->
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
<broker useJmx="true">
<!-- Use the following to set the broker memory limit (in bytes); 1048576 bytes is 1 MiB -->
<memoryManager>
<usageManager id="memory-manager" limit="1048576"/>
</memoryManager>
<!-- Use the following to configure how ActiveMQ is exposed in JMX -->
<managementContext>
<managementContext connectorPort="1099" jmxDomainName="org.apache.activemq"/>
</managementContext>
<!-- In ActiveMQ 4, you can set up destination policies; the single entry below applies only to topics matching the pattern FOO.> -->
<destinationPolicy>
<policyMap><policyEntries>
<policyEntry topic="FOO.>">
<dispatchPolicy>
<strictOrderDispatchPolicy />
</dispatchPolicy>
<subscriptionRecoveryPolicy>
<lastImageSubscriptionRecoveryPolicy />
</subscriptionRecoveryPolicy>
</policyEntry>
</policyEntries></policyMap>
</destinationPolicy>
<!-- Persistence: journal files under ${activemq.home}/activemq-data plus the JDBC datasource referenced as #postgres-ds -->
<persistenceAdapter>
<journaledJDBC journalLogFiles="5" dataDirectory="${activemq.home}/activemq-data" dataSource="#postgres-ds"/>
<!-- To use a different datasource, use the following syntax : -->
<!--
<journaledJDBC journalLogFiles="5" dataDirectory="../activemq-data" dataSource="#postgres-ds"/>
-->
</persistenceAdapter>
<!-- Client-facing listeners: TCP on port 61616 (also announced via multicast discovery) and STOMP on port 61613 -->
<transportConnectors>
<transportConnector name="default" uri="tcp://localhost:61616" discoveryUri="multicast://default"/>
<transportConnector name="stomp" uri="stomp://localhost:61613"/>
</transportConnectors>
<networkConnectors>
<!-- by default just auto discover the other brokers -->
<networkConnector name="default" uri="multicast://default"/>
<!--
<networkConnector name="host1 and host2" uri="static://(tcp://host1:61616,tcp://host2:61616)" failover="true"/>
-->
</networkConnectors>
</broker>
<!-- This xbean configuration file supports all the standard spring xml configuration options -->
<!-- Postgres DataSource Sample Setup -->
<!-- -->
<!-- <bean id="postgres-ds" class="org.postgresql.ds.PGPoolingDataSource"> -->
<!-- Active datasource: this bean id is the one the journaledJDBC element above refers to as #postgres-ds -->
<bean id="postgres-ds" class="org.postgresql.jdbc3.Jdbc3PoolingDataSource">
<property name="serverName" value="localhost"/>
<property name="databaseName" value="activemq"/>
<!-- NOTE(review): portNumber 0 presumably makes the driver fall back to its default port; confirm against the PostgreSQL JDBC driver in use -->
<property name="portNumber" value="0"/>
<property name="user" value="activemq"/>
<property name="password" value="activemq"/>
<property name="dataSourceName" value="postgres"/>
<!-- Pool sizing: starts with 1 connection and is capped at 10 -->
<property name="initialConnections" value="1"/>
<property name="maxConnections" value="10"/>
</bean>
<!-- MySql DataSource Sample Setup -->
<!--
<bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/>
<property name="username" value="activemq"/>
<property name="password" value="activemq"/>
<property name="poolPreparedStatements" value="true"/>
</bean>
-->
<!-- Embedded Derby DataSource Sample Setup -->
<!--
<bean id="derby-ds" class="org.apache.derby.jdbc.EmbeddedDataSource">
<property name="databaseName" value="derbydb"/>
<property name="createDatabase" value="create"/>
</bean>
-->
</beans>
<!-- END SNIPPET: example -->
import java.util.Properties;
import org.apache.log4j.*;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueConnection;
import javax.jms.QueueSession;
import javax.jms.QueueSender;
import javax.jms.Queue;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.JMSException;
import java.util.Date;
//import it.link.regserv.OperazioneUDDI;
/**
* <p>Simple JMS client, publish text messages to testTopic Topic.
* </p>
*
* <p><b>NOTE</b>This code is a showcase only. It may not provide
* a stable production example.</p>
* @author Peter Antman
* @version $Revision: 3.1 $
*/
public class HelloPublisher {
/**
* Topic connection, hold on to this so you may close it.
*/
QueueConnection queueConnection;
/**
* Topic session, hold on to this so you may close it.
* Also used to create messages.
*/
QueueSession queueSession;
/**
* Use this to publish messages.
*/
QueueSender queueSender;
/**
* Destination where to publish.
*/
Queue queue;
/**
* Sets up all the JMS fixtures.
*
* Use close() when finished with object.
*
* @param factoryJNDI name of the topic connection factory to look up.
* @param queueJNDI name of the topic destination to look up
*/
public HelloPublisher(String factoryJNDI, String queueJNDI) throws
JMSException, NamingException {
// Get the initial context
//PropertyConfigurator.configure("helloPublisher.log4j.properties");
Properties properties = new Properties();
// properties.put(Context.INITIAL_CONTEXT_FACTORY,
"org.jnp.interfaces.NamingContextFactory");
// properties.put(Context.URL_PKG_PREFIXES, "org.jnp.interfaces");
// properties.put(Context.PROVIDER_URL, "127.0.0.1");
properties.put(Context.INITIAL_CONTEXT_FACTORY,
"org.apache.activemq.jndi.ActiveMQInitialContextFactory");
properties.put(Context.PROVIDER_URL,
"failover:tcp://10.114.87.205:61616");
properties.put("connectionFactoryNames","ConnectionFactory, "+
factoryJNDI);
properties.put("queue."+queueJNDI,queueJNDI);//+"?consumer.prefetch=10");
InitialContext context = new InitialContext(properties);
// Get the connection factory
QueueConnectionFactory queueFactory =
(QueueConnectionFactory)context.lookup(factoryJNDI);
// Create the connection
queueConnection = queueFactory.createQueueConnection();
// Create the session
// primo argomento : no transazione
// secondo argomento : Auto ACK
queueSession =
queueConnection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
// Look up the destination
queue = (Queue)context.lookup(queueJNDI);
// Create a publisher
queueSender = queueSession.createSender(queue);
}
/**
* Publish the given String as a JMS message to the testTopic topic.
*/
public void publish(String msg) throws JMSException {
// Create a message
//OperazioneUDDI prova = new OperazioneUDDI();
//prova.setOperazione(OperazioneUDDI.CREAZIONE);
//prova.setOperazione(OperazioneUDDI.MODIFICA);
//prova.setOperazione(OperazioneUDDI.ELIMINAZIONE);
//prova.setIDTable(4);
//prova.setNomeServizio("ServizioDiProva");
//prova.setTipoServizio("SPC");
//prova.setNomeSoggetto("SoggettoDiProva");
//prova.setTipoSoggetto("SPC");
ObjectMessage message = queueSession.createObjectMessage(msg);
message.setStringProperty("CLIENT-HELLOWORLD","BOH");
// Publish the message
queueSender.send(message);
}
/**
* Close session and connection.
* When done, no publishing is possible any more.
*/
public void close() throws JMSException {
queueSession.close();
queueConnection.close();
}
/**
* Run an example publishing 10 messages to testTopic.
* Only works up to and including JBoss 2.4.0
*/
public static void main(String[] args) {
Date start,end;
start = new Date();
try {
// Create the HelloPublisher, giving it the name of the
// TopicConnection Factory and the Topic destination to
// use in lookup.
HelloPublisher publisher = new HelloPublisher(
// Name of
ConnectionFactory
"QueueConnectionFactory",
// Name of
destination to publish to
"queue/OperazioniGestoreUDDI");
// Publish 10 messages
byte[] msg = new byte[1024*100];
for (int i = 0; i < 1024*100; i++) {
msg[i] = 't';
}
String s = new String(msg);
for (int i = 1; i <= 1286281; i++) {
System.out.println("Publishing message: " + i);
publisher.publish(s);
}
// Close down your publisher
publisher.close();
} catch(Exception ex) {
System.err.println(
"An exception occurred while testing
HelloPublisher: " + ex);
ex.printStackTrace();
}
end =new Date();
System.out.println("Start Time: "+start.toString());
System.out.println("Finish Time: "+end.toString());
}
} // HelloPublisher