This is an automated email from the ASF dual-hosted git repository. vavrtom pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git
The following commit(s) were added to refs/heads/main by this push: new 0d386dd353 QPID-8653: [Broker-J] Code cleanup: collection type arguments, collection factory methods, lambdas (#214) 0d386dd353 is described below commit 0d386dd353cf777d12ce5e15bcf8f2c5a09cc720 Author: Daniil Kirilyuk <daniel.kiril...@gmail.com> AuthorDate: Mon Aug 7 13:03:42 2023 +0200 QPID-8653: [Broker-J] Code cleanup: collection type arguments, collection factory methods, lambdas (#214) --- .../store/berkeleydb/AbstractBDBMessageStore.java | 10 +- .../qpid/server/store/berkeleydb/BDBBackup.java | 12 +- .../store/berkeleydb/BDBPreferenceStore.java | 81 +++-- .../store/berkeleydb/CoalescingCommiter.java | 4 +- .../server/store/berkeleydb/EnvironmentFacade.java | 18 +- .../replication/ReplicatedEnvironmentFacade.java | 338 ++++++++------------- .../store/berkeleydb/tuple/ByteBufferBinding.java | 10 +- .../store/berkeleydb/tuple/StringMapBinding.java | 2 +- .../berkeleydb/upgrade/AbstractStoreUpgrade.java | 11 +- .../store/berkeleydb/upgrade/CursorOperation.java | 16 +- .../store/berkeleydb/upgrade/DatabaseTemplate.java | 11 +- .../store/berkeleydb/upgrade/UpgradeFrom4To5.java | 49 ++- .../store/berkeleydb/upgrade/UpgradeFrom5To6.java | 59 ++-- .../store/berkeleydb/upgrade/UpgradeFrom7To8.java | 24 +- .../upgrade/UpgradeInteractionHandler.java | 12 +- .../berkeleydb/BDBHARemoteReplicationNodeImpl.java | 1 - .../berkeleydb/BDBHAVirtualHostNodeImpl.java | 79 ++--- .../store/berkeleydb/BDBHAVirtualHostNodeTest.java | 6 +- .../store/berkeleydb/BDBPreferenceStoreTest.java | 2 +- .../berkeleydb/BDBStoreUpgradeTestPreparer.java | 4 +- .../ReplicatedEnvironmentFacadeTest.java | 15 +- .../store/berkeleydb/upgrade/UpgraderTest.java | 2 +- 22 files changed, 294 insertions(+), 472 deletions(-) diff --git a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java index acfddb577b..1fca731aca 100644 --- a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java +++ b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java @@ -1360,14 +1360,10 @@ public abstract class AbstractBDBMessageStore implements MessageStore { final StoredBDBMessage storedMessage = (StoredBDBMessage) message.getStoredMessage(); final long contentSize = storedMessage.getContentSize(); - _preCommitActions.add(new Runnable() + _preCommitActions.add(() -> { - @Override - public void run() - { - storedMessage.store(_txn); - _storeSizeIncrease += contentSize; - } + storedMessage.store(_txn); + _storeSizeIncrease += contentSize; }); } diff --git a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBBackup.java b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBBackup.java index cd67da9a1e..b3628a35c4 100644 --- a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBBackup.java +++ b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBBackup.java @@ -23,7 +23,6 @@ package org.apache.qpid.server.store.berkeleydb; import java.io.File; import java.io.FileInputStream; import java.io.FileNotFoundException; -import java.io.FilenameFilter; import java.io.IOException; import java.nio.file.Files; import java.util.Arrays; @@ -237,14 +236,7 @@ public class BDBBackup while (!consistentSet) { // List all .jdb files in the directory. - fileSet = fromDirFile.listFiles(new FilenameFilter() - { - @Override - public boolean accept(File dir, String name) - { - return name.endsWith(LOG_FILE_SUFFIX); - } - }); + fileSet = fromDirFile.listFiles((dir, name) -> name.endsWith(LOG_FILE_SUFFIX)); if (fileSet == null || fileSet.length == 0) { @@ -303,7 +295,7 @@ public class BDBBackup } // Copy the consistent set of open files. - List<String> backedUpFileNames = new LinkedList<String>(); + List<String> backedUpFileNames = new LinkedList<>(); for (int j = 0; j < fileSet.length; j++) { diff --git a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBPreferenceStore.java b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBPreferenceStore.java index ce4790c8fe..dc69604ae0 100644 --- a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBPreferenceStore.java +++ b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBPreferenceStore.java @@ -45,54 +45,47 @@ public class BDBPreferenceStore extends AbstractBDBPreferenceStore public BDBPreferenceStore(final ConfiguredObject<?> parent, final String storePath) { _storePath = storePath; - _environmentFactory = new EnvironmentFacadeFactory() + _environmentFactory = object -> new StandardEnvironmentFacade(new StandardEnvironmentConfiguration() { @Override - public EnvironmentFacade createEnvironmentFacade(final ConfiguredObject<?> object) + public String getName() { - return new StandardEnvironmentFacade(new StandardEnvironmentConfiguration() - { - @Override - public String getName() - { - return parent.getName(); - } - - @Override - public String getStorePath() - { - return storePath; - } - - @Override - public CacheMode getCacheMode() - { - return BDBUtils.getCacheMode(parent); - } - - @Override - public Map<String, String> getParameters() - { - return BDBUtils.getEnvironmentConfigurationParameters(parent); - } - - @Override - public <T> T getFacadeParameter(final Class<T> paremeterClass, final String parameterName, final T defaultValue) - { - return BDBUtils.getContextValue(parent, paremeterClass, parameterName, defaultValue); - } - - @Override - public <T> T getFacadeParameter(final Class<T> paremeterClass, - final Type type, - final String parameterName, - final T defaultValue) - { - return BDBUtils.getContextValue(parent, paremeterClass, type, parameterName, defaultValue); - } - }); + return parent.getName(); } - }; + + @Override + public String getStorePath() + { + return storePath; + } + + @Override + public CacheMode getCacheMode() + { + return BDBUtils.getCacheMode(parent); + } + + @Override + public Map<String, String> getParameters() + { + return BDBUtils.getEnvironmentConfigurationParameters(parent); + } + + @Override + public <T> T getFacadeParameter(final Class<T> paremeterClass, final String parameterName, final T defaultValue) + { + return BDBUtils.getContextValue(parent, paremeterClass, parameterName, defaultValue); + } + + @Override + public <T> T getFacadeParameter(final Class<T> paremeterClass, + final Type type, + final String parameterName, + final T defaultValue) + { + return BDBUtils.getContextValue(parent, paremeterClass, type, parameterName, defaultValue); + } + }); } @Override diff --git a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java index fc6e1fbad3..151c01e159 100644 --- a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java +++ b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java @@ -84,8 +84,8 @@ public class CoalescingCommiter implements Committer @Override public <X> ListenableFuture<X> commitAsync(Transaction tx, X val) { - ThreadNotifyingSettableFuture<X> future = new ThreadNotifyingSettableFuture<X>(); - BDBCommitFutureResult<X> commitFuture = new BDBCommitFutureResult<X>(val, future); + ThreadNotifyingSettableFuture<X> future = new ThreadNotifyingSettableFuture<>(); + BDBCommitFutureResult<X> commitFuture = new BDBCommitFutureResult<>(val, future); _commitThread.addJob(commitFuture, false); return future; } diff --git a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java index bf5e9b7ce8..5d3b8c1eca 100644 --- a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java +++ b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java @@ -20,8 +20,6 @@ */ package org.apache.qpid.server.store.berkeleydb; -import java.util.Collections; -import java.util.HashMap; import java.util.Map; import com.google.common.util.concurrent.ListenableFuture; @@ -39,16 +37,12 @@ import org.apache.qpid.server.model.ConfiguredObject; public interface EnvironmentFacade { - @SuppressWarnings("serial") - final Map<String, String> ENVCONFIG_DEFAULTS = Collections.unmodifiableMap(new HashMap<String, String>() - {{ - put(EnvironmentConfig.LOCK_N_LOCK_TABLES, "7"); - // Turn off stats generation - feature introduced (and on by default) from BDB JE 5.0.84 - put(EnvironmentConfig.STATS_COLLECT, "false"); - put(EnvironmentConfig.FILE_LOGGING_LEVEL, "OFF"); - put(EnvironmentConfig.CONSOLE_LOGGING_LEVEL, "OFF"); - put(EnvironmentConfig.CLEANER_UPGRADE_TO_LOG_VERSION, "-1"); - }}); + Map<String, String> ENVCONFIG_DEFAULTS = Map.of(EnvironmentConfig.LOCK_N_LOCK_TABLES, "7", + // Turn off stats generation - feature introduced (and on by default) from BDB JE 5.0.84 + EnvironmentConfig.STATS_COLLECT, "false", + EnvironmentConfig.FILE_LOGGING_LEVEL, "OFF", + EnvironmentConfig.CONSOLE_LOGGING_LEVEL, "OFF", + EnvironmentConfig.CLEANER_UPGRADE_TO_LOG_VERSION, "-1"); String CACHE_MODE_PROPERTY_NAME = "qpid.bdb.cache_mode"; CacheMode CACHE_MODE_DEFAULT = CacheMode.EVICT_LN; diff --git a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java index 4e77652aa9..d0193cea21 100644 --- a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java +++ b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java @@ -194,7 +194,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan public static final ReplicaAckPolicy REPLICA_REPLICA_ACKNOWLEDGMENT_POLICY = ReplicaAckPolicy.SIMPLE_MAJORITY; @SuppressWarnings("serial") - private static final Map<String, String> REPCONFIG_DEFAULTS = Collections.unmodifiableMap(new HashMap<String, String>() + private static final Map<String, String> REPCONFIG_DEFAULTS = Collections.unmodifiableMap(new HashMap<>() {{ /** * Parameter decreased as the 24h default may lead very large log files for most users. @@ -222,13 +222,13 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan * is scheduled to become default after JE 5.1. */ put(ReplicationConfig.PROTOCOL_OLD_STRING_ENCODING, Boolean.FALSE.toString()); - /** - * Allow Replica to proceed with transactions regardless of the state of a Replica - * At the moment we do not read or write databases on Replicas. - * Setting consistency policy to NoConsistencyRequiredPolicy - * would allow to create transaction on Replica immediately. - * Any followed write operation would fail with ReplicaWriteException. - */ + /** + * Allow Replica to proceed with transactions regardless of the state of a Replica + * At the moment we do not read or write databases on Replicas. + * Setting consistency policy to NoConsistencyRequiredPolicy + * would allow to create transaction on Replica immediately. + * Any followed write operation would fail with ReplicaWriteException. + */ put(ReplicationConfig.CONSISTENCY_POLICY, NoConsistencyRequiredPolicy.NAME); }}); @@ -263,16 +263,16 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan * transfer master operation. */ private final ScheduledThreadPoolExecutor _groupChangeExecutor; - private final AtomicReference<State> _state = new AtomicReference<State>(State.OPENING); - private final ConcurrentMap<String, ReplicationNode> _remoteReplicationNodes = new ConcurrentHashMap<String, ReplicationNode>(); - private final AtomicReference<ReplicationGroupListener> _replicationGroupListener = new AtomicReference<ReplicationGroupListener>(); - private final AtomicReference<StateChangeListener> _stateChangeListener = new AtomicReference<StateChangeListener>(); + private final AtomicReference<State> _state = new AtomicReference<>(State.OPENING); + private final ConcurrentMap<String, ReplicationNode> _remoteReplicationNodes = new ConcurrentHashMap<>(); + private final AtomicReference<ReplicationGroupListener> _replicationGroupListener = new AtomicReference<>(); + private final AtomicReference<StateChangeListener> _stateChangeListener = new AtomicReference<>(); private final Durability _defaultDurability; private final ConcurrentMap<String, Database> _cachedDatabases = new ConcurrentHashMap<>(); private final ConcurrentMap<DatabaseEntry, Sequence> _cachedSequences = new ConcurrentHashMap<>(); private final AtomicReference<ReplicatedEnvironment> _environment = new AtomicReference<>(); - private final Set<String> _permittedNodes = new CopyOnWriteArraySet<String>(); + private final Set<String> _permittedNodes = new CopyOnWriteArraySet<>(); private volatile Durability _realMessageStoreDurability = null; private volatile Durability _messageStoreDurability; private volatile CoalescingCommiter _coalescingCommiter = null; @@ -339,22 +339,19 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan boolean success = false; try { - createEnvironment(true, new Runnable(){ - @Override - public void run() + createEnvironment(true, () -> + { + populateExistingRemoteReplicationNodes(); + int numberOfRemoteNodes = _remoteReplicationNodes.size(); + if (numberOfRemoteNodes > 0) { - populateExistingRemoteReplicationNodes(); - int numberOfRemoteNodes = _remoteReplicationNodes.size(); - if (numberOfRemoteNodes > 0) - { - int newPoolSize = numberOfRemoteNodes - + 1 /* for this node */ - + 1 /* for coordination */; - _groupChangeExecutor.setCorePoolSize(newPoolSize); - LOGGER.debug("Setting group change executor core pool size to {}", newPoolSize); - } - _groupChangeExecutor.submit(new RemoteNodeStateLearner()); + int newPoolSize = numberOfRemoteNodes + + 1 /* for this node */ + + 1 /* for coordination */; + _groupChangeExecutor.setCorePoolSize(newPoolSize); + LOGGER.debug("Setting group change executor core pool size to {}", newPoolSize); } + _groupChangeExecutor.submit(new RemoteNodeStateLearner()); }); success = true; } @@ -592,65 +589,57 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan // Tell the virtualhostnode that we are no longer attached to the group. It will close the virtualhost, // closing the connections, housekeeping etc meaning all transactions are finished before we // restart the environment. - _stateChangeExecutor.submit(new Callable<Void>() + _stateChangeExecutor.submit(() -> { - @Override - public Void call() throws Exception + StateChangeListener listener = _stateChangeListener.get(); + if (listener != null && _state.get() == State.RESTARTING) { - StateChangeListener listener = _stateChangeListener.get(); - if (listener != null && _state.get() == State.RESTARTING) + try { - try - { - StateChangeEvent detached = new StateChangeEvent(ReplicatedEnvironment.State.DETACHED, NameIdPair.NULL); - listener.stateChange(detached); - } - catch (Throwable t) - { - handleUncaughtExceptionInExecutorService(t); - } + StateChangeEvent detached = new StateChangeEvent(ReplicatedEnvironment.State.DETACHED, NameIdPair.NULL); + listener.stateChange(detached); + } + catch (Throwable t) + { + handleUncaughtExceptionInExecutorService(t); } - - return null; } - }).addListener(new Runnable() + + return null; + }).addListener(() -> { - @Override - public void run() + int attemptNumber = 1; + boolean restarted = false; + Exception lastException = null; + while(_state.get() == State.RESTARTING && attemptNumber <= _environmentRestartRetryLimit) { - int attemptNumber = 1; - boolean restarted = false; - Exception lastException = null; - while(_state.get() == State.RESTARTING && attemptNumber <= _environmentRestartRetryLimit) + try { - try - { - restartEnvironment(); - restarted = true; - break; - } - catch(EnvironmentFailureException e) - { - LOGGER.warn("Failure whilst trying to restart environment (attempt number " - + "{} of {})", attemptNumber, _environmentRestartRetryLimit, e); - lastException = e; - } - catch (Exception e) - { - LOGGER.error("Fatal failure whilst trying to restart environment", e); - lastException = e; - break; - } - attemptNumber++; + restartEnvironment(); + restarted = true; + break; + } + catch(EnvironmentFailureException e) + { + LOGGER.warn("Failure whilst trying to restart environment (attempt number " + + "{} of {})", attemptNumber, _environmentRestartRetryLimit, e); + lastException = e; } + catch (Exception e) + { + LOGGER.error("Fatal failure whilst trying to restart environment", e); + lastException = e; + break; + } + attemptNumber++; + } - if (!restarted) + if (!restarted) + { + LOGGER.error("Failed to restart environment."); + if (lastException != null) { - LOGGER.error("Failed to restart environment."); - if (lastException != null) - { - handleUncaughtExceptionInExecutorService(lastException); - } + handleUncaughtExceptionInExecutorService(lastException); } } }, _environmentJobExecutor); @@ -773,19 +762,15 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan if (_state.get() != State.CLOSING && _state.get() != State.CLOSED) { - _stateChangeExecutor.submit(new Runnable() + _stateChangeExecutor.submit(() -> { - @Override - public void run() + try { - try - { - stateChanged(stateChangeEvent); - } - catch (Throwable e) - { - handleUncaughtExceptionInExecutorService(e); - } + stateChanged(stateChangeEvent); + } + catch (Throwable e) + { + handleUncaughtExceptionInExecutorService(e); } }); } @@ -831,14 +816,10 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan { LOGGER.debug("Submitting a job to set cache size on {} to {}", _prettyGroupNodeName, cacheSize); - Callable<Void> task = new Callable<Void>() + Callable<Void> task = () -> { - @Override - public Void call() - { - setCacheSizeInternal(cacheSize); - return null; - } + setCacheSizeInternal(cacheSize); + return null; }; submitEnvironmentTask(1, task, "setting cache size"); } @@ -848,14 +829,10 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan { LOGGER.debug("Submitting a job to set update mutable config on {}", _prettyGroupNodeName); - Callable<Void> task = new Callable<Void>() + Callable<Void> task = () -> { - @Override - public Void call() - { - EnvironmentUtils.updateMutableConfig(getEnvironment(), PARAMS_SET_BY_DEFAULT, true, object); - return null; - } + EnvironmentUtils.updateMutableConfig(getEnvironment(), PARAMS_SET_BY_DEFAULT, true, object); + return null; }; submitEnvironmentTask(5, task, "updating mutable config"); @@ -868,14 +845,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan LOGGER.debug("Submitting a job to clean log files on {} ", _prettyGroupNodeName); int timeout = 5; - Callable<Integer> task = new Callable<Integer>() - { - @Override - public Integer call() - { - return getEnvironment().cleanLog(); - } - }; + Callable<Integer> task = () -> getEnvironment().cleanLog(); Integer fileCount = submitEnvironmentTask(timeout, task, "cleaning log files"); @@ -888,16 +858,12 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan LOGGER.debug("Submitting a job to perform checkpoint on {} ", _prettyGroupNodeName); int timeout = 5; - Callable<Void> task = new Callable<Void>() + Callable<Void> task = () -> { - @Override - public Void call() - { - CheckpointConfig checkpointConfig = new CheckpointConfig(); - checkpointConfig.setForce(force); - getEnvironment().checkpoint(checkpointConfig); - return null; - } + CheckpointConfig checkpointConfig = new CheckpointConfig(); + checkpointConfig.setForce(force); + getEnvironment().checkpoint(checkpointConfig); + return null; }; submitEnvironmentTask(timeout, task, "perform checkpoint"); @@ -909,15 +875,8 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan LOGGER.debug("Submitting a job to get environment statistics on {} ", _prettyGroupNodeName); int timeout = 5; - Callable<Map<String,Map<String,Object>>> task = new Callable<Map<String,Map<String,Object>>>() - { - @Override - public Map<String,Map<String,Object>> call() - { - return EnvironmentUtils.getEnvironmentStatistics(getEnvironment(), reset); - - } - }; + Callable<Map<String,Map<String,Object>>> task = () -> + EnvironmentUtils.getEnvironmentStatistics(getEnvironment(), reset); return submitEnvironmentTask(timeout, task, "get environment statistics"); } @@ -928,14 +887,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan LOGGER.debug("Submitting a job to get transaction statistics on {} ", _prettyGroupNodeName); int timeout = 5; - Callable<Map<String,Object>> task = new Callable<Map<String,Object>>() - { - @Override - public Map<String,Object> call() - { - return EnvironmentUtils.getTransactionStatistics(getEnvironment(), reset); - } - }; + Callable<Map<String,Object>> task = () -> EnvironmentUtils.getTransactionStatistics(getEnvironment(), reset); return submitEnvironmentTask(timeout, task, "get transaction statistics"); } @@ -946,15 +898,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan LOGGER.debug("Submitting a job to get database statistics for {} on {} ", database, _prettyGroupNodeName); int timeout = 5; - Callable<Map<String,Object>> task = new Callable<Map<String,Object>>() - { - @Override - public Map<String, Object> call() - { - - return EnvironmentUtils.getDatabaseStatistics(getEnvironment(), database, reset); - } - }; + Callable<Map<String,Object>> task = () -> EnvironmentUtils.getDatabaseStatistics(getEnvironment(), database, reset); return submitEnvironmentTask(timeout, task, "get database statistics for '" + database + "'"); @@ -1255,30 +1199,26 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan // Transfer master contacts the group (using GroupAdmin) to request the mastership change. // It needs to be done asynchronously but not on the _environmentJobExecutor, as there is // no point delaying transfer master because we are restarting. - return _groupChangeExecutor.submit(new Callable<Void>() + return _groupChangeExecutor.submit(() -> { - @Override - public Void call() throws Exception + try { - try - { - ReplicationGroupAdmin admin = createReplicationGroupAdmin(); - String newMaster = admin.transferMaster(Collections.singleton(nodeName), - _masterTransferTimeout, TimeUnit.MILLISECONDS, true); - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("The mastership has been transferred to " + newMaster); - } - } - catch (RuntimeException e) + ReplicationGroupAdmin admin = createReplicationGroupAdmin(); + String newMaster = admin.transferMaster(Collections.singleton(nodeName), + _masterTransferTimeout, TimeUnit.MILLISECONDS, true); + if (LOGGER.isDebugEnabled()) { - String message = "Exception on transferring the mastership to " + _prettyGroupNodeName - + " Master transfer timeout : " + _masterTransferTimeout; - LOGGER.warn(message, e); - throw handleDatabaseException(message, e); + LOGGER.debug("The mastership has been transferred to " + newMaster); } - return null; } + catch (RuntimeException e) + { + String message = "Exception on transferring the mastership to " + _prettyGroupNodeName + + " Master transfer timeout : " + _masterTransferTimeout; + LOGGER.warn(message, e); + throw handleDatabaseException(message, e); + } + return null; }); } @@ -1331,7 +1271,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan private ReplicationGroupAdmin createReplicationGroupAdmin() { - final Set<InetSocketAddress> helpers = new HashSet<InetSocketAddress>(); + final Set<InetSocketAddress> helpers = new HashSet<>(); final ReplicationConfig repConfig = getEnvironment().getRepConfig(); helpers.addAll(repConfig.getHelperSockets()); @@ -1616,14 +1556,10 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan private void createEnvironmentInSeparateThread(final File environmentPathFile, final EnvironmentConfig envConfig, final ReplicationConfig replicationConfig, final Runnable postCreationAction) { - Future<Void> environmentFuture = _environmentJobExecutor.submit(new Callable<Void>() + Future<Void> environmentFuture = _environmentJobExecutor.submit(() -> { - @Override - public Void call() throws Exception - { - createEnvironment(environmentPathFile, envConfig, replicationConfig, postCreationAction); - return null; - } + createEnvironment(environmentPathFile, envConfig, replicationConfig, postCreationAction); + return null; }); final long setUpTimeOutMillis = extractEnvSetupTimeoutMillis(replicationConfig); @@ -1835,7 +1771,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan try { Map<String, Object> settings = objectMapper.readValue(applicationState, Map.class); - return new HashSet<String>((Collection<String>)settings.get(PERMITTED_NODE_LIST)); + return new HashSet<>((Collection<String>) settings.get(PERMITTED_NODE_LIST)); } catch (Exception e) { @@ -1846,25 +1782,15 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan public static Collection<String> connectToHelperNodeAndCheckPermittedHosts(final String nodeName, final String hostPort, final String groupName, final String helperNodeName, final String helperHostPort, final int dbPingSocketTimeout) { ExecutorService executor = null; - Future<Collection<String>> future = null; + Future<Collection<String>> future; try { executor = Executors.newSingleThreadExecutor(new DaemonThreadFactory(String.format( "PermittedHostsCheck-%s-%s", groupName, nodeName))); - future = executor.submit(new Callable<Collection<String>>() - { - @Override - public Collection<String> call() throws Exception - { - return getPermittedHostsFromHelper(nodeName, - groupName, - helperNodeName, - helperHostPort, - dbPingSocketTimeout); - } - }); + future = executor.submit(() -> + getPermittedHostsFromHelper(nodeName, groupName, helperNodeName, helperHostPort, dbPingSocketTimeout)); try { @@ -2017,7 +1943,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan private byte[] permittedNodeListToBytes(Set<String> permittedNodeList) { - HashMap<String, Object> data = new HashMap<String, Object>(); + HashMap<String, Object> data = new HashMap<>(); data.put(PERMITTED_NODE_LIST, permittedNodeList); ByteArrayOutputStream baos = new ByteArrayOutputStream(); ObjectMapper objectMapper = new ObjectMapper(); @@ -2075,16 +2001,12 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan private void onException(final Exception e) { - _groupChangeExecutor.submit(new Runnable() + _groupChangeExecutor.submit(() -> { - @Override - public void run() + ReplicationGroupListener listener = _replicationGroupListener.get(); + if (listener != null) { - ReplicationGroupListener listener = _replicationGroupListener.get(); - if (listener != null) - { - listener.onException(e); - } + listener.onException(e); } }); } @@ -2264,31 +2186,27 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan private Map<ReplicationNode, NodeState> discoverNodeStates(Collection<ReplicationNode> electableNodes) { - final Map<ReplicationNode, NodeState> nodeStates = new HashMap<ReplicationNode, NodeState>(); - Map<ReplicationNode, Future<Void>> futureMap = new HashMap<ReplicationNode, Future<Void>>(); + final Map<ReplicationNode, NodeState> nodeStates = new HashMap<>(); + Map<ReplicationNode, Future<Void>> futureMap = new HashMap<>(); for (final ReplicationNode node : electableNodes) { nodeStates.put(node, null); - Future<Void> future = _groupChangeExecutor.submit(new Callable<Void>() + Future<Void> future = _groupChangeExecutor.submit(() -> { - @Override - public Void call() + NodeState nodeStateObject = null; + try { - NodeState nodeStateObject = null; - try - { - nodeStateObject = getRemoteNodeState(_configuration.getGroupName(), node, _dbPingSocketTimeout); - } - catch (IOException | ServiceConnectFailedException | com.sleepycat.je.rep.utilint.BinaryProtocol.ProtocolException e ) - { - // Cannot discover node states. The node state should be treated as UNKNOWN - } - - nodeStates.put(node, nodeStateObject); - return null; + nodeStateObject = getRemoteNodeState(_configuration.getGroupName(), node, _dbPingSocketTimeout); + } + catch (IOException | ServiceConnectFailedException | BinaryProtocol.ProtocolException e ) + { + // Cannot discover node states. The node state should be treated as UNKNOWN } + + nodeStates.put(node, nodeStateObject); + return null; }); futureMap.put(node, future); } diff --git a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ByteBufferBinding.java b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ByteBufferBinding.java index 3b10bfc1d1..ac4225b2d4 100644 --- a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ByteBufferBinding.java +++ b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ByteBufferBinding.java @@ -32,15 +32,7 @@ public class ByteBufferBinding extends TupleBinding<QpidByteBuffer> { private static final int COPY_BUFFER_SIZE = 8192; - private static final ThreadLocal<byte[]> COPY_BUFFER = new ThreadLocal<byte[]>() - { - - @Override - protected byte[] initialValue() - { - return new byte[COPY_BUFFER_SIZE]; - } - }; + private static final ThreadLocal<byte[]> COPY_BUFFER = ThreadLocal.withInitial(() -> new byte[COPY_BUFFER_SIZE]); private static final ByteBufferBinding INSTANCE = new ByteBufferBinding(); diff --git a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/StringMapBinding.java b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/StringMapBinding.java index fffdb0d107..7aee489545 100644 --- a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/StringMapBinding.java +++ b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/StringMapBinding.java @@ -35,7 +35,7 @@ public class StringMapBinding extends TupleBinding<Map<String,String>> public Map<String, String> entryToObject(final TupleInput tupleInput) { int entries = tupleInput.readInt(); - Map<String,String> map = new HashMap<String,String>(entries); + Map<String,String> map = new HashMap<>(entries); for(int i = 0; i < entries; i++) { map.put(tupleInput.readString(), tupleInput.readString()); diff --git a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/AbstractStoreUpgrade.java b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/AbstractStoreUpgrade.java index ec10a32a20..e1e1b1fdc1 100644 --- a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/AbstractStoreUpgrade.java +++ b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/AbstractStoreUpgrade.java @@ -22,9 +22,7 @@ package org.apache.qpid.server.store.berkeleydb.upgrade; import java.util.List; -import com.sleepycat.je.Database; import com.sleepycat.je.Environment; -import com.sleepycat.je.Transaction; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -63,14 +61,7 @@ public abstract class AbstractStoreUpgrade implements StoreUpgrade private long getRowCount(String databaseName, Environment environment) { - DatabaseCallable<Long> operation = new DatabaseCallable<Long>() - { - @Override - public Long call(Database sourceDatabase, Database targetDatabase, Transaction transaction) - { - return sourceDatabase.count(); - } - }; + DatabaseCallable<Long> operation = (sourceDatabase, targetDatabase, transaction) -> sourceDatabase.count(); return new DatabaseTemplate(environment, databaseName, null).call(operation); } diff --git a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/CursorOperation.java b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/CursorOperation.java index dad99fc32d..b8f9461db7 100644 --- a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/CursorOperation.java +++ b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/CursorOperation.java @@ -39,20 +39,14 @@ public abstract class CursorOperation implements DatabaseRunnable public void run(final Database sourceDatabase, final Database targetDatabase, final Transaction transaction) { _rowCount = sourceDatabase.count(); - _template = new CursorTemplate(sourceDatabase, transaction, new DatabaseEntryCallback() + _template = new CursorTemplate(sourceDatabase, transaction, (database, transaction1, key, value) -> { - @Override - public void processEntry(final Database database, final Transaction transaction, final DatabaseEntry key, - final DatabaseEntry value) + _processedRowCount++; + CursorOperation.this.processEntry(database, targetDatabase, transaction1, key, value); + if (getProcessedCount() % 1000 == 0) { - _processedRowCount++; - CursorOperation.this.processEntry(database, targetDatabase, transaction, key, value); - if (getProcessedCount() % 1000 == 0) - { - LOGGER.info("Processed " + getProcessedCount() + " records of " + getRowCount() + "."); - } + LOGGER.info("Processed " + getProcessedCount() + " records of " + getRowCount() + "."); } - }); _template.processEntries(); } diff --git a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/DatabaseTemplate.java b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/DatabaseTemplate.java index fd4f4b5416..c47fbdcaaa 100644 --- a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/DatabaseTemplate.java +++ b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/DatabaseTemplate.java @@ -84,15 +84,10 @@ public class DatabaseTemplate private DatabaseCallable<Void> runnableToCallable(final DatabaseRunnable databaseRunnable) { - return new DatabaseCallable<Void>() + return (sourceDatabase, targetDatabase, transaction) -> { - - @Override - public Void call(Database sourceDatabase, Database targetDatabase, Transaction transaction) - { - databaseRunnable.run(sourceDatabase, targetDatabase, transaction); - return null; - } + databaseRunnable.run(sourceDatabase, targetDatabase, transaction); + return null; }; } diff --git a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4To5.java b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4To5.java index 0e4f9057e4..052d0d4461 100644 --- a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4To5.java +++ b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4To5.java @@ -167,7 +167,7 @@ public class UpgradeFrom4To5 extends AbstractStoreUpgrade List<AMQShortString> potentialDurableSubs, Transaction transaction) { LOGGER.info("Queues"); - final Set<String> existingQueues = new HashSet<String>(); + final Set<String> existingQueues = new HashSet<>(); if (environment.getDatabaseNames().contains(OLD_QUEUE_DB_NAME)) { final QueueRecordBinding binding = new QueueRecordBinding(potentialDurableSubs); @@ -196,7 +196,7 @@ public class UpgradeFrom4To5 extends AbstractStoreUpgrade Transaction transaction) { final List<AMQShortString> exchangeNames = findTopicExchanges(environment); - final List<AMQShortString> queues = new ArrayList<AMQShortString>(); + final List<AMQShortString> queues = new ArrayList<>(); final PartialBindingRecordBinding binding = new PartialBindingRecordBinding(); CursorOperation databaseOperation = new CursorOperation() @@ -219,8 +219,8 @@ public class UpgradeFrom4To5 extends AbstractStoreUpgrade private Set<Long> upgradeDelivery(final Environment environment, final Set<String> existingQueues, final UpgradeInteractionHandler handler, Transaction transaction) { - final Set<Long> messagesToDiscard = new HashSet<Long>(); - final Set<String> queuesToDiscard = new HashSet<String>(); + final Set<Long> messagesToDiscard = new HashSet<>(); + final Set<String> queuesToDiscard = new HashSet<>(); final QueueEntryKeyBinding queueEntryKeyBinding = new QueueEntryKeyBinding(); LOGGER.info("Delivery Records"); @@ -307,40 +307,37 @@ public class UpgradeFrom4To5 extends AbstractStoreUpgrade final QueueRecordBinding binding = new QueueRecordBinding(null); final BindingTuple bindingTuple = new BindingTuple(); - DatabaseRunnable queueCreateOperation = new DatabaseRunnable() + DatabaseRunnable queueCreateOperation = (newQueueDatabase, newBindingsDatabase, transaction1) -> { + AMQShortString queueNameAMQ = AMQShortString.createAMQShortString(queueName); + QueueRecord record = new QueueRecord(queueNameAMQ, null, false, null); - @Override - public void run(Database newQueueDatabase, Database newBindingsDatabase, Transaction transaction) - { - AMQShortString queueNameAMQ = AMQShortString.createAMQShortString(queueName); - QueueRecord record = new QueueRecord(queueNameAMQ, null, false, null); - - DatabaseEntry key = new DatabaseEntry(); + DatabaseEntry key = new DatabaseEntry(); - TupleOutput output = new TupleOutput(); - AMQShortStringEncoding.writeShortString(record.getNameShortString(), output); - TupleBase.outputToEntry(output, key); + TupleOutput output = new TupleOutput(); + AMQShortStringEncoding.writeShortString(record.getNameShortString(), output); + TupleBase.outputToEntry(output, key); - DatabaseEntry newValue = new DatabaseEntry(); - binding.objectToEntry(record, newValue); - newQueueDatabase.put(transaction, key, newValue); + DatabaseEntry newValue = new DatabaseEntry(); + binding.objectToEntry(record, newValue); + newQueueDatabase.put(transaction1, key, newValue); - FieldTable emptyArguments = FieldTableFactory.createFieldTable(Collections.emptyMap()); - addBindingToDatabase(bindingTuple, newBindingsDatabase, transaction, queueNameAMQ, - AMQShortString.valueOf(ExchangeDefaults.DIRECT_EXCHANGE_NAME), queueNameAMQ, emptyArguments); + FieldTable emptyArguments = FieldTableFactory.createFieldTable(Collections.emptyMap()); + addBindingToDatabase(bindingTuple, newBindingsDatabase, + transaction1, queueNameAMQ, + AMQShortString.valueOf(ExchangeDefaults.DIRECT_EXCHANGE_NAME), queueNameAMQ, emptyArguments); - // TODO QPID-3490 we should not persist a default exchange binding - addBindingToDatabase(bindingTuple, newBindingsDatabase, transaction, queueNameAMQ, - AMQShortString.valueOf(ExchangeDefaults.DEFAULT_EXCHANGE_NAME), queueNameAMQ, emptyArguments); - } + // TODO QPID-3490 we should not persist a default exchange binding + addBindingToDatabase(bindingTuple, newBindingsDatabase, + transaction1, queueNameAMQ, + AMQShortString.valueOf(ExchangeDefaults.DEFAULT_EXCHANGE_NAME), queueNameAMQ, emptyArguments); }; new DatabaseTemplate(environment, NEW_QUEUE_DB_NAME, NEW_BINDINGS_DB_NAME, transaction).run(queueCreateOperation); } private List<AMQShortString> findTopicExchanges(final Environment environment) { - final List<AMQShortString> topicExchanges = new ArrayList<AMQShortString>(); + final List<AMQShortString> topicExchanges = new ArrayList<>(); final ExchangeRecordBinding binding = new ExchangeRecordBinding(); CursorOperation databaseOperation = new CursorOperation() { diff --git a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6.java b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6.java index 5262e0b2c5..61916602a9 100644 --- a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6.java +++ b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6.java @@ -26,9 +26,7 @@ import static org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeInteraction import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -69,13 +67,11 @@ public class UpgradeFrom5To6 extends AbstractStoreUpgrade { private static final Logger LOGGER = LoggerFactory.getLogger(UpgradeFrom5To6.class); - private static final Set<String> DEFAULT_EXCHANGES_SET = - new HashSet<String>(Arrays.asList( - ExchangeDefaults.DEFAULT_EXCHANGE_NAME, - ExchangeDefaults.FANOUT_EXCHANGE_NAME, - ExchangeDefaults.HEADERS_EXCHANGE_NAME, - ExchangeDefaults.TOPIC_EXCHANGE_NAME, - ExchangeDefaults.DIRECT_EXCHANGE_NAME)); + private static final Set<String> DEFAULT_EXCHANGES_SET = Set.of(ExchangeDefaults.DEFAULT_EXCHANGE_NAME, + ExchangeDefaults.FANOUT_EXCHANGE_NAME, + ExchangeDefaults.HEADERS_EXCHANGE_NAME, + ExchangeDefaults.TOPIC_EXCHANGE_NAME, + ExchangeDefaults.DIRECT_EXCHANGE_NAME); private static final String ARGUMENTS = "arguments"; @@ -172,28 +168,23 @@ public class UpgradeFrom5To6 extends AbstractStoreUpgrade LOGGER.info("Message Contents"); if (environment.getDatabaseNames().contains(OLD_CONTENT_DB_NAME)) { - DatabaseRunnable contentOperation = new DatabaseRunnable() + DatabaseRunnable contentOperation = (oldContentDatabase, newContentDatabase, contentTransaction) -> { - @Override - public void run(final Database oldContentDatabase, final Database newContentDatabase, - Transaction contentTransaction) + CursorOperation metaDataDatabaseOperation = new CursorOperation() { - CursorOperation metaDataDatabaseOperation = new CursorOperation() - { - @Override - public void processEntry(Database metadataDatabase, Database notUsed, - Transaction metaDataTransaction, DatabaseEntry key, DatabaseEntry value) - { - long messageId = LongBinding.entryToLong(key); - upgradeMessage(messageId, oldContentDatabase, newContentDatabase, handler, metaDataTransaction, - metadataDatabase); - } - }; - new DatabaseTemplate(environment, OLD_META_DATA_DB_NAME, contentTransaction) - .run(metaDataDatabaseOperation); - LOGGER.info(metaDataDatabaseOperation.getRowCount() + " Message Content Entries"); - } + @Override + public void processEntry(Database metadataDatabase, Database notUsed, + Transaction metaDataTransaction, DatabaseEntry key, DatabaseEntry value) + { + long messageId = LongBinding.entryToLong(key); + upgradeMessage(messageId, oldContentDatabase, newContentDatabase, handler, metaDataTransaction, + metadataDatabase); + } + }; + new DatabaseTemplate(environment, OLD_META_DATA_DB_NAME, contentTransaction) + .run(metaDataDatabaseOperation); + LOGGER.info(metaDataDatabaseOperation.getRowCount() + " Message Content Entries"); }; new DatabaseTemplate(environment, OLD_CONTENT_DB_NAME, NEW_CONTENT_DB_NAME, transaction).run(contentOperation); environment.removeDatabase(transaction, OLD_CONTENT_DB_NAME); @@ -275,7 +266,7 @@ public class UpgradeFrom5To6 extends AbstractStoreUpgrade */ private SortedMap<Integer, byte[]> getMessageData(final long messageId, final Database oldDatabase) { - TreeMap<Integer, byte[]> data = new TreeMap<Integer, byte[]>(); + TreeMap<Integer, byte[]> data = new TreeMap<>(); Cursor cursor = oldDatabase.openCursor(null, CursorConfig.READ_COMMITTED); try @@ -438,7 +429,7 @@ public class UpgradeFrom5To6 extends AbstractStoreUpgrade private List<String> upgradeExchanges(Environment environment, Transaction transaction, final String virtualHostName) { - final List<String> exchangeNames = new ArrayList<String>(); + final List<String> exchangeNames = new ArrayList<>(); LOGGER.info("Exchanges"); if (environment.getDatabaseNames().contains(OLD_EXCHANGE_DB_NAME)) { @@ -475,7 +466,7 @@ public class UpgradeFrom5To6 extends AbstractStoreUpgrade private List<String> upgradeQueues(Environment environment, Transaction transaction, final String virtualHostName) { - final List<String> queueNames = new ArrayList<String>(); + final List<String> queueNames = new ArrayList<>(); LOGGER.info("Queues"); if (environment.getDatabaseNames().contains(OLD_QUEUE_DB_NAME)) { @@ -532,7 +523,7 @@ public class UpgradeFrom5To6 extends AbstractStoreUpgrade String owner, boolean exclusive, FieldTable arguments) { - Map<String, Object> attributesMap = new HashMap<String, Object>(); + Map<String, Object> attributesMap = new HashMap<>(); attributesMap.put(Queue.NAME, queueName); attributesMap.put(Queue.EXCLUSIVE, exclusive); @@ -569,7 +560,7 @@ public class UpgradeFrom5To6 extends AbstractStoreUpgrade private UpgradeConfiguredObjectRecord createExchangeConfiguredObjectRecord(String exchangeName, String exchangeType, boolean autoDelete) { - Map<String, Object> attributesMap = new HashMap<String, Object>(); + Map<String, Object> attributesMap = new HashMap<>(); attributesMap.put(Exchange.NAME, exchangeName); attributesMap.put(Exchange.TYPE, exchangeType); attributesMap.put(Exchange.LIFETIME_POLICY, autoDelete ? "AUTO_DELETE" @@ -582,7 +573,7 @@ public class UpgradeFrom5To6 extends AbstractStoreUpgrade private UpgradeConfiguredObjectRecord createBindingConfiguredObjectRecord(String exchangeName, String queueName, String routingKey, FieldTable arguments, String virtualHostName) { - Map<String, Object> attributesMap = new HashMap<String, Object>(); + Map<String, Object> attributesMap = new HashMap<>(); attributesMap.put("name", routingKey); attributesMap.put("exchange", UUIDGenerator.generateExchangeUUID(exchangeName, virtualHostName)); attributesMap.put("queue", UUIDGenerator.generateQueueUUID(queueName, virtualHostName)); diff --git a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom7To8.java b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom7To8.java index 0545d7186a..7de466f5ee 100644 --- a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom7To8.java +++ b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom7To8.java @@ -22,7 +22,7 @@ package org.apache.qpid.server.store.berkeleydb.upgrade; import java.io.IOException; import java.io.StringWriter; -import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.Map; import java.util.UUID; @@ -57,22 +57,22 @@ import com.sleepycat.je.Transaction; public class UpgradeFrom7To8 extends AbstractStoreUpgrade { - private static final TypeReference<HashMap<String, Object>> MAP_TYPE_REFERENCE = new TypeReference<HashMap<String,Object>>(){}; + private static final TypeReference<HashMap<String, Object>> MAP_TYPE_REFERENCE = new TypeReference<>() + { + }; - private static final DatabaseEntry MESSAGE_METADATA_SEQ_KEY = new DatabaseEntry("MESSAGE_METADATA_SEQ_KEY".getBytes(Charset.forName("UTF-8"))); + private static final DatabaseEntry MESSAGE_METADATA_SEQ_KEY = + new DatabaseEntry("MESSAGE_METADATA_SEQ_KEY".getBytes(StandardCharsets.UTF_8)); private static final SequenceConfig MESSAGE_METADATA_SEQ_CONFIG = SequenceConfig.DEFAULT. setAllowCreate(true). setWrap(true). setCacheSize(100000); - private final Map<String, String> _defaultExchanges = new HashMap<String, String>() - {{ - put("amq.direct", "direct"); - put("amq.topic", "topic"); - put("amq.fanout", "fanout"); - put("amq.match", "headers"); - }}; + private final Map<String, String> _defaultExchanges = Map.of("amq.direct", "direct", + "amq.topic", "topic", + "amq.fanout", "fanout", + "amq.match", "headers"); @Override public void performUpgrade(Environment environment, UpgradeInteractionHandler handler, ConfiguredObject<?> parent) @@ -103,7 +103,7 @@ public class UpgradeFrom7To8 extends AbstractStoreUpgrade configVersionDb.close(); String virtualHostName = parent.getName(); - Map<String, Object> virtualHostAttributes = new HashMap<String, Object>(); + Map<String, Object> virtualHostAttributes = new HashMap<>(); virtualHostAttributes.put("modelVersion", stringifiedConfigVersion); virtualHostAttributes.put("name", virtualHostName); UUID virtualHostId = UUIDGenerator.generateVhostUUID(virtualHostName); @@ -202,7 +202,7 @@ public class UpgradeFrom7To8 extends AbstractStoreUpgrade for (Map.Entry<String, String> defaultExchangeEntry : _defaultExchanges.entrySet()) { UUID id = UUIDGenerator.generateExchangeUUID(defaultExchangeEntry.getKey(), virtualHostName); - Map<String, Object> exchangeAttributes = new HashMap<String, Object>(); + Map<String, Object> exchangeAttributes = new HashMap<>(); exchangeAttributes.put("name", defaultExchangeEntry.getKey()); exchangeAttributes.put("type", defaultExchangeEntry.getValue()); exchangeAttributes.put("lifetimePolicy", "PERMANENT"); diff --git a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeInteractionHandler.java b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeInteractionHandler.java index c3f3ebefbb..74212b5096 100644 --- a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeInteractionHandler.java +++ b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeInteractionHandler.java @@ -25,14 +25,6 @@ public interface UpgradeInteractionHandler UpgradeInteractionResponse requireResponse(String question, UpgradeInteractionResponse defaultResponse, UpgradeInteractionResponse... possibleResponses); - public static final UpgradeInteractionHandler DEFAULT_HANDLER = new UpgradeInteractionHandler() - { - @Override - public UpgradeInteractionResponse requireResponse(final String question, - final UpgradeInteractionResponse defaultResponse, - final UpgradeInteractionResponse... possibleResponses) - { - return defaultResponse; - } - }; + public static final UpgradeInteractionHandler DEFAULT_HANDLER = + (question, defaultResponse, possibleResponses) -> defaultResponse; } diff --git a/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeImpl.java b/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeImpl.java index 946e2b7996..16fda98fbc 100644 --- a/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeImpl.java +++ b/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeImpl.java @@ -43,7 +43,6 @@ import org.apache.qpid.server.logging.subjects.GroupLogSubject; import org.apache.qpid.server.model.AbstractConfiguredObject; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.ConfiguredObject; -import org.apache.qpid.server.model.IllegalStateTransitionException; import org.apache.qpid.server.model.ManagedAttributeField; import org.apache.qpid.server.model.State; import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade; diff --git a/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java b/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java index c1a875c03f..114cc41bbb 100644 --- a/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java +++ b/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java @@ -300,7 +300,7 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu if (!isFirstNodeInAGroup()) { List<String> permittedNodes = new ArrayList<>(getPermittedNodesFromHelper()); - setAttributes(Collections.<String, Object>singletonMap(PERMITTED_NODES, permittedNodes)); + setAttributes(Collections.singletonMap(PERMITTED_NODES, permittedNodes)); } } @@ -384,7 +384,7 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu final SettableFuture<Void> returnVal = SettableFuture.create(); ListenableFuture<Void> superFuture = super.doStop(); - addFutureCallback(superFuture, new FutureCallback<Void>() + addFutureCallback(superFuture, new FutureCallback<>() { @Override public void onSuccess(final Void result) @@ -413,7 +413,6 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu { returnVal.set(null); } - } }, getTaskExecutor()); return returnVal; @@ -482,7 +481,7 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu private Set<InetSocketAddress> getRemoteNodeAddresses() { - Set<InetSocketAddress> helpers = new HashSet<InetSocketAddress>(); + Set<InetSocketAddress> helpers = new HashSet<>(); @SuppressWarnings("rawtypes") Collection<? extends RemoteReplicationNode> remoteNodes = getRemoteReplicationNodes(); for (RemoteReplicationNode<?> node : remoteNodes) @@ -497,16 +496,7 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu @Override protected ListenableFuture<Void> onClose() { - return doAfterAlways(super.onClose(), - new Runnable() - { - @Override - public void run() - { - closeEnvironment(); - } - }); - + return doAfterAlways(super.onClose(), this::closeEnvironment); } @Override @@ -626,7 +616,7 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu ConfiguredObjectRecord[] initialRecords = getInitialRecords(); if(upgraderAndRecoverer.upgradeAndRecover(getConfigurationStore(), initialRecords)) { - setAttributes(Collections.<String, Object>singletonMap(VIRTUALHOST_INITIAL_CONFIGURATION, "{}")); + setAttributes(Collections.singletonMap(VIRTUALHOST_INITIAL_CONFIGURATION, "{}")); firstOpening = initialRecords.length == 0; } @@ -665,14 +655,10 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu { ((QueueManagingVirtualHost<?>) recoveredHost).setFirstOpening(firstOpening); } - Subject.doAs(getSubjectWithAddedSystemRights(), new PrivilegedAction<Object>() + Subject.doAs(getSubjectWithAddedSystemRights(), (PrivilegedAction<Object>) () -> { - @Override - public Object run() - { - recoveredHost.open(); - return null; - } + recoveredHost.open(); + return null; }); } success = true; @@ -1124,14 +1110,10 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu @Override public void onNodeState(final ReplicationNode node, final NodeState nodeState) { - Subject.doAs(getSystemTaskSubject(_virtualHostNodePrincipalName), new PrivilegedAction<Void>() + Subject.doAs(getSystemTaskSubject(_virtualHostNodePrincipalName), (PrivilegedAction<Void>) () -> { - @Override - public Void run() - { - processNodeState(node, nodeState); - return null; - } + processNodeState(node, nodeState); + return null; }); } @@ -1181,7 +1163,7 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu { if (_permittedNodes.contains(remoteNode.getAddress())) { - setAttributes(Collections.<String, Object>singletonMap(PERMITTED_NODES, new ArrayList<>(permittedNodes))); + setAttributes(Collections.singletonMap(PERMITTED_NODES, new ArrayList<>(permittedNodes))); } else { LOGGER.warn("Cannot accept the new permitted node list from the master as the master '" + remoteNode.getName() @@ -1212,15 +1194,8 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu @Override public boolean onIntruderNode(final ReplicationNode node) { - return Subject.doAs(getSystemTaskSubject(_virtualHostNodePrincipalName), new PrivilegedAction<Boolean>() - { - @Override - public Boolean run() - { - return processIntruderNode(node); - - } - }); + return Subject.doAs(getSystemTaskSubject(_virtualHostNodePrincipalName), (PrivilegedAction<Boolean>) () -> + processIntruderNode(node)); } private boolean processIntruderNode(final ReplicationNode node) @@ -1314,7 +1289,7 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu private Map<String, Object> nodeToAttributes(ReplicationNode replicationNode) { - Map<String, Object> attributes = new HashMap<String, Object>(); + Map<String, Object> attributes = new HashMap<>(); attributes.put(ConfiguredObject.NAME, replicationNode.getName()); attributes.put(ConfiguredObject.DURABLE, false); attributes.put(BDBHARemoteReplicationNode.ADDRESS, replicationNode.getHostName() + ":" + replicationNode.getPort()); @@ -1330,18 +1305,14 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu final State initialState = getState(); - ListenableFuture<Void> future = doAfterAlways(stopAndSetStateTo(State.ERRORED), new Runnable() + ListenableFuture<Void> future = doAfterAlways(stopAndSetStateTo(State.ERRORED), () -> { - @Override - public void run() - { - _lastRole.set(NodeRole.DETACHED); - attributeSet(ROLE, _role, NodeRole.DETACHED); - notifyStateChanged(initialState, State.ERRORED); - } + _lastRole.set(NodeRole.DETACHED); + attributeSet(ROLE, _role, NodeRole.DETACHED); + notifyStateChanged(initialState, State.ERRORED); }); - addFutureCallback(future, new FutureCallback<Void>() + addFutureCallback(future, new FutureCallback<>() { @Override public void onSuccess(final Void result) @@ -1368,14 +1339,10 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu @Override public Void execute() { - return Subject.doAs(getSystemTaskSubject(_virtualHostNodePrincipalName), new PrivilegedAction<Void>() + return Subject.doAs(getSystemTaskSubject(_virtualHostNodePrincipalName), (PrivilegedAction<Void>) () -> { - @Override - public Void run() - { - perform(); - return null; - } + perform(); + return null; }); } diff --git a/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java b/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java index abfe7eee67..513793651e 100644 --- a/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java +++ b/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java @@ -513,14 +513,14 @@ public class BDBHAVirtualHostNodeTest extends UnitTestBase _helper.awaitRemoteNodes(node1, 2); // Create new "proposed" permitted nodes list with a current node missing - List<String> amendedPermittedNodes = new ArrayList<String>(); + List<String> amendedPermittedNodes = new ArrayList<>(); amendedPermittedNodes.add(node1Address); amendedPermittedNodes.add(node2Address); // Try to update the permitted nodes attributes using the new list try { - node1.setAttributes(Collections.<String, Object>singletonMap(BDBHAVirtualHostNode.PERMITTED_NODES, amendedPermittedNodes)); + node1.setAttributes(Collections.singletonMap(BDBHAVirtualHostNode.PERMITTED_NODES, amendedPermittedNodes)); fail("Operation to remove current group node from permitted nodes should have failed"); } catch(IllegalArgumentException e) @@ -561,7 +561,7 @@ public class BDBHAVirtualHostNodeTest extends UnitTestBase _helper.awaitRemoteNodes(node1, 2); // Create new "proposed" permitted nodes list for update - List<String> amendedPermittedNodes = new ArrayList<String>(); + List<String> amendedPermittedNodes = new ArrayList<>(); amendedPermittedNodes.add(node1Address); amendedPermittedNodes.add(node2Address); amendedPermittedNodes.add(node3Address); diff --git a/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBPreferenceStoreTest.java b/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBPreferenceStoreTest.java index 06c9385381..9741220254 100644 --- a/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBPreferenceStoreTest.java +++ b/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBPreferenceStoreTest.java @@ -207,7 +207,7 @@ public class BDBPreferenceStoreTest extends UnitTestBase { try { - _preferenceStore.replace(Collections.<UUID>emptyList(), Collections.emptyList()); + _preferenceStore.replace(Collections.emptyList(), Collections.emptyList()); fail("Should not be able to replace"); } catch (IllegalStateException e) diff --git a/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java b/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java index 477f6ca7ac..4b3320b6e2 100644 --- a/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java +++ b/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java @@ -168,7 +168,7 @@ public class BDBStoreUpgradeTestPreparer session = connection.createSession(true, Session.SESSION_TRANSACTED); // Create a priority queue on broker - final Map<String,Object> priorityQueueArguments = new HashMap<String, Object>(); + final Map<String,Object> priorityQueueArguments = new HashMap<>(); priorityQueueArguments.put(QueueArgumentsConverter.X_QPID_PRIORITIES,10); Queue priorityQueue = createAndBindQueueOnBroker(session, PRIORITY_QUEUE_NAME, priorityQueueArguments); MessageProducer priorityQueueProducer = session.createProducer(priorityQueue); @@ -184,7 +184,7 @@ public class BDBStoreUpgradeTestPreparer priorityQueueProducer.close(); // Create a queue that has a DLQ - final Map<String,Object> queueWithDLQArguments = new HashMap<String, Object>(); + final Map<String,Object> queueWithDLQArguments = new HashMap<>(); queueWithDLQArguments.put("x-qpid-dlq-enabled", true); queueWithDLQArguments.put("x-qpid-maximum-delivery-count", 2); createAndBindQueueOnBroker(session, QUEUE_WITH_DLQ_NAME, queueWithDLQArguments); diff --git a/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java b/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java index 755d3f3dec..253f238179 100644 --- a/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java +++ b/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java @@ -22,7 +22,17 @@ package org.apache.qpid.server.store.berkeleydb.replication; import static org.apache.qpid.server.store.berkeleydb.EnvironmentFacade.JUL_LOGGER_LEVEL_OVERRIDE; import static org.apache.qpid.server.store.berkeleydb.EnvironmentFacade.LOG_HANDLER_CLEANER_PROTECTED_FILES_LIMIT_PROPERTY_NAME; -import static org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade.*; +import static org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade.DB_PING_SOCKET_TIMEOUT_PROPERTY_NAME; +import static org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade.DISABLE_COALESCING_COMMITTER_PROPERTY_NAME; +import static org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade.ENVIRONMENT_RESTART_RETRY_LIMIT_PROPERTY_NAME; +import static org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade.EXECUTOR_SHUTDOWN_TIMEOUT_PROPERTY_NAME; +import static org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade.MASTER_TRANSFER_TIMEOUT_PROPERTY_NAME; +import static org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade.NO_SYNC_TX_DURABILITY_PROPERTY_NAME; +import static org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade.PERMITTED_NODE_LIST; +import static org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade.REMOTE_NODE_MONITOR_INTERVAL_PROPERTY_NAME; +import static org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade.REMOTE_NODE_MONITOR_TIMEOUT_PROPERTY_NAME; +import static org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade.ReplicationNodeImpl; +import static org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade.getRemoteNodeState; import static org.apache.qpid.server.virtualhost.berkeleydb.BDBVirtualHost.DEFAULT_QPID_BROKER_BDB_COMMITER_NOTIFY_THRESHOLD; import static org.apache.qpid.server.virtualhost.berkeleydb.BDBVirtualHost.DEFAULT_QPID_BROKER_BDB_COMMITER_WAIT_TIMEOUT; import static org.apache.qpid.server.virtualhost.berkeleydb.BDBVirtualHost.QPID_BROKER_BDB_COMMITER_NOTIFY_THRESHOLD; @@ -878,7 +888,8 @@ public class ReplicatedEnvironmentFacadeTest extends UnitTestBase permittedNodes.add("localhost:" + _portHelper.getNextAvailable()); firstNode.setPermittedNodes(permittedNodes); - ReplicationNodeImpl replicationNode = new ReplicationNodeImpl(TEST_NODE_NAME, TEST_NODE_HOST_PORT); + ReplicationNodeImpl + replicationNode = new ReplicationNodeImpl(TEST_NODE_NAME, TEST_NODE_HOST_PORT); NodeState nodeState = getRemoteNodeState(TEST_GROUP_NAME, replicationNode, 5000); ObjectMapper objectMapper = new ObjectMapper(); diff --git a/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java b/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java index d8f9547767..1c0d7ed13a 100644 --- a/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java +++ b/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java @@ -108,7 +108,7 @@ public class UpgraderTest extends AbstractUpgradeTestCase _upgrader.upgradeIfNecessary(); List<String> databaseNames = emptyEnvironment.getDatabaseNames(); - List<String> expectedDatabases = new ArrayList<String>(); + List<String> expectedDatabases = new ArrayList<>(); expectedDatabases.add(Upgrader.VERSION_DB_NAME); assertEquals(expectedDatabases, databaseNames, "Expectedonly VERSION table in initially empty store after upgrade: "); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org