Author: rgodfrey Date: Mon Oct 13 10:52:27 2014 New Revision: 1631345 URL: http://svn.apache.org/r1631345 Log: Merge from trunk
Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/ (props changed) qpid/branches/QPID-6125-ProtocolRefactoring/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java qpid/branches/QPID-6125-ProtocolRefactoring/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBUtils.java qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/DatabasePinger.java qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/BrokerMessages.java qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Broker_logmessages.properties qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java Propchange: qpid/branches/QPID-6125-ProtocolRefactoring/java/ ------------------------------------------------------------------------------ Merged /qpid/trunk/qpid/java:r1630747-1631344 Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java?rev=1631345&r1=1631344&r2=1631345&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java Mon Oct 13 10:52:27 2014 @@ -300,6 +300,12 @@ public class MessageConsumerImpl impleme } } + Message receiveRecoveredMessage() + { + return _replaymessages.isEmpty() ? null : _replaymessages.remove(0); + + } + Message receive0(final long timeout) { Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java?rev=1631345&r1=1631344&r2=1631345&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java Mon Oct 13 10:52:27 2014 @@ -922,7 +922,15 @@ public class SessionImpl implements Sess else { consumer = _messageConsumerList.remove(0); - msg = consumer.receive0(0L); + msg = consumer.receiveRecoveredMessage(); + if(msg == null) + { + msg = consumer.receive0(0L); + } + else + { + recoveredMessage = true; + } } MessageListener listener = consumer._messageListener; Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java?rev=1631345&r1=1631344&r2=1631345&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java Mon Oct 13 10:52:27 2014 @@ -107,7 +107,7 @@ public abstract class AbstractBDBMessage { new Upgrader(getEnvironmentFacade().getEnvironment(), getParent()).upgradeIfNecessary(); } - catch(DatabaseException e) + catch(RuntimeException e) { throw getEnvironmentFacade().handleDatabaseException("Cannot upgrade store", e); } @@ -138,7 +138,7 @@ public abstract class AbstractBDBMessage MESSAGE_METADATA_SEQ_CONFIG); newMessageId = mmdSeq.get(null, 1); } - catch (DatabaseException de) + catch (RuntimeException de) { throw getEnvironmentFacade().handleDatabaseException("Cannot get sequence value for new message", de); } @@ -216,7 +216,7 @@ public abstract class AbstractBDBMessage } } } - catch (DatabaseException e) + catch (RuntimeException e) { throw getEnvironmentFacade().handleDatabaseException("Cannot visit message instances", e); } @@ -259,7 +259,7 @@ public abstract class AbstractBDBMessage entries.add(entry); } } - catch (DatabaseException e) + catch (RuntimeException e) { throw getEnvironmentFacade().handleDatabaseException("Cannot visit message instances", e); } @@ -306,7 +306,7 @@ public abstract class AbstractBDBMessage } } - catch (DatabaseException e) + catch (RuntimeException e) { throw getEnvironmentFacade().handleDatabaseException("Cannot recover distributed transactions", e); } @@ -350,7 +350,7 @@ public abstract class AbstractBDBMessage return mdd; } - catch (DatabaseException e) + catch (RuntimeException e) { throw getEnvironmentFacade().handleDatabaseException("Error reading message metadata for message with id " + messageId @@ -424,7 +424,7 @@ public abstract class AbstractBDBMessage tx.abort(); } } - catch(DatabaseException e2) + catch(RuntimeException e2) { getLogger().warn( "Unable to abort transaction after LockConflictException on removal of message with id " @@ -465,7 +465,7 @@ public abstract class AbstractBDBMessage } while(!complete); } - catch (DatabaseException e) + catch (RuntimeException e) { getLogger().error("Unexpected BDB exception", e); @@ -550,7 +550,7 @@ public abstract class AbstractBDBMessage } return written; } - catch (DatabaseException e) + catch (RuntimeException e) { throw getEnvironmentFacade().handleDatabaseException("Error getting AMQMessage with id " + messageId @@ -587,7 +587,7 @@ public abstract class AbstractBDBMessage } } - catch (DatabaseException e) + catch (RuntimeException e) { throw getEnvironmentFacade().handleDatabaseException("Error getting AMQMessage with id " + messageId @@ -618,7 +618,7 @@ public abstract class AbstractBDBMessage } } } - catch (DatabaseException e) + catch (RuntimeException e) { throw environmentFacade.handleDatabaseException("Cannot visit messages", e); } @@ -630,7 +630,7 @@ public abstract class AbstractBDBMessage { cursor.close(); } - catch(DatabaseException e) + catch(RuntimeException e) { throw environmentFacade.handleDatabaseException("Cannot close cursor", e); } @@ -659,7 +659,7 @@ public abstract class AbstractBDBMessage } } - catch (DatabaseException e) + catch (RuntimeException e) { throw environmentFacade.handleDatabaseException("Cannot visit messages", e); } @@ -697,7 +697,7 @@ public abstract class AbstractBDBMessage } } - catch (DatabaseException e) + catch (RuntimeException e) { throw getEnvironmentFacade().handleDatabaseException("Error writing AMQMessage with id " + messageId @@ -740,7 +740,7 @@ public abstract class AbstractBDBMessage getLogger().debug("Storing message metadata for message id " + messageId + " in transaction " + tx); } } - catch (DatabaseException e) + catch (RuntimeException e) { throw getEnvironmentFacade().handleDatabaseException("Error writing message metadata with id " + messageId @@ -779,7 +779,7 @@ public abstract class AbstractBDBMessage } getDeliveryDb().put(tx, key, value); } - catch (DatabaseException e) + catch (RuntimeException e) { getLogger().error("Failed to enqueue: " + e.getMessage(), e); throw getEnvironmentFacade().handleDatabaseException("Error writing enqueued message with id " @@ -838,7 +838,7 @@ public abstract class AbstractBDBMessage } } - catch (DatabaseException e) + catch (RuntimeException e) { getLogger().error("Failed to dequeue message " + messageId + " in transaction " + tx, e); @@ -879,7 +879,7 @@ public abstract class AbstractBDBMessage getXidDb().put(txn, key, value); return postActions; } - catch (DatabaseException e) + catch (RuntimeException e) { getLogger().error("Failed to write xid: " + e.getMessage(), e); throw getEnvironmentFacade().handleDatabaseException("Error writing xid to database", e); @@ -910,7 +910,7 @@ public abstract class AbstractBDBMessage } } - catch (DatabaseException e) + catch (RuntimeException e) { getLogger().error("Failed to remove xid in transaction " + txn, e); @@ -963,7 +963,7 @@ public abstract class AbstractBDBMessage { tx.abort(); } - catch (DatabaseException e) + catch (RuntimeException e) { throw getEnvironmentFacade().handleDatabaseException("Error aborting transaction: " + e.getMessage(), e); } @@ -975,7 +975,7 @@ public abstract class AbstractBDBMessage { storedSizeChange(delta); } - catch(DatabaseException e) + catch(RuntimeException e) { throw getEnvironmentFacade().handleDatabaseException("Stored size change exception", e); } @@ -1415,7 +1415,7 @@ public abstract class AbstractBDBMessage txn = getEnvironmentFacade().getEnvironment().beginTransaction( null, null); } - catch (DatabaseException e) + catch (RuntimeException e) { throw getEnvironmentFacade().handleDatabaseException("failed to begin transaction", e); } @@ -1476,7 +1476,7 @@ public abstract class AbstractBDBMessage { _txn = getEnvironmentFacade().getEnvironment().beginTransaction(null, null); } - catch(DatabaseException e) + catch(RuntimeException e) { throw getEnvironmentFacade().handleDatabaseException("Cannot create store transaction", e); } Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java?rev=1631345&r1=1631344&r2=1631345&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java Mon Oct 13 10:52:27 2014 @@ -140,7 +140,7 @@ public class BDBConfigurationStore imple } _initialRecords = new ConfiguredObjectRecord[0]; } - catch(DatabaseException e) + catch (RuntimeException e) { throw _environmentFacade.handleDatabaseException("Cannot upgrade store", e); } @@ -156,7 +156,7 @@ public class BDBConfigurationStore imple doVisitAllConfiguredObjectRecords(handler); handler.end(); } - catch (DatabaseException e) + catch (RuntimeException e) { throw _environmentFacade.handleDatabaseException("Cannot visit configured object records", e); } @@ -243,7 +243,7 @@ public class BDBConfigurationStore imple _environmentFacade.close(); _environmentFacade = null; } - catch(DatabaseException e) + catch (RuntimeException e) { throw new StoreException("Exception occurred on message store close", e); } @@ -268,7 +268,7 @@ public class BDBConfigurationStore imple txn.commit(); txn = null; } - catch (DatabaseException e) + catch (RuntimeException e) { throw _environmentFacade.handleDatabaseException("Error creating configured object " + configuredObject + " in database: " + e.getMessage(), e); @@ -305,7 +305,7 @@ public class BDBConfigurationStore imple txn = null; return removed.toArray(new UUID[removed.size()]); } - catch (DatabaseException e) + catch (RuntimeException e) { throw _environmentFacade.handleDatabaseException("Error deleting configured objects from database", e); } @@ -334,7 +334,7 @@ public class BDBConfigurationStore imple txn.commit(); txn = null; } - catch (DatabaseException e) + catch (RuntimeException e) { throw _environmentFacade.handleDatabaseException("Error updating configuration details within the store: " + e,e); } @@ -408,7 +408,7 @@ public class BDBConfigurationStore imple } writeHierarchyRecords(txn, configuredObject); } - catch (DatabaseException e) + catch (RuntimeException e) { throw _environmentFacade.handleDatabaseException("Error writing configured object " + configuredObject + " to database: " + e.getMessage(), e); Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBUtils.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBUtils.java?rev=1631345&r1=1631344&r2=1631345&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBUtils.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBUtils.java Mon Oct 13 10:52:27 2014 @@ -38,7 +38,7 @@ public class BDBUtils { cursor.close(); } - catch(DatabaseException e) + catch (RuntimeException e) { // We need the possible side effect of the facade restarting the environment but don't care about the exception throw environmentFacade.handleDatabaseException("Cannot close cursor", e); @@ -55,7 +55,7 @@ public class BDBUtils tx.abort(); } } - catch (DatabaseException e) + catch (RuntimeException e) { // We need the possible side effect of the facade restarting the environment but don't care about the exception environmentFacade.handleDatabaseException("Cannot abort transaction", e); Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java?rev=1631345&r1=1631344&r2=1631345&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java Mon Oct 13 10:52:27 2014 @@ -57,7 +57,7 @@ public interface EnvironmentFacade StoreFuture commit(com.sleepycat.je.Transaction tx, boolean sync); - DatabaseException handleDatabaseException(String contextMessage, DatabaseException e); + RuntimeException handleDatabaseException(String contextMessage, RuntimeException e); String getStoreLocation(); Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java?rev=1631345&r1=1631344&r2=1631345&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java Mon Oct 13 10:52:27 2014 @@ -37,6 +37,7 @@ import com.sleepycat.je.SequenceConfig; import com.sleepycat.je.Transaction; import org.apache.log4j.Logger; +import org.apache.qpid.server.store.StoreException; import org.apache.qpid.server.store.StoreFuture; public class StandardEnvironmentFacade implements EnvironmentFacade @@ -266,13 +267,17 @@ public class StandardEnvironmentFacade i } @Override - public DatabaseException handleDatabaseException(String contextMessage, DatabaseException e) + public RuntimeException handleDatabaseException(String contextMessage, RuntimeException e) { if (_environment != null && !_environment.isValid()) { closeEnvironmentSafely(); } - return e; + if (e instanceof StoreException) + { + return e; + } + return new StoreException("Unexpected exception occurred on store operation", e); } @Override Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/DatabasePinger.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/DatabasePinger.java?rev=1631345&r1=1631344&r2=1631345&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/DatabasePinger.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/DatabasePinger.java Mon Oct 13 10:52:27 2014 @@ -79,7 +79,7 @@ public class DatabasePinger } } } - catch (DatabaseException de) + catch (RuntimeException de) { facade.handleDatabaseException("DatabaseException from DatabasePinger ", de); } Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java?rev=1631345&r1=1631344&r2=1631345&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java Mon Oct 13 10:52:27 2014 @@ -82,6 +82,7 @@ import com.sleepycat.je.rep.vlsn.VLSNRan import com.sleepycat.je.utilint.PropUtil; import com.sleepycat.je.utilint.VLSN; import org.apache.log4j.Logger; +import org.apache.qpid.server.store.StoreException; import org.codehaus.jackson.map.ObjectMapper; import org.apache.qpid.server.configuration.IllegalConfigurationException; @@ -371,26 +372,40 @@ public class ReplicatedEnvironmentFacade } @Override - public DatabaseException handleDatabaseException(String contextMessage, final DatabaseException dbe) + public RuntimeException handleDatabaseException(String contextMessage, final RuntimeException dbe) { - boolean noMajority = dbe instanceof InsufficientReplicasException || dbe instanceof InsufficientAcksException; - - if (noMajority) + if (dbe instanceof StoreException || dbe instanceof ConnectionScopedRuntimeException) { - ReplicationGroupListener listener = _replicationGroupListener.get(); - if (listener != null) + return dbe; + } + else if (dbe instanceof DatabaseException) + { + boolean noMajority = dbe instanceof InsufficientReplicasException || dbe instanceof InsufficientAcksException; + + if (noMajority) + { + ReplicationGroupListener listener = _replicationGroupListener.get(); + if (listener != null) + { + listener.onNoMajority(); + } + } + + boolean restart = (noMajority || dbe instanceof RestartRequiredException); + if (restart) { - listener.onNoMajority(); + tryToRestartEnvironment((DatabaseException)dbe); + return new ConnectionScopedRuntimeException(noMajority ? "Required number of nodes not reachable" : "Underlying JE environment is being restarted", dbe); } } - - boolean restart = (noMajority || dbe instanceof RestartRequiredException); - if (restart) + else { - tryToRestartEnvironment(dbe); - throw new ConnectionScopedRuntimeException(noMajority ? "Required number of nodes not reachable" : "Underlying JE environment is being restarted", dbe); + if (dbe instanceof IllegalStateException && getFacadeState() == State.RESTARTING) + { + return new ConnectionScopedRuntimeException("Underlying JE environment is being restarted", dbe); + } } - return dbe; + return new StoreException("Unexpected exception occurred in replicated environment", dbe); } private void tryToRestartEnvironment(final DatabaseException dbe) @@ -452,12 +467,12 @@ public class ReplicatedEnvironmentFacade } if (_state.get() != State.OPEN) { - throw new IllegalStateException("Environment facade is not in opened state"); + throw new ConnectionScopedRuntimeException("Environment facade is not in opened state"); } if (!_environment.isValid()) { - throw new IllegalStateException("Environment is not valid"); + throw new ConnectionScopedRuntimeException("Environment is not valid"); } Database cachedHandle = _cachedDatabases.get(name); Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java?rev=1631345&r1=1631344&r2=1631345&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java Mon Oct 13 10:52:27 2014 @@ -343,6 +343,7 @@ public class BDBHAVirtualHostNodeImpl ex String nodeAddress = node.getHostName() + ":" + node.getPort(); if (!_permittedNodes.contains(nodeAddress)) { + getEventLogger().message(getGroupLogSubject(), HighAvailabilityMessages.INTRUDER_DETECTED(node.getName(), nodeAddress)); shutdownOnIntruder(nodeAddress); throw new IllegalStateException("Intruder node detected: " + nodeAddress); } Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java?rev=1631345&r1=1631344&r2=1631345&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java Mon Oct 13 10:52:27 2014 @@ -283,7 +283,7 @@ public class BDBMessageStoreTest extends catch (RuntimeException e) { assertEquals("Unexpected exception message", "Offset 15 is greater than message size 10 for message id " - + messageid_0_8 + "!", e.getMessage()); + + messageid_0_8 + "!", e.getCause().getMessage()); } // buffer is smaller then message size Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java?rev=1631345&r1=1631344&r2=1631345&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java Mon Oct 13 10:52:27 2014 @@ -35,6 +35,7 @@ import java.util.concurrent.atomic.Atomi import org.apache.log4j.Logger; import org.apache.qpid.server.store.berkeleydb.EnvironmentFacade; +import org.apache.qpid.server.util.ConnectionScopedRuntimeException; import org.apache.qpid.test.utils.PortHelper; import org.apache.qpid.test.utils.QpidTestCase; import org.apache.qpid.test.utils.TestFileUtils; @@ -146,6 +147,24 @@ public class ReplicatedEnvironmentFacade assertNotSame("Expecting a new handle after database closure", handle1, handle3); } + public void testOpenDatabaseWhenFacadeIsNotOpened() throws Exception + { + DatabaseConfig createIfAbsentDbConfig = DatabaseConfig.DEFAULT.setAllowCreate(true); + + EnvironmentFacade ef = createMaster(); + ef.close(); + + try + { + ef.openDatabase("myDatabase", createIfAbsentDbConfig ); + fail("Database open should fail"); + } + catch(ConnectionScopedRuntimeException e) + { + assertEquals("Unexpected exception", "Environment facade is not in opened state", e.getMessage()); + } + } + public void testGetGroupName() throws Exception { assertEquals("Unexpected group name", TEST_GROUP_NAME, createMaster().getGroupName()); Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/BrokerMessages.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/BrokerMessages.java?rev=1631345&r1=1631344&r2=1631345&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/BrokerMessages.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/BrokerMessages.java Mon Oct 13 10:52:27 2014 @@ -50,6 +50,8 @@ public class BrokerMessages public static final String STOPPED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "broker.stopped"; public static final String STATS_MSGS_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "broker.stats_msgs"; public static final String LISTENING_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "broker.listening"; + public static final String FLOW_TO_DISK_INACTIVE_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "broker.flow_to_disk_inactive"; + public static final String FLOW_TO_DISK_ACTIVE_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "broker.flow_to_disk_active"; public static final String MAX_MEMORY_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "broker.max_memory"; public static final String PLATFORM_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "broker.platform"; public static final String SHUTTING_DOWN_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "broker.shutting_down"; @@ -66,6 +68,8 @@ public class BrokerMessages Logger.getLogger(STOPPED_LOG_HIERARCHY); Logger.getLogger(STATS_MSGS_LOG_HIERARCHY); Logger.getLogger(LISTENING_LOG_HIERARCHY); + Logger.getLogger(FLOW_TO_DISK_INACTIVE_LOG_HIERARCHY); + Logger.getLogger(FLOW_TO_DISK_ACTIVE_LOG_HIERARCHY); Logger.getLogger(MAX_MEMORY_LOG_HIERARCHY); Logger.getLogger(PLATFORM_LOG_HIERARCHY); Logger.getLogger(SHUTTING_DOWN_LOG_HIERARCHY); @@ -265,6 +269,70 @@ public class BrokerMessages /** * Log a Broker message of the Format: + * <pre>BRK-1015 : Message flow to disk inactive : Message memory use {0,number,#}KB within threshold {1,number,#.##}KB</pre> + * Optional values are contained in [square brackets] and are numbered + * sequentially in the method call. + * + */ + public static LogMessage FLOW_TO_DISK_INACTIVE(Number param1, Number param2) + { + String rawMessage = _messages.getString("FLOW_TO_DISK_INACTIVE"); + + final Object[] messageArguments = {param1, param2}; + // Create a new MessageFormat to ensure thread safety. + // Sharing a MessageFormat and using applyPattern is not thread safe + MessageFormat formatter = new MessageFormat(rawMessage, _currentLocale); + + final String message = formatter.format(messageArguments); + + return new LogMessage() + { + public String toString() + { + return message; + } + + public String getLogHierarchy() + { + return FLOW_TO_DISK_INACTIVE_LOG_HIERARCHY; + } + }; + } + + /** + * Log a Broker message of the Format: + * <pre>BRK-1014 : Message flow to disk active : Message memory use {0,number,#}KB exceeds threshold {1,number,#.##}KB</pre> + * Optional values are contained in [square brackets] and are numbered + * sequentially in the method call. + * + */ + public static LogMessage FLOW_TO_DISK_ACTIVE(Number param1, Number param2) + { + String rawMessage = _messages.getString("FLOW_TO_DISK_ACTIVE"); + + final Object[] messageArguments = {param1, param2}; + // Create a new MessageFormat to ensure thread safety. + // Sharing a MessageFormat and using applyPattern is not thread safe + MessageFormat formatter = new MessageFormat(rawMessage, _currentLocale); + + final String message = formatter.format(messageArguments); + + return new LogMessage() + { + public String toString() + { + return message; + } + + public String getLogHierarchy() + { + return FLOW_TO_DISK_ACTIVE_LOG_HIERARCHY; + } + }; + } + + /** + * Log a Broker message of the Format: * <pre>BRK-1011 : Maximum Memory : {0,number} bytes</pre> * Optional values are contained in [square brackets] and are numbered * sequentially in the method call. Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Broker_logmessages.properties URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Broker_logmessages.properties?rev=1631345&r1=1631344&r2=1631345&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Broker_logmessages.properties (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Broker_logmessages.properties Mon Oct 13 10:52:27 2014 @@ -47,4 +47,9 @@ PLATFORM = BRK-1010 : Platform : JVM : { # 0 Maximum Memory MAX_MEMORY = BRK-1011 : Maximum Memory : {0,number} bytes -MANAGEMENT_MODE = BRK-1012 : Management Mode : User Details : {0} / {1} \ No newline at end of file +MANAGEMENT_MODE = BRK-1012 : Management Mode : User Details : {0} / {1} + +# 0 - Total message size +# 1 - Target memory size +FLOW_TO_DISK_ACTIVE = BRK-1014 : Message flow to disk active : Message memory use {0,number,#}KB exceeds threshold {1,number,#.##}KB +FLOW_TO_DISK_INACTIVE = BRK-1015 : Message flow to disk inactive : Message memory use {0,number,#}KB within threshold {1,number,#.##}KB \ No newline at end of file Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java?rev=1631345&r1=1631344&r2=1631345&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java Mon Oct 13 10:52:27 2014 @@ -82,6 +82,9 @@ public class BrokerAdapter extends Abstr private Timer _reportingTimer; private final StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived; + /** Flags used to control the reporting of flow to disk. Protected by this */ + private boolean _totalMessageSizeExceedThresholdReported = false, _totalMessageSizeWithinThresholdReported = true; + @ManagedAttributeField private String _defaultVirtualHost; @ManagedAttributeField @@ -99,6 +102,7 @@ public class BrokerAdapter extends Abstr @ManagedAttributeField private String _confidentialConfigurationEncryptionProvider; + @ManagedObjectFactoryConstructor public BrokerAdapter(Map<String, Object> attributes, SystemConfig parent) @@ -437,6 +441,19 @@ public class BrokerAdapter extends Abstr } } + if (totalSize > totalTarget && !_totalMessageSizeExceedThresholdReported) + { + _eventLogger.message(BrokerMessages.FLOW_TO_DISK_ACTIVE(totalSize / 1024, totalTarget / 1024)); + _totalMessageSizeExceedThresholdReported = true; + _totalMessageSizeWithinThresholdReported = false; + } + else if (totalSize <= totalTarget && !_totalMessageSizeWithinThresholdReported) + { + _eventLogger.message(BrokerMessages.FLOW_TO_DISK_INACTIVE(totalSize / 1024, totalTarget / 1024)); + _totalMessageSizeWithinThresholdReported = true; + _totalMessageSizeExceedThresholdReported = false; + } + for(Map.Entry<VirtualHost<?, ?, ?>,Long> entry : vhs.entrySet()) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org