http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java ---------------------------------------------------------------------- diff --git a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java index b5efcd7..187d6c4 100644 --- a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java +++ b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java @@ -127,24 +127,29 @@ public class ConnectionFactoryConfigurationImpl implements ConnectionFactoryConf // ConnectionFactoryConfiguration implementation ----------------- + @Override public String[] getBindings() { return bindings; } + @Override public ConnectionFactoryConfiguration setBindings(final String... bindings) { this.bindings = bindings; return this; } + @Override public String getName() { return name; } + @Override public ConnectionFactoryConfiguration setName(String name) { this.name = name; return this; } + @Override public boolean isPersisted() { return persisted; } @@ -152,6 +157,7 @@ public class ConnectionFactoryConfigurationImpl implements ConnectionFactoryConf /** * @return the discoveryGroupName */ + @Override public String getDiscoveryGroupName() { return discoveryGroupName; } @@ -159,209 +165,255 @@ public class ConnectionFactoryConfigurationImpl implements ConnectionFactoryConf /** * @param discoveryGroupName the discoveryGroupName to set */ + @Override public ConnectionFactoryConfiguration setDiscoveryGroupName(String discoveryGroupName) { this.discoveryGroupName = discoveryGroupName; return this; } + @Override public List<String> getConnectorNames() { return connectorNames; } + @Override public ConnectionFactoryConfiguration setConnectorNames(final List<String> connectorNames) { this.connectorNames = connectorNames; return this; } + @Override public ConnectionFactoryConfiguration setConnectorNames(final String...names) { return this.setConnectorNames(Arrays.asList(names)); } + @Override public boolean isHA() { return ha; } + @Override public ConnectionFactoryConfiguration setHA(final boolean ha) { this.ha = ha; return this; } + @Override public String getClientID() { return clientID; } + @Override public ConnectionFactoryConfiguration setClientID(final String clientID) { this.clientID = clientID; return this; } + @Override public long getClientFailureCheckPeriod() { return clientFailureCheckPeriod; } + @Override public ConnectionFactoryConfiguration setClientFailureCheckPeriod(final long clientFailureCheckPeriod) { this.clientFailureCheckPeriod = clientFailureCheckPeriod; return this; } + @Override public long getConnectionTTL() { return connectionTTL; } + @Override public ConnectionFactoryConfiguration setConnectionTTL(final long connectionTTL) { this.connectionTTL = connectionTTL; return this; } + @Override public long getCallTimeout() { return callTimeout; } + @Override public ConnectionFactoryConfiguration setCallTimeout(final long callTimeout) { this.callTimeout = callTimeout; return this; } + @Override public long getCallFailoverTimeout() { return callFailoverTimeout; } + @Override public ConnectionFactoryConfiguration setCallFailoverTimeout(long callFailoverTimeout) { this.callFailoverTimeout = callFailoverTimeout; return this; } + @Override public boolean isCacheLargeMessagesClient() { return cacheLargeMessagesClient; } + @Override public ConnectionFactoryConfiguration setCacheLargeMessagesClient(final boolean cacheLargeMessagesClient) { this.cacheLargeMessagesClient = cacheLargeMessagesClient; return this; } + @Override public int getMinLargeMessageSize() { return minLargeMessageSize; } + @Override public ConnectionFactoryConfiguration setMinLargeMessageSize(final int minLargeMessageSize) { this.minLargeMessageSize = minLargeMessageSize; return this; } + @Override public int getConsumerWindowSize() { return consumerWindowSize; } + @Override public ConnectionFactoryConfiguration setConsumerWindowSize(final int consumerWindowSize) { this.consumerWindowSize = consumerWindowSize; return this; } + @Override public int getConsumerMaxRate() { return consumerMaxRate; } + @Override public ConnectionFactoryConfiguration setConsumerMaxRate(final int consumerMaxRate) { this.consumerMaxRate = consumerMaxRate; return this; } + @Override public int getConfirmationWindowSize() { return confirmationWindowSize; } + @Override public ConnectionFactoryConfiguration setConfirmationWindowSize(final int confirmationWindowSize) { this.confirmationWindowSize = confirmationWindowSize; return this; } + @Override public int getProducerMaxRate() { return producerMaxRate; } + @Override public ConnectionFactoryConfiguration setProducerMaxRate(final int producerMaxRate) { this.producerMaxRate = producerMaxRate; return this; } + @Override public int getProducerWindowSize() { return producerWindowSize; } + @Override public ConnectionFactoryConfiguration setProducerWindowSize(final int producerWindowSize) { this.producerWindowSize = producerWindowSize; return this; } + @Override public boolean isBlockOnAcknowledge() { return blockOnAcknowledge; } + @Override public ConnectionFactoryConfiguration setBlockOnAcknowledge(final boolean blockOnAcknowledge) { this.blockOnAcknowledge = blockOnAcknowledge; return this; } + @Override public boolean isBlockOnDurableSend() { return blockOnDurableSend; } + @Override public ConnectionFactoryConfiguration setBlockOnDurableSend(final boolean blockOnDurableSend) { this.blockOnDurableSend = blockOnDurableSend; return this; } + @Override public boolean isBlockOnNonDurableSend() { return blockOnNonDurableSend; } + @Override public ConnectionFactoryConfiguration setBlockOnNonDurableSend(final boolean blockOnNonDurableSend) { this.blockOnNonDurableSend = blockOnNonDurableSend; return this; } + @Override public boolean isAutoGroup() { return autoGroup; } + @Override public ConnectionFactoryConfiguration setAutoGroup(final boolean autoGroup) { this.autoGroup = autoGroup; return this; } + @Override public boolean isPreAcknowledge() { return preAcknowledge; } + @Override public ConnectionFactoryConfiguration setPreAcknowledge(final boolean preAcknowledge) { this.preAcknowledge = preAcknowledge; return this; } + @Override public String getLoadBalancingPolicyClassName() { return loadBalancingPolicyClassName; } + @Override public ConnectionFactoryConfiguration setLoadBalancingPolicyClassName(final String loadBalancingPolicyClassName) { this.loadBalancingPolicyClassName = loadBalancingPolicyClassName; return this; } + @Override public int getTransactionBatchSize() { return transactionBatchSize; } + @Override public ConnectionFactoryConfiguration setTransactionBatchSize(final int transactionBatchSize) { this.transactionBatchSize = transactionBatchSize; return this; } + @Override public int getDupsOKBatchSize() { return dupsOKBatchSize; } + @Override public ConnectionFactoryConfiguration setDupsOKBatchSize(final int dupsOKBatchSize) { this.dupsOKBatchSize = dupsOKBatchSize; return this; @@ -376,82 +428,100 @@ public class ConnectionFactoryConfigurationImpl implements ConnectionFactoryConf return this; } + @Override public boolean isUseGlobalPools() { return useGlobalPools; } + @Override public ConnectionFactoryConfiguration setUseGlobalPools(final boolean useGlobalPools) { this.useGlobalPools = useGlobalPools; return this; } + @Override public int getScheduledThreadPoolMaxSize() { return scheduledThreadPoolMaxSize; } + @Override public ConnectionFactoryConfiguration setScheduledThreadPoolMaxSize(final int scheduledThreadPoolMaxSize) { this.scheduledThreadPoolMaxSize = scheduledThreadPoolMaxSize; return this; } + @Override public int getThreadPoolMaxSize() { return threadPoolMaxSize; } + @Override public ConnectionFactoryConfiguration setThreadPoolMaxSize(final int threadPoolMaxSize) { this.threadPoolMaxSize = threadPoolMaxSize; return this; } + @Override public long getRetryInterval() { return retryInterval; } + @Override public ConnectionFactoryConfiguration setRetryInterval(final long retryInterval) { this.retryInterval = retryInterval; return this; } + @Override public double getRetryIntervalMultiplier() { return retryIntervalMultiplier; } + @Override public ConnectionFactoryConfiguration setRetryIntervalMultiplier(final double retryIntervalMultiplier) { this.retryIntervalMultiplier = retryIntervalMultiplier; return this; } + @Override public long getMaxRetryInterval() { return maxRetryInterval; } + @Override public ConnectionFactoryConfiguration setMaxRetryInterval(final long maxRetryInterval) { this.maxRetryInterval = maxRetryInterval; return this; } + @Override public int getReconnectAttempts() { return reconnectAttempts; } + @Override public ConnectionFactoryConfiguration setReconnectAttempts(final int reconnectAttempts) { this.reconnectAttempts = reconnectAttempts; return this; } + @Override public boolean isFailoverOnInitialConnection() { return failoverOnInitialConnection; } + @Override public ConnectionFactoryConfiguration setFailoverOnInitialConnection(final boolean failover) { failoverOnInitialConnection = failover; return this; } + @Override public String getGroupID() { return groupID; } + @Override public ConnectionFactoryConfiguration setGroupID(final String groupID) { this.groupID = groupID; return this; @@ -744,11 +814,13 @@ public class ConnectionFactoryConfigurationImpl implements ConnectionFactoryConf return size; } + @Override public ConnectionFactoryConfiguration setFactoryType(final JMSFactoryType factoryType) { this.factoryType = factoryType; return this; } + @Override public JMSFactoryType getFactoryType() { return factoryType; }
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/config/impl/JMSConfigurationImpl.java ---------------------------------------------------------------------- diff --git a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/config/impl/JMSConfigurationImpl.java b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/config/impl/JMSConfigurationImpl.java index 9b36cfc..eb058c6 100644 --- a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/config/impl/JMSConfigurationImpl.java +++ b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/config/impl/JMSConfigurationImpl.java @@ -40,37 +40,45 @@ public class JMSConfigurationImpl implements JMSConfiguration { public JMSConfigurationImpl() { } + @Override public List<ConnectionFactoryConfiguration> getConnectionFactoryConfigurations() { return connectionFactoryConfigurations; } + @Override public JMSConfigurationImpl setConnectionFactoryConfigurations(List<ConnectionFactoryConfiguration> connectionFactoryConfigurations) { this.connectionFactoryConfigurations = connectionFactoryConfigurations; return this; } + @Override public List<JMSQueueConfiguration> getQueueConfigurations() { return queueConfigurations; } + @Override public JMSConfigurationImpl setQueueConfigurations(List<JMSQueueConfiguration> queueConfigurations) { this.queueConfigurations = queueConfigurations; return this; } + @Override public List<TopicConfiguration> getTopicConfigurations() { return topicConfigurations; } + @Override public JMSConfigurationImpl setTopicConfigurations(List<TopicConfiguration> topicConfigurations) { this.topicConfigurations = topicConfigurations; return this; } + @Override public String getDomain() { return domain; } + @Override public JMSConfigurationImpl setDomain(final String domain) { this.domain = domain; return this; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/config/impl/JMSQueueConfigurationImpl.java ---------------------------------------------------------------------- diff --git a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/config/impl/JMSQueueConfigurationImpl.java b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/config/impl/JMSQueueConfigurationImpl.java index 52be423..fbf77d4 100644 --- a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/config/impl/JMSQueueConfigurationImpl.java +++ b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/config/impl/JMSQueueConfigurationImpl.java @@ -41,37 +41,45 @@ public class JMSQueueConfigurationImpl implements JMSQueueConfiguration { // QueueConfiguration implementation ----------------------------- + @Override public String[] getBindings() { return bindings; } + @Override public JMSQueueConfigurationImpl setBindings(String... bindings) { this.bindings = bindings; return this; } + @Override public String getName() { return name; } + @Override public JMSQueueConfigurationImpl setName(String name) { this.name = name; return this; } + @Override public String getSelector() { return selector; } + @Override public JMSQueueConfigurationImpl setSelector(String selector) { this.selector = selector; return this; } + @Override public boolean isDurable() { return durable; } + @Override public JMSQueueConfigurationImpl setDurable(boolean durable) { this.durable = durable; return this; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/config/impl/TopicConfigurationImpl.java ---------------------------------------------------------------------- diff --git a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/config/impl/TopicConfigurationImpl.java b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/config/impl/TopicConfigurationImpl.java index e8ad224..1617db2 100644 --- a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/config/impl/TopicConfigurationImpl.java +++ b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/config/impl/TopicConfigurationImpl.java @@ -37,19 +37,23 @@ public class TopicConfigurationImpl implements TopicConfiguration { // TopicConfiguration implementation ----------------------------- + @Override public String[] getBindings() { return bindings; } + @Override public TopicConfigurationImpl setBindings(String... bindings) { this.bindings = bindings; return this; } + @Override public String getName() { return name; } + @Override public TopicConfigurationImpl setName(String name) { this.name = name; return this; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/impl/JMSServerManagerImpl.java ---------------------------------------------------------------------- diff --git a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/impl/JMSServerManagerImpl.java b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/impl/JMSServerManagerImpl.java index 9361e8e..50c279c 100644 --- a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/impl/JMSServerManagerImpl.java +++ b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/impl/JMSServerManagerImpl.java @@ -166,10 +166,12 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback // ActivateCallback implementation ------------------------------------- + @Override public void preActivate() { } + @Override public synchronized void activated() { if (!startCalled) { return; @@ -362,6 +364,7 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback * must already be true. * </ol> */ + @Override public synchronized void start() throws Exception { if (startCalled) { return; @@ -382,6 +385,7 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback } + @Override public void stop() throws Exception { synchronized (this) { if (!startCalled) { @@ -400,46 +404,56 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback server.stop(); } + @Override public boolean isStarted() { return server.isStarted(); } // JMSServerManager implementation ------------------------------- + @Override public BindingRegistry getRegistry() { return registry; } + @Override public void setRegistry(BindingRegistry registry) { this.registry = registry; } + @Override public ActiveMQServer getActiveMQServer() { return server; } + @Override public void addAddressSettings(final String address, final AddressSettings addressSettings) { server.getAddressSettingsRepository().addMatch(address, addressSettings); } + @Override public AddressSettings getAddressSettings(final String address) { return server.getAddressSettingsRepository().getMatch(address); } + @Override public void addSecurity(final String addressMatch, final Set<Role> roles) { server.getSecurityRepository().addMatch(addressMatch, roles); } + @Override public Set<Role> getSecurity(final String addressMatch) { return server.getSecurityRepository().getMatch(addressMatch); } + @Override public synchronized String getVersion() { checkInitialised(); return server.getVersion().getFullVersion(); } + @Override public synchronized boolean createQueue(final boolean storeConfig, final String queueName, final String selectorString, @@ -506,6 +520,7 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback return true; } + @Override public synchronized boolean createTopic(final boolean storeConfig, final String topicName, final String... bindings) throws Exception { @@ -557,6 +572,7 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback } + @Override public boolean addTopicToBindingRegistry(final String topicName, final String registryBinding) throws Exception { checkInitialised(); @@ -578,18 +594,22 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback return added; } + @Override public String[] getBindingsOnQueue(String queue) { return getBindingsList(queueBindings, queue); } + @Override public String[] getBindingsOnTopic(String topic) { return getBindingsList(topicBindings, topic); } + @Override public String[] getBindingsOnConnectionFactory(String factoryName) { return getBindingsList(connectionFactoryBindings, factoryName); } + @Override public boolean addQueueToBindingRegistry(final String queueName, final String registryBinding) throws Exception { checkInitialised(); @@ -610,6 +630,7 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback return added; } + @Override public boolean addConnectionFactoryToBindingRegistry(final String name, final String registryBinding) throws Exception { checkInitialised(); @@ -685,6 +706,7 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback /* (non-Javadoc) * @see org.apache.activemq.artemis.jms.server.JMSServerManager#removeTopicFromBindings(java.lang.String, java.lang.String) */ + @Override public boolean removeTopicFromBindingRegistry(final String name) throws Exception { final AtomicBoolean valueReturn = new AtomicBoolean(false); @@ -731,10 +753,12 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback return true; } + @Override public synchronized boolean destroyQueue(final String name) throws Exception { return destroyQueue(name, true); } + @Override public synchronized boolean destroyQueue(final String name, final boolean removeConsumers) throws Exception { checkInitialised(); @@ -760,10 +784,12 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback } } + @Override public synchronized boolean destroyTopic(final String name) throws Exception { return destroyTopic(name, true); } + @Override public synchronized boolean destroyTopic(final String name, final boolean removeConsumers) throws Exception { checkInitialised(); AddressControl addressControl = (AddressControl) server.getManagementService().getResource(ResourceNames.CORE_ADDRESS + ActiveMQDestination.createTopicAddressFromName(name)); @@ -803,6 +829,7 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback } } + @Override public synchronized void createConnectionFactory(final String name, final boolean ha, final JMSFactoryType cfType, @@ -817,6 +844,7 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback } } + @Override public synchronized void createConnectionFactory(final String name, final boolean ha, JMSFactoryType cfType, @@ -861,6 +889,7 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback } } + @Override public synchronized void createConnectionFactory(final String name, final boolean ha, final JMSFactoryType cfType, @@ -904,6 +933,7 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback } } + @Override public synchronized void createConnectionFactory(final String name, final boolean ha, final JMSFactoryType cfType, @@ -917,6 +947,7 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback } } + @Override public synchronized ActiveMQConnectionFactory recreateCF(String name, ConnectionFactoryConfiguration cf) throws Exception { List<String> bindings = connectionFactoryBindings.get(name); @@ -941,6 +972,7 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback return realCF; } + @Override public synchronized void createConnectionFactory(final boolean storeConfig, final ConnectionFactoryConfiguration cfConfig, final String... bindings) throws Exception { @@ -1171,6 +1203,7 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback return cf; } + @Override public synchronized boolean destroyConnectionFactory(final String name) throws Exception { final AtomicBoolean valueReturn = new AtomicBoolean(false); @@ -1220,40 +1253,48 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback return true; } + @Override public String[] listRemoteAddresses() throws Exception { checkInitialised(); return server.getActiveMQServerControl().listRemoteAddresses(); } + @Override public String[] listRemoteAddresses(final String ipAddress) throws Exception { checkInitialised(); return server.getActiveMQServerControl().listRemoteAddresses(ipAddress); } + @Override public boolean closeConnectionsForAddress(final String ipAddress) throws Exception { checkInitialised(); return server.getActiveMQServerControl().closeConnectionsForAddress(ipAddress); } + @Override public boolean closeConsumerConnectionsForAddress(final String address) throws Exception { checkInitialised(); return server.getActiveMQServerControl().closeConsumerConnectionsForAddress(address); } + @Override public boolean closeConnectionsForUser(final String userName) throws Exception { checkInitialised(); return server.getActiveMQServerControl().closeConnectionsForUser(userName); } + @Override public String[] listConnectionIDs() throws Exception { return server.getActiveMQServerControl().listConnectionIDs(); } + @Override public String[] listSessions(final String connectionID) throws Exception { checkInitialised(); return server.getActiveMQServerControl().listSessions(connectionID); } + @Override public String listPreparedTransactionDetailsAsJSON() throws Exception { ResourceManager resourceManager = server.getResourceManager(); Map<Xid, Long> xids = resourceManager.getPreparedTransactionsWithCreationTime(); @@ -1263,6 +1304,7 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback ArrayList<Entry<Xid, Long>> xidsSortedByCreationTime = new ArrayList<Map.Entry<Xid, Long>>(xids.entrySet()); Collections.sort(xidsSortedByCreationTime, new Comparator<Entry<Xid, Long>>() { + @Override public int compare(final Entry<Xid, Long> entry1, final Entry<Xid, Long> entry2) { // sort by creation time, oldest first return (int) (entry1.getValue() - entry2.getValue()); @@ -1282,6 +1324,7 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback return txDetailListJson.toString(); } + @Override public String listPreparedTransactionDetailsAsHTML() throws Exception { ResourceManager resourceManager = server.getResourceManager(); Map<Xid, Long> xids = resourceManager.getPreparedTransactionsWithCreationTime(); @@ -1291,6 +1334,7 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback ArrayList<Entry<Xid, Long>> xidsSortedByCreationTime = new ArrayList<Map.Entry<Xid, Long>>(xids.entrySet()); Collections.sort(xidsSortedByCreationTime, new Comparator<Entry<Xid, Long>>() { + @Override public int compare(final Entry<Xid, Long> entry1, final Entry<Xid, Long> entry2) { // sort by creation time, oldest first return (int) (entry1.getValue() - entry2.getValue()); @@ -1536,6 +1580,7 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback private abstract class WrappedRunnable implements Runnable { + @Override public void run() { try { runException(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/management/impl/JMSManagementServiceImpl.java ---------------------------------------------------------------------- diff --git a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/management/impl/JMSManagementServiceImpl.java b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/management/impl/JMSManagementServiceImpl.java index 9549d91..0c56e24 100644 --- a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/management/impl/JMSManagementServiceImpl.java +++ b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/management/impl/JMSManagementServiceImpl.java @@ -64,6 +64,7 @@ public class JMSManagementServiceImpl implements JMSManagementService { // JMSManagementRegistration implementation ---------------------- + @Override public synchronized JMSServerControl registerJMSServer(final JMSServerManager server) throws Exception { ObjectName objectName = managementService.getObjectNameBuilder().getJMSServerObjectName(); JMSServerControlImpl control = new JMSServerControlImpl(server); @@ -72,12 +73,14 @@ public class JMSManagementServiceImpl implements JMSManagementService { return control; } + @Override public synchronized void unregisterJMSServer() throws Exception { ObjectName objectName = managementService.getObjectNameBuilder().getJMSServerObjectName(); managementService.unregisterFromJMX(objectName); managementService.unregisterFromRegistry(ResourceNames.JMS_SERVER); } + @Override public synchronized void registerQueue(final ActiveMQQueue queue, final Queue serverQueue) throws Exception { QueueControl coreQueueControl = (QueueControl) managementService.getResource(ResourceNames.CORE_QUEUE + queue.getAddress()); MessageCounterManager messageCounterManager = managementService.getMessageCounterManager(); @@ -89,12 +92,14 @@ public class JMSManagementServiceImpl implements JMSManagementService { managementService.registerInRegistry(ResourceNames.JMS_QUEUE + queue.getQueueName(), control); } + @Override public synchronized void unregisterQueue(final String name) throws Exception { ObjectName objectName = managementService.getObjectNameBuilder().getJMSQueueObjectName(name); managementService.unregisterFromJMX(objectName); managementService.unregisterFromRegistry(ResourceNames.JMS_QUEUE + name); } + @Override public synchronized void registerTopic(final ActiveMQTopic topic) throws Exception { ObjectName objectName = managementService.getObjectNameBuilder().getJMSTopicObjectName(topic.getTopicName()); AddressControl addressControl = (AddressControl) managementService.getResource(ResourceNames.CORE_ADDRESS + topic.getAddress()); @@ -103,12 +108,14 @@ public class JMSManagementServiceImpl implements JMSManagementService { managementService.registerInRegistry(ResourceNames.JMS_TOPIC + topic.getTopicName(), control); } + @Override public synchronized void unregisterTopic(final String name) throws Exception { ObjectName objectName = managementService.getObjectNameBuilder().getJMSTopicObjectName(name); managementService.unregisterFromJMX(objectName); managementService.unregisterFromRegistry(ResourceNames.JMS_TOPIC + name); } + @Override public synchronized void registerConnectionFactory(final String name, final ConnectionFactoryConfiguration cfConfig, final ActiveMQConnectionFactory connectionFactory) throws Exception { @@ -118,12 +125,14 @@ public class JMSManagementServiceImpl implements JMSManagementService { managementService.registerInRegistry(ResourceNames.JMS_CONNECTION_FACTORY + name, control); } + @Override public synchronized void unregisterConnectionFactory(final String name) throws Exception { ObjectName objectName = managementService.getObjectNameBuilder().getConnectionFactoryObjectName(name); managementService.unregisterFromJMX(objectName); managementService.unregisterFromRegistry(ResourceNames.JMS_CONNECTION_FACTORY + name); } + @Override public void stop() throws Exception { for (Object resource : managementService.getResources(ConnectionFactoryControl.class)) { unregisterConnectionFactory(((ConnectionFactoryControl) resource).getName()); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFile.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFile.java index dcb22ab..4314267 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFile.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFile.java @@ -78,14 +78,17 @@ public abstract class AbstractSequentialFile implements SequentialFile { // Public -------------------------------------------------------- + @Override public final boolean exists() { return file.exists(); } + @Override public final String getFileName() { return file.getName(); } + @Override public final void delete() throws IOException, InterruptedException, ActiveMQException { if (isOpen()) { close(); @@ -96,6 +99,7 @@ public abstract class AbstractSequentialFile implements SequentialFile { } } + @Override public void copyTo(SequentialFile newFileName) throws Exception { try { ActiveMQJournalLogger.LOGGER.debug("Copying " + this + " as " + newFileName); @@ -134,10 +138,12 @@ public abstract class AbstractSequentialFile implements SequentialFile { position.set(pos); } + @Override public long position() { return position.get(); } + @Override public final void renameTo(final String newFileName) throws IOException, InterruptedException, ActiveMQException { try { close(); @@ -161,11 +167,13 @@ public abstract class AbstractSequentialFile implements SequentialFile { * @throws IOException we declare throwing IOException because sub-classes need to do it * @throws ActiveMQException */ + @Override public synchronized void close() throws IOException, InterruptedException, ActiveMQException { final CountDownLatch donelatch = new CountDownLatch(1); if (writerExecutor != null) { writerExecutor.execute(new Runnable() { + @Override public void run() { donelatch.countDown(); } @@ -177,6 +185,7 @@ public abstract class AbstractSequentialFile implements SequentialFile { } } + @Override public final boolean fits(final int size) { if (timedBuffer == null) { return position.get() + size <= fileSize; @@ -186,6 +195,7 @@ public abstract class AbstractSequentialFile implements SequentialFile { } } + @Override public void setTimedBuffer(final TimedBuffer buffer) { if (timedBuffer != null) { timedBuffer.setObserver(null); @@ -199,6 +209,7 @@ public abstract class AbstractSequentialFile implements SequentialFile { } + @Override public void write(final ActiveMQBuffer bytes, final boolean sync, final IOCallback callback) throws IOException { if (timedBuffer != null) { bytes.setIndex(0, bytes.capacity()); @@ -212,6 +223,7 @@ public abstract class AbstractSequentialFile implements SequentialFile { } } + @Override public void write(final ActiveMQBuffer bytes, final boolean sync) throws IOException, InterruptedException, ActiveMQException { if (sync) { @@ -226,6 +238,7 @@ public abstract class AbstractSequentialFile implements SequentialFile { } } + @Override public void write(final EncodingSupport bytes, final boolean sync, final IOCallback callback) { if (timedBuffer != null) { timedBuffer.addBytes(bytes, sync, callback); @@ -244,6 +257,7 @@ public abstract class AbstractSequentialFile implements SequentialFile { } } + @Override public void write(final EncodingSupport bytes, final boolean sync) throws InterruptedException, ActiveMQException { if (sync) { SimpleWaitIOCallback completion = new SimpleWaitIOCallback(); @@ -269,6 +283,7 @@ public abstract class AbstractSequentialFile implements SequentialFile { this.delegates = delegates; } + @Override public void done() { for (IOCallback callback : delegates) { try { @@ -280,6 +295,7 @@ public abstract class AbstractSequentialFile implements SequentialFile { } } + @Override public void onError(final int errorCode, final String errorMessage) { for (IOCallback callback : delegates) { try { @@ -303,6 +319,7 @@ public abstract class AbstractSequentialFile implements SequentialFile { protected class LocalBufferObserver implements TimedBufferObserver { + @Override public void flushBuffer(final ByteBuffer buffer, final boolean requestedSync, final List<IOCallback> callbacks) { buffer.flip(); @@ -314,10 +331,12 @@ public abstract class AbstractSequentialFile implements SequentialFile { } } + @Override public ByteBuffer newBuffer(final int size, final int limit) { return AbstractSequentialFile.this.newBuffer(size, limit); } + @Override public int getRemainingBytes() { if (fileSize - position.get() > Integer.MAX_VALUE) { return Integer.MAX_VALUE; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFileFactory.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFileFactory.java index 16589bf..9f9a883 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFileFactory.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFileFactory.java @@ -82,6 +82,7 @@ public abstract class AbstractSequentialFileFactory implements SequentialFileFac this.maxIO = maxIO; } + @Override public void stop() { if (timedBuffer != null) { timedBuffer.stop(); @@ -106,6 +107,7 @@ public abstract class AbstractSequentialFileFactory implements SequentialFileFac return journalDir; } + @Override public void start() { if (timedBuffer != null) { timedBuffer.start(); @@ -121,6 +123,7 @@ public abstract class AbstractSequentialFileFactory implements SequentialFileFac } } + @Override public int getMaxIO() { return maxIO; } @@ -139,12 +142,14 @@ public abstract class AbstractSequentialFileFactory implements SequentialFileFac } } + @Override public void flush() { if (timedBuffer != null) { timedBuffer.flush(); } } + @Override public void deactivateBuffer() { if (timedBuffer != null) { // When moving to a new file, we need to make sure any pending buffer will be transferred to the buffer @@ -153,12 +158,14 @@ public abstract class AbstractSequentialFileFactory implements SequentialFileFac } } + @Override public void releaseBuffer(final ByteBuffer buffer) { } /** * Create the directory if it doesn't exist yet */ + @Override public void createDirs() throws Exception { boolean ok = journalDir.mkdirs(); if (!ok) { @@ -166,8 +173,10 @@ public abstract class AbstractSequentialFileFactory implements SequentialFileFac } } + @Override public List<String> listFiles(final String extension) throws Exception { FilenameFilter fnf = new FilenameFilter() { + @Override public boolean accept(final File file, final String name) { return name.endsWith("." + extension); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/DummyCallback.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/DummyCallback.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/DummyCallback.java index cf59fc4..2ee5186 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/DummyCallback.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/DummyCallback.java @@ -27,9 +27,11 @@ public class DummyCallback extends SyncIOCompletion { return DummyCallback.instance; } + @Override public void done() { } + @Override public void onError(final int errorCode, final String errorMessage) { ActiveMQJournalLogger.LOGGER.errorWritingData(new Exception(errorMessage), errorMessage, errorCode); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java index e011b08..9bac49d 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java @@ -72,16 +72,19 @@ public class AIOSequentialFile extends AbstractSequentialFile { this.aioFactory = factory; } + @Override public boolean isOpen() { return opened; } + @Override public int getAlignment() { checkOpened(); return aioFile.getBlockSize(); } + @Override public int calculateBlockStart(final int position) { int alignment = getAlignment(); @@ -90,6 +93,7 @@ public class AIOSequentialFile extends AbstractSequentialFile { return pos; } + @Override public SequentialFile cloneFile() { return new AIOSequentialFile(aioFactory, -1, -1, getFile().getParentFile(), getFile().getName(), writerExecutor); } @@ -114,6 +118,7 @@ public class AIOSequentialFile extends AbstractSequentialFile { aioFile = null; } + @Override public synchronized void fill(final int size) throws Exception { checkOpened(); aioFile.fill(size); @@ -121,10 +126,12 @@ public class AIOSequentialFile extends AbstractSequentialFile { fileSize = aioFile.getSize(); } + @Override public void open() throws Exception { open(aioFactory.getMaxIO(), true); } + @Override public synchronized void open(final int maxIO, final boolean useExecutor) throws ActiveMQException { opened = true; @@ -141,6 +148,7 @@ public class AIOSequentialFile extends AbstractSequentialFile { fileSize = aioFile.getSize(); } + @Override public int read(final ByteBuffer bytes, final IOCallback callback) throws ActiveMQException { checkOpened(); int bytesToRead = bytes.limit(); @@ -163,6 +171,7 @@ public class AIOSequentialFile extends AbstractSequentialFile { return bytesToRead; } + @Override public int read(final ByteBuffer bytes) throws Exception { SimpleWaitIOCallback waitCompletion = new SimpleWaitIOCallback(); @@ -173,6 +182,7 @@ public class AIOSequentialFile extends AbstractSequentialFile { return bytesRead; } + @Override public void writeDirect(final ByteBuffer bytes, final boolean sync) throws Exception { if (sync) { SimpleWaitIOCallback completion = new SimpleWaitIOCallback(); @@ -189,6 +199,7 @@ public class AIOSequentialFile extends AbstractSequentialFile { /** * Note: Parameter sync is not used on AIO */ + @Override public void writeDirect(final ByteBuffer bytes, final boolean sync, final IOCallback callback) { checkOpened(); @@ -240,10 +251,12 @@ public class AIOSequentialFile extends AbstractSequentialFile { } } + @Override public void sync() { throw new UnsupportedOperationException("This method is not supported on AIO"); } + @Override public long size() throws Exception { if (aioFile == null) { return getFile().length(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java index 0af2a0e..92d1b3b 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java @@ -106,10 +106,12 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor this.reuseBuffers = false; } + @Override public SequentialFile createSequentialFile(final String fileName) { return new AIOSequentialFile(this, bufferSize, bufferTimeout, journalDir, fileName, writeExecutor); } + @Override public boolean isSupportsCallbacks() { return true; } @@ -118,6 +120,7 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor return LibaioContext.isLoaded(); } + @Override public ByteBuffer allocateDirectBuffer(final int size) { int blocks = size / 512; @@ -133,10 +136,12 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor return buffer; } + @Override public void releaseDirectBuffer(final ByteBuffer buffer) { LibaioContext.freeBuffer(buffer); } + @Override public ByteBuffer newBuffer(int size) { if (size % 512 != 0) { size = (size / 512 + 1) * 512; @@ -145,22 +150,26 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor return buffersControl.newBuffer(size); } + @Override public void clearBuffer(final ByteBuffer directByteBuffer) { directByteBuffer.position(0); libaioContext.memsetBuffer(directByteBuffer); } + @Override public int getAlignment() { return 512; } // For tests only + @Override public ByteBuffer wrapBuffer(final byte[] bytes) { ByteBuffer newbuffer = newBuffer(bytes.length); newbuffer.put(bytes); return newbuffer; } + @Override public int calculateBlockSize(final int position) { int alignment = getAlignment(); @@ -263,6 +272,7 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor return this; } + @Override public void run() { try { libaioFile.write(position, bytes, buffer, this); @@ -272,6 +282,7 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor } } + @Override public int compareTo(AIOSequentialCallback other) { if (this == other || this.writeSequence == other.writeSequence) { return 0; @@ -309,6 +320,7 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor /** * this is called by libaio. */ + @Override public void done() { this.sequentialFile.done(this); } @@ -338,6 +350,7 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor private class PollerRunnable implements Runnable { + @Override public void run() { libaioContext.poll(); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java index d72717a..20761a5 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java @@ -365,6 +365,7 @@ public class TimedBuffer { final int sleepMillis = timeout / 1000000; // truncates final int sleepNanos = timeout % 1000000; + @Override public void run() { long lastFlushTime = 0; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java index ae93a31..548b9a3 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java @@ -60,14 +60,17 @@ public final class NIOSequentialFile extends AbstractSequentialFile { defaultMaxIO = maxIO; } + @Override public int getAlignment() { return 1; } + @Override public int calculateBlockStart(final int position) { return position; } + @Override public synchronized boolean isOpen() { return channel != null; } @@ -76,10 +79,12 @@ public final class NIOSequentialFile extends AbstractSequentialFile { * this.maxIO represents the default maxIO. * Some operations while initializing files on the journal may require a different maxIO */ + @Override public synchronized void open() throws IOException { open(defaultMaxIO, true); } + @Override public void open(final int maxIO, final boolean useExecutor) throws IOException { try { rfile = new RandomAccessFile(getFile(), "rw"); @@ -99,6 +104,7 @@ public final class NIOSequentialFile extends AbstractSequentialFile { } } + @Override public void fill(final int size) throws IOException { ByteBuffer bb = ByteBuffer.allocate(size); @@ -150,10 +156,12 @@ public final class NIOSequentialFile extends AbstractSequentialFile { notifyAll(); } + @Override public int read(final ByteBuffer bytes) throws Exception { return read(bytes, null); } + @Override public synchronized int read(final ByteBuffer bytes, final IOCallback callback) throws IOException, ActiveMQIllegalStateException { try { @@ -181,6 +189,7 @@ public final class NIOSequentialFile extends AbstractSequentialFile { } } + @Override public void sync() throws IOException { if (channel != null) { try { @@ -193,6 +202,7 @@ public final class NIOSequentialFile extends AbstractSequentialFile { } } + @Override public long size() throws IOException { if (channel == null) { return getFile().length(); @@ -224,10 +234,12 @@ public final class NIOSequentialFile extends AbstractSequentialFile { return "NIOSequentialFile " + getFile(); } + @Override public SequentialFile cloneFile() { return new NIOSequentialFile(factory, directory, getFileName(), maxIO, writerExecutor); } + @Override public void writeDirect(final ByteBuffer bytes, final boolean sync, final IOCallback callback) { if (callback == null) { throw new NullPointerException("callback parameter need to be set"); @@ -241,6 +253,7 @@ public final class NIOSequentialFile extends AbstractSequentialFile { } } + @Override public void writeDirect(final ByteBuffer bytes, final boolean sync) throws Exception { internalWrite(bytes, sync, null); } @@ -287,6 +300,7 @@ public final class NIOSequentialFile extends AbstractSequentialFile { maxIOSemaphore.acquire(); writerExecutor.execute(new Runnable() { + @Override public void run() { try { try { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFileFactory.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFileFactory.java index 8880f1a..d949f0b 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFileFactory.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFileFactory.java @@ -65,14 +65,17 @@ public class NIOSequentialFileFactory extends AbstractSequentialFileFactory { super(journalDir, buffered, bufferSize, bufferTimeout, maxIO, logRates, listener); } + @Override public SequentialFile createSequentialFile(final String fileName) { return new NIOSequentialFile(this, journalDir, fileName, maxIO, writeExecutor); } + @Override public boolean isSupportsCallbacks() { return timedBuffer != null; } + @Override public ByteBuffer allocateDirectBuffer(final int size) { // Using direct buffer, as described on https://jira.jboss.org/browse/HORNETQ-467 ByteBuffer buffer2 = null; @@ -101,14 +104,17 @@ public class NIOSequentialFileFactory extends AbstractSequentialFileFactory { return buffer2; } + @Override public void releaseDirectBuffer(ByteBuffer buffer) { // nothing we can do on this case. we can just have good faith on GC } + @Override public ByteBuffer newBuffer(final int size) { return ByteBuffer.allocate(size); } + @Override public void clearBuffer(final ByteBuffer buffer) { final int limit = buffer.limit(); buffer.rewind(); @@ -120,14 +126,17 @@ public class NIOSequentialFileFactory extends AbstractSequentialFileFactory { buffer.rewind(); } + @Override public ByteBuffer wrapBuffer(final byte[] bytes) { return ByteBuffer.wrap(bytes); } + @Override public int getAlignment() { return 1; } + @Override public int calculateBlockSize(final int bytes) { return bytes; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/TestableJournal.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/TestableJournal.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/TestableJournal.java index 4f8dc4a..dfe7e56 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/TestableJournal.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/TestableJournal.java @@ -32,6 +32,7 @@ public interface TestableJournal extends Journal { void debugWait() throws Exception; + @Override int getFileSize(); int getMinFiles(); @@ -42,6 +43,7 @@ public interface TestableJournal extends Journal { int getMaxAIO(); + @Override void forceMoveNextFile() throws Exception; void setAutoReclaim(boolean autoReclaim); @@ -63,5 +65,6 @@ public interface TestableJournal extends Journal { */ boolean checkReclaimStatus() throws Exception; + @Override JournalFile[] getDataFiles(); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalBase.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalBase.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalBase.java index 5a844f3..50632fa 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalBase.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalBase.java @@ -36,54 +36,65 @@ abstract class JournalBase implements Journal { this.fileSize = fileSize; } + @Override public abstract void appendAddRecord(final long id, final byte recordType, final EncodingSupport record, final boolean sync, final IOCompletion callback) throws Exception; + @Override public abstract void appendAddRecordTransactional(final long txID, final long id, final byte recordType, final EncodingSupport record) throws Exception; + @Override public abstract void appendCommitRecord(final long txID, final boolean sync, final IOCompletion callback, boolean lineUpContext) throws Exception; + @Override public abstract void appendDeleteRecord(final long id, final boolean sync, final IOCompletion callback) throws Exception; + @Override public abstract void appendDeleteRecordTransactional(final long txID, final long id, final EncodingSupport record) throws Exception; + @Override public abstract void appendPrepareRecord(final long txID, final EncodingSupport transactionData, final boolean sync, final IOCompletion callback) throws Exception; + @Override public abstract void appendUpdateRecord(final long id, final byte recordType, final EncodingSupport record, final boolean sync, final IOCompletion callback) throws Exception; + @Override public abstract void appendUpdateRecordTransactional(final long txID, final long id, final byte recordType, final EncodingSupport record) throws Exception; + @Override public abstract void appendRollbackRecord(final long txID, final boolean sync, final IOCompletion callback) throws Exception; + @Override public void appendAddRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception { appendAddRecord(id, recordType, new ByteArrayEncoding(record), sync); } + @Override public void appendAddRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception { SyncIOCompletion callback = getSyncCallback(sync); @@ -94,6 +105,7 @@ abstract class JournalBase implements Journal { } } + @Override public void appendCommitRecord(final long txID, final boolean sync) throws Exception { SyncIOCompletion syncCompletion = getSyncCallback(sync); @@ -104,10 +116,12 @@ abstract class JournalBase implements Journal { } } + @Override public void appendCommitRecord(final long txID, final boolean sync, final IOCompletion callback) throws Exception { appendCommitRecord(txID, sync, callback, true); } + @Override public void appendUpdateRecord(final long id, final byte recordType, final byte[] record, @@ -115,6 +129,7 @@ abstract class JournalBase implements Journal { appendUpdateRecord(id, recordType, new ByteArrayEncoding(record), sync); } + @Override public void appendUpdateRecordTransactional(final long txID, final long id, final byte recordType, @@ -122,6 +137,7 @@ abstract class JournalBase implements Journal { appendUpdateRecordTransactional(txID, id, recordType, new ByteArrayEncoding(record)); } + @Override public void appendAddRecordTransactional(final long txID, final long id, final byte recordType, @@ -129,14 +145,17 @@ abstract class JournalBase implements Journal { appendAddRecordTransactional(txID, id, recordType, new ByteArrayEncoding(record)); } + @Override public void appendDeleteRecordTransactional(final long txID, final long id) throws Exception { appendDeleteRecordTransactional(txID, id, NullEncoding.instance); } + @Override public void appendPrepareRecord(final long txID, final byte[] transactionData, final boolean sync) throws Exception { appendPrepareRecord(txID, new ByteArrayEncoding(transactionData), sync); } + @Override public void appendPrepareRecord(final long txID, final EncodingSupport transactionData, final boolean sync) throws Exception { @@ -149,10 +168,12 @@ abstract class JournalBase implements Journal { } } + @Override public void appendDeleteRecordTransactional(final long txID, final long id, final byte[] record) throws Exception { appendDeleteRecordTransactional(txID, id, new ByteArrayEncoding(record)); } + @Override public void appendUpdateRecord(final long id, final byte recordType, final EncodingSupport record, @@ -166,6 +187,7 @@ abstract class JournalBase implements Journal { } } + @Override public void appendRollbackRecord(final long txID, final boolean sync) throws Exception { SyncIOCompletion syncCompletion = getSyncCallback(sync); @@ -177,6 +199,7 @@ abstract class JournalBase implements Journal { } + @Override public void appendDeleteRecord(final long id, final boolean sync) throws Exception { SyncIOCompletion callback = getSyncCallback(sync); @@ -203,19 +226,23 @@ abstract class JournalBase implements Journal { private static NullEncoding instance = new NullEncoding(); + @Override public void decode(final ActiveMQBuffer buffer) { // no-op } + @Override public void encode(final ActiveMQBuffer buffer) { // no-op } + @Override public int getEncodeSize() { return 0; } } + @Override public int getFileSize() { return fileSize; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalCompactor.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalCompactor.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalCompactor.java index 0216c19..3637f56 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalCompactor.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalCompactor.java @@ -254,6 +254,7 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ // JournalReaderCallback implementation ------------------------------------------- + @Override public void onReadAddRecord(final RecordInfo info) throws Exception { if (lookupRecord(info.id)) { JournalInternalRecord addRecord = new JournalAddRecord(true, info.id, info.getUserRecordType(), new ByteArrayEncoding(info.data)); @@ -267,6 +268,7 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ } } + @Override public void onReadAddRecordTX(final long transactionID, final RecordInfo info) throws Exception { if (pendingTransactions.get(transactionID) != null || lookupRecord(info.id)) { JournalTransaction newTransaction = getNewJournalTransaction(transactionID); @@ -283,6 +285,7 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ } } + @Override public void onReadCommitRecord(final long transactionID, final int numberOfRecords) throws Exception { if (pendingTransactions.get(transactionID) != null) { @@ -303,6 +306,7 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ } } + @Override public void onReadDeleteRecord(final long recordID) throws Exception { if (newRecords.get(recordID) != null) { // Sanity check, it should never happen @@ -311,6 +315,7 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ } + @Override public void onReadDeleteRecordTX(final long transactionID, final RecordInfo info) throws Exception { if (pendingTransactions.get(transactionID) != null) { JournalTransaction newTransaction = getNewJournalTransaction(transactionID); @@ -326,10 +331,12 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ // else.. nothing to be done } + @Override public void markAsDataFile(final JournalFile file) { // nothing to be done here } + @Override public void onReadPrepareRecord(final long transactionID, final byte[] extraData, final int numberOfRecords) throws Exception { @@ -348,6 +355,7 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ } } + @Override public void onReadRollbackRecord(final long transactionID) throws Exception { if (pendingTransactions.get(transactionID) != null) { // Sanity check, this should never happen @@ -370,6 +378,7 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ } } + @Override public void onReadUpdateRecord(final RecordInfo info) throws Exception { if (lookupRecord(info.id)) { JournalInternalRecord updateRecord = new JournalAddRecord(false, info.id, info.userRecordType, new ByteArrayEncoding(info.data)); @@ -391,6 +400,7 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ } } + @Override public void onReadUpdateRecordTX(final long transactionID, final RecordInfo info) throws Exception { if (pendingTransactions.get(transactionID) != null || lookupRecord(info.id)) { JournalTransaction newTransaction = getNewJournalTransaction(transactionID); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFileImpl.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFileImpl.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFileImpl.java index 4cc23f1..4c766b7 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFileImpl.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFileImpl.java @@ -55,6 +55,7 @@ public class JournalFileImpl implements JournalFile { recordID = (int) (fileID & Integer.MAX_VALUE); } + @Override public int getPosCount() { return posCount.intValue(); } @@ -69,6 +70,7 @@ public class JournalFileImpl implements JournalFile { this.canReclaim = canReclaim; } + @Override public void incNegCount(final JournalFile file) { if (file != this) { totalNegativeToOthers.incrementAndGet(); @@ -76,6 +78,7 @@ public class JournalFileImpl implements JournalFile { getOrCreateNegCount(file).incrementAndGet(); } + @Override public int getNegCount(final JournalFile file) { AtomicInteger count = negCounts.get(file); @@ -87,14 +90,17 @@ public class JournalFileImpl implements JournalFile { } } + @Override public int getJournalVersion() { return version; } + @Override public void incPosCount() { posCount.incrementAndGet(); } + @Override public void decPosCount() { posCount.decrementAndGet(); } @@ -103,10 +109,12 @@ public class JournalFileImpl implements JournalFile { return offset; } + @Override public long getFileID() { return fileID; } + @Override public int getRecordID() { return recordID; } @@ -115,6 +123,7 @@ public class JournalFileImpl implements JournalFile { this.offset = offset; } + @Override public SequentialFile getFile() { return file; } @@ -169,6 +178,7 @@ public class JournalFileImpl implements JournalFile { return liveBytes.get(); } + @Override public int getTotalNegativeToOthers() { return totalNegativeToOthers.get(); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFilesRepository.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFilesRepository.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFilesRepository.java index a837dd3..159690d 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFilesRepository.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFilesRepository.java @@ -86,6 +86,7 @@ public class JournalFilesRepository { private Executor openFilesExecutor; private final Runnable pushOpenRunnable = new Runnable() { + @Override public void run() { try { pushOpenedFile(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25ae4724/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java index 865cf72..f8aa784 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java @@ -267,6 +267,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal return "JournalImpl(state=" + state + ", currentFile=[" + currentFile + "], hash=" + super.toString() + ")"; } + @Override public void runDirectJournalBlast() throws Exception { final int numIts = 100000000; @@ -276,14 +277,17 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal class MyAIOCallback implements IOCompletion { + @Override public void done() { latch.countDown(); } + @Override public void onError(final int errorCode, final String errorMessage) { } + @Override public void storeLineUp() { } } @@ -296,13 +300,16 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal class MyRecord implements EncodingSupport { + @Override public void decode(final ActiveMQBuffer buffer) { } + @Override public void encode(final ActiveMQBuffer buffer) { buffer.writeBytes(bytes); } + @Override public int getEncodeSize() { return recordSize; } @@ -319,14 +326,17 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal latch.await(); } + @Override public Map<Long, JournalRecord> getRecords() { return records; } + @Override public JournalFile getCurrentFile() { return currentFile; } + @Override public JournalCompactor getCompactor() { return compactor; } @@ -1101,6 +1111,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal } // XXX make it protected? + @Override public int getAlignment() throws Exception { return fileFactory.getAlignment(); } @@ -1109,33 +1120,41 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal static final LoaderCallback INSTANCE = new DummyLoader(); + @Override public void failedTransaction(final long transactionID, final List<RecordInfo> records, final List<RecordInfo> recordsToDelete) { } + @Override public void updateRecord(final RecordInfo info) { } + @Override public void deleteRecord(final long id) { } + @Override public void addRecord(final RecordInfo info) { } + @Override public void addPreparedTransaction(final PreparedTransactionInfo preparedTransaction) { } } + @Override public synchronized JournalLoadInformation loadInternalOnly() throws Exception { return load(DummyLoader.INSTANCE, true, null); } + @Override public synchronized JournalLoadInformation loadSyncOnly(JournalState syncState) throws Exception { assert syncState == JournalState.SYNCING || syncState == JournalState.SYNCING_UP_TO_DATE; return load(DummyLoader.INSTANCE, true, syncState); } + @Override public JournalLoadInformation load(final List<RecordInfo> committedRecords, final List<PreparedTransactionInfo> preparedTransactions, final TransactionFailureCallback failureCallback) throws Exception { @@ -1179,26 +1198,31 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal } } + @Override public void addPreparedTransaction(final PreparedTransactionInfo preparedTransaction) { preparedTransactions.add(preparedTransaction); checkDeleteSize(); } + @Override public void addRecord(final RecordInfo info) { records.add(info); checkDeleteSize(); } + @Override public void updateRecord(final RecordInfo info) { records.add(info); checkDeleteSize(); } + @Override public void deleteRecord(final long id) { recordsToDelete.add(id); checkDeleteSize(); } + @Override public void failedTransaction(final long transactionID, final List<RecordInfo> records, final List<RecordInfo> recordsToDelete) { @@ -1217,6 +1241,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal return info; } + @Override public void scheduleCompactAndBlock(int timeout) throws Exception { final AtomicInteger errors = new AtomicInteger(0); @@ -1227,6 +1252,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal // We can't use the executor for the compacting... or we would dead lock because of file open and creation // operations (that will use the executor) compactorExecutor.execute(new Runnable() { + @Override public void run() { try { @@ -1469,6 +1495,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal * <p></p> * <p> * FileID and NumberOfElements are the transaction summary, and they will be repeated (N)umberOfFiles times </p> */ + @Override public JournalLoadInformation load(final LoaderCallback loadManager) throws Exception { return load(loadManager, true, null); } @@ -1524,6 +1551,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal } } + @Override public void onReadAddRecord(final RecordInfo info) throws Exception { checkID(info.id); @@ -1534,6 +1562,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal records.put(info.id, new JournalRecord(file, info.data.length + JournalImpl.SIZE_ADD_RECORD + 1)); } + @Override public void onReadUpdateRecord(final RecordInfo info) throws Exception { checkID(info.id); @@ -1553,6 +1582,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal } } + @Override public void onReadDeleteRecord(final long recordID) throws Exception { hasData.set(true); @@ -1565,10 +1595,12 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal } } + @Override public void onReadUpdateRecordTX(final long transactionID, final RecordInfo info) throws Exception { onReadAddRecordTX(transactionID, info); } + @Override public void onReadAddRecordTX(final long transactionID, final RecordInfo info) throws Exception { checkID(info.id); @@ -1597,6 +1629,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal // count } + @Override public void onReadDeleteRecordTX(final long transactionID, final RecordInfo info) throws Exception { hasData.set(true); @@ -1622,6 +1655,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal } + @Override public void onReadPrepareRecord(final long transactionID, final byte[] extraData, final int numberOfRecords) throws Exception { @@ -1659,6 +1693,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal } } + @Override public void onReadCommitRecord(final long transactionID, final int numberOfRecords) throws Exception { TransactionHolder tx = loadTransactions.remove(transactionID); @@ -1705,6 +1740,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal } + @Override public void onReadRollbackRecord(final long transactionID) throws Exception { TransactionHolder tx = loadTransactions.remove(transactionID); @@ -1727,6 +1763,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal } } + @Override public void markAsDataFile(final JournalFile file) { hasData.set(true); } @@ -1791,6 +1828,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal /** * @return true if cleanup was called */ + @Override public final boolean checkReclaimStatus() throws Exception { if (compactorRunning.get()) { @@ -1871,6 +1909,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal // We can't use the executor for the compacting... or we would dead lock because of file open and creation // operations (that will use the executor) compactorExecutor.execute(new Runnable() { + @Override public void run() { try { @@ -1889,10 +1928,12 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal // TestableJournal implementation // -------------------------------------------------------------- + @Override public final void setAutoReclaim(final boolean autoReclaim) { this.autoReclaim = autoReclaim; } + @Override public final boolean isAutoReclaim() { return autoReclaim; } @@ -1941,6 +1982,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal * Method for use on testcases. * It will call waitComplete on every transaction, so any assertions on the file system will be correct after this */ + @Override public void debugWait() throws InterruptedException { fileFactory.flush(); @@ -1954,6 +1996,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal final CountDownLatch latch = newLatch(1); filesExecutor.execute(new Runnable() { + @Override public void run() { latch.countDown(); } @@ -1964,22 +2007,27 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal } + @Override public int getDataFilesCount() { return filesRepository.getDataFilesCount(); } + @Override public JournalFile[] getDataFiles() { return filesRepository.getDataFilesArray(); } + @Override public int getFreeFilesCount() { return filesRepository.getFreeFilesCount(); } + @Override public int getOpenedFilesCount() { return filesRepository.getOpenedFilesCount(); } + @Override public int getIDMapSize() { return records.size(); } @@ -1989,27 +2037,33 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal return fileSize; } + @Override public int getMinFiles() { return minFiles; } + @Override public String getFilePrefix() { return filesRepository.getFilePrefix(); } + @Override public String getFileExtension() { return filesRepository.getFileExtension(); } + @Override public int getMaxAIO() { return filesRepository.getMaxAIO(); } + @Override public int getUserVersion() { return userVersion; } // In some tests we need to force the journal to move to a next file + @Override public void forceMoveNextFile() throws Exception { journalLock.readLock().lock(); try { @@ -2027,6 +2081,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal } } + @Override public void perfBlast(final int pages) { new PerfBlast(pages).start(); } @@ -2034,10 +2089,12 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal // ActiveMQComponent implementation // --------------------------------------------------- + @Override public synchronized boolean isStarted() { return state != JournalState.STOPPED; } + @Override public synchronized void start() { if (state != JournalState.STOPPED) { throw new IllegalStateException("Journal " + this + " is not stopped, state is " + state); @@ -2045,6 +2102,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal filesExecutor = Executors.newSingleThreadExecutor(new ThreadFactory() { + @Override public Thread newThread(final Runnable r) { return new Thread(r, "JournalImpl::FilesExecutor"); } @@ -2052,6 +2110,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal compactorExecutor = Executors.newSingleThreadExecutor(new ThreadFactory() { + @Override public Thread newThread(final Runnable r) { return new Thread(r, "JournalImpl::CompactorExecutor"); } @@ -2064,6 +2123,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal setJournalState(JournalState.STARTED); } + @Override public synchronized void stop() throws Exception { if (state == JournalState.STOPPED) { throw new IllegalStateException("Journal is already stopped"); @@ -2121,6 +2181,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal } } + @Override public int getNumberOfRecords() { return records.size(); } @@ -2156,6 +2217,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal final CountDownLatch done = newLatch(1); filesExecutor.execute(new Runnable() { + @Override public void run() { try { for (JournalFile file : oldFiles) { @@ -2448,6 +2510,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal if (isAutoReclaim() && !compactorRunning.get()) { compactorExecutor.execute(new Runnable() { + @Override public void run() { try { if (!checkReclaimStatus()) { @@ -2582,6 +2645,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal private static final long serialVersionUID = -6264728973604070321L; + @Override public int compare(final JournalFile f1, final JournalFile f2) { long id1 = f1.getFileID(); long id2 = f2.getFileID(); @@ -2614,6 +2678,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal return byteEncoder.getEncodeSize(); } + @Override public void encode(final ActiveMQBuffer buffer) { byteEncoder.encode(buffer); } @@ -2632,11 +2697,13 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal } } + @Override public final void synchronizationLock() { compactorLock.writeLock().lock(); journalLock.writeLock().lock(); } + @Override public final void synchronizationUnlock() { try { compactorLock.writeLock().unlock();