cleanup
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/76101027 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/76101027 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/76101027 Branch: refs/heads/trunk Commit: 76101027eef2fa097b2b55cdc037a3bfbe235753 Parents: 45a6373 Author: Jonathan Ellis <jbel...@apache.org> Authored: Wed Mar 26 10:50:41 2014 -0500 Committer: Jonathan Ellis <jbel...@apache.org> Committed: Wed Mar 26 10:50:41 2014 -0500 ---------------------------------------------------------------------- .../cassandra/service/StorageService.java | 249 ++++++++++--------- 1 file changed, 130 insertions(+), 119 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/76101027/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 17bd514..042e2bc 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -183,9 +183,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE private static final AtomicInteger nextRepairCommand = new AtomicInteger(); - private static ScheduledRangeTransferExecutorService rangeXferExecutor = new ScheduledRangeTransferExecutorService(); + private static final ScheduledRangeTransferExecutorService rangeXferExecutor = new ScheduledRangeTransferExecutorService(); - private final List<IEndpointLifecycleSubscriber> lifecycleSubscribers = new CopyOnWriteArrayList<IEndpointLifecycleSubscriber>(); + private final List<IEndpointLifecycleSubscriber> lifecycleSubscribers = new CopyOnWriteArrayList<>(); private static final BackgroundActivityMonitor bgMonitor = new BackgroundActivityMonitor(); @@ -564,7 +564,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE Thread drainOnShutdown = new Thread(new WrappedRunnable() { @Override - public void runMayThrow() throws ExecutionException, InterruptedException, IOException + public void runMayThrow() throws InterruptedException { ExecutorService counterMutationStage = StageManager.getStage(Stage.COUNTER_MUTATION); ExecutorService mutationStage = StageManager.getStage(Stage.MUTATION); @@ -584,7 +584,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE mutationStage.awaitTermination(3600, TimeUnit.SECONDS); StorageProxy.instance.verifyNoHintsInProgress(); - List<Future<?>> flushes = new ArrayList<Future<?>>(); + List<Future<?>> flushes = new ArrayList<>(); for (Keyspace keyspace : Keyspace.all()) { KSMetaData ksm = Schema.instance.getKSMetaData(keyspace.getName()); @@ -634,7 +634,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE joined = true; Collection<Token> tokens = null; - Map<ApplicationState, VersionedValue> appStates = new HashMap<ApplicationState, VersionedValue>(); + Map<ApplicationState, VersionedValue> appStates = new HashMap<>(); if (DatabaseDescriptor.getReplaceTokens().size() > 0 || DatabaseDescriptor.getReplaceNode() != null) throw new RuntimeException("Replace method removed; use cassandra.replace_address instead"); @@ -684,7 +684,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE // // We attempted to replace this with a schema-presence check, but you need a meaningful sleep // to get schema info from gossip which defeats the purpose. See CASSANDRA-4427 for the gory details. - Set<InetAddress> current = new HashSet<InetAddress>(); + Set<InetAddress> current = new HashSet<>(); logger.debug("Bootstrap variables: {} {} {} {}", DatabaseDescriptor.isAutoBootstrap(), SystemKeyspace.bootstrapInProgress(), @@ -800,7 +800,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE } else { - tokens = new ArrayList<Token>(initialTokens.size()); + tokens = new ArrayList<>(initialTokens.size()); for (String token : initialTokens) tokens.add(getPartitioner().getTokenFactory().fromString(token)); logger.info("Saved tokens not found. Using configuration value: {}", tokens); @@ -1054,7 +1054,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE public Map<List<String>, List<String>> getRangeToEndpointMap(String keyspace) { /* All the ranges for the tokens */ - Map<List<String>, List<String>> map = new HashMap<List<String>, List<String>>(); + Map<List<String>, List<String>> map = new HashMap<>(); for (Map.Entry<Range<Token>,List<InetAddress>> entry : getRangeToAddressMap(keyspace).entrySet()) { map.put(entry.getKey().asList(), stringify(entry.getValue())); @@ -1085,10 +1085,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE public Map<List<String>, List<String>> getRangeToRpcaddressMap(String keyspace) { /* All the ranges for the tokens */ - Map<List<String>, List<String>> map = new HashMap<List<String>, List<String>>(); + Map<List<String>, List<String>> map = new HashMap<>(); for (Map.Entry<Range<Token>, List<InetAddress>> entry : getRangeToAddressMap(keyspace).entrySet()) { - List<String> rpcaddrs = new ArrayList<String>(entry.getValue().size()); + List<String> rpcaddrs = new ArrayList<>(entry.getValue().size()); for (InetAddress endpoint: entry.getValue()) { rpcaddrs.add(getRpcaddress(endpoint)); @@ -1105,10 +1105,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE if (keyspace == null) keyspace = Schema.instance.getNonSystemKeyspaces().get(0); - Map<List<String>, List<String>> map = new HashMap<List<String>, List<String>>(); + Map<List<String>, List<String>> map = new HashMap<>(); for (Map.Entry<Range<Token>, Collection<InetAddress>> entry : tokenMetadata.getPendingRanges(keyspace).entrySet()) { - List<InetAddress> l = new ArrayList<InetAddress>(entry.getValue()); + List<InetAddress> l = new ArrayList<>(entry.getValue()); map.put(entry.getKey().asList(), stringify(l)); } return map; @@ -1189,7 +1189,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE { throw new IOException(e.getMessage()); } - List<String> result = new ArrayList<String>(tokenRanges.size()); + List<String> result = new ArrayList<>(tokenRanges.size()); for (TokenRange tokenRange : tokenRanges) result.add(tokenRange.toString()); @@ -1227,7 +1227,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE if (keyspace == null || Keyspace.open(keyspace).getReplicationStrategy() instanceof LocalStrategy) throw new InvalidRequestException("There is no ring for the keyspace: " + keyspace); - List<TokenRange> ranges = new ArrayList<TokenRange>(); + List<TokenRange> ranges = new ArrayList<>(); Token.TokenFactory tf = getPartitioner().getTokenFactory(); Map<Range<Token>, List<InetAddress>> rangeToAddressMap = @@ -1239,9 +1239,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE { Range range = entry.getKey(); List<InetAddress> addresses = entry.getValue(); - List<String> endpoints = new ArrayList<String>(addresses.size()); - List<String> rpc_endpoints = new ArrayList<String>(addresses.size()); - List<EndpointDetails> epDetails = new ArrayList<EndpointDetails>(addresses.size()); + List<String> endpoints = new ArrayList<>(addresses.size()); + List<String> rpc_endpoints = new ArrayList<>(addresses.size()); + List<EndpointDetails> epDetails = new ArrayList<>(addresses.size()); for (InetAddress endpoint : addresses) { @@ -1270,8 +1270,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE { Map<Token, InetAddress> mapInetAddress = tokenMetadata.getNormalAndBootstrappingTokenToEndpointMap(); // in order to preserve tokens in ascending order, we use LinkedHashMap here - Map<String, String> mapString = new LinkedHashMap<String, String>(mapInetAddress.size()); - List<Token> tokens = new ArrayList<Token>(mapInetAddress.keySet()); + Map<String, String> mapString = new LinkedHashMap<>(mapInetAddress.size()); + List<Token> tokens = new ArrayList<>(mapInetAddress.keySet()); Collections.sort(tokens); for (Token token : tokens) { @@ -1287,7 +1287,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE public Map<String, String> getHostIdMap() { - Map<String, String> mapOut = new HashMap<String, String>(); + Map<String, String> mapOut = new HashMap<>(); for (Map.Entry<InetAddress, UUID> entry : getTokenMetadata().getEndpointToHostIdMapForReading().entrySet()) mapOut.put(entry.getKey().getHostAddress(), entry.getValue().toString()); return mapOut; @@ -1301,7 +1301,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE */ private Map<Range<Token>, List<InetAddress>> constructRangeToEndpointMap(String keyspace, List<Range<Token>> ranges) { - Map<Range<Token>, List<InetAddress>> rangeToEndpointMap = new HashMap<Range<Token>, List<InetAddress>>(); + Map<Range<Token>, List<InetAddress>> rangeToEndpointMap = new HashMap<>(); for (Range<Token> range : ranges) { rangeToEndpointMap.put(range, Keyspace.open(keyspace).getReplicationStrategy().getNaturalEndpoints(range.right)); @@ -1358,20 +1358,31 @@ public class StorageService extends NotificationBroadcasterSupport implements IE String moveName = pieces[0]; - if (moveName.equals(VersionedValue.STATUS_BOOTSTRAPPING)) - handleStateBootstrap(endpoint); - else if (moveName.equals(VersionedValue.STATUS_NORMAL)) - handleStateNormal(endpoint); - else if (moveName.equals(VersionedValue.REMOVING_TOKEN) || moveName.equals(VersionedValue.REMOVED_TOKEN)) - handleStateRemoving(endpoint, pieces); - else if (moveName.equals(VersionedValue.STATUS_LEAVING)) - handleStateLeaving(endpoint); - else if (moveName.equals(VersionedValue.STATUS_LEFT)) - handleStateLeft(endpoint, pieces); - else if (moveName.equals(VersionedValue.STATUS_MOVING)) - handleStateMoving(endpoint, pieces); - else if (moveName.equals(VersionedValue.STATUS_RELOCATING)) - handleStateRelocating(endpoint, pieces); + switch (moveName) + { + case VersionedValue.STATUS_BOOTSTRAPPING: + handleStateBootstrap(endpoint); + break; + case VersionedValue.STATUS_NORMAL: + handleStateNormal(endpoint); + break; + case VersionedValue.REMOVING_TOKEN: + case VersionedValue.REMOVED_TOKEN: + handleStateRemoving(endpoint, pieces); + break; + case VersionedValue.STATUS_LEAVING: + handleStateLeaving(endpoint); + break; + case VersionedValue.STATUS_LEFT: + handleStateLeft(endpoint, pieces); + break; + case VersionedValue.STATUS_MOVING: + handleStateMoving(endpoint, pieces); + break; + case VersionedValue.STATUS_RELOCATING: + handleStateRelocating(endpoint, pieces); + break; + } } else { @@ -1477,10 +1488,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE tokens = getTokensFor(endpoint); - Set<Token> tokensToUpdateInMetadata = new HashSet<Token>(); - Set<Token> tokensToUpdateInSystemKeyspace = new HashSet<Token>(); - Set<Token> localTokensToRemove = new HashSet<Token>(); - Set<InetAddress> endpointsToRemove = new HashSet<InetAddress>(); + Set<Token> tokensToUpdateInMetadata = new HashSet<>(); + Set<Token> tokensToUpdateInSystemKeyspace = new HashSet<>(); + Set<Token> localTokensToRemove = new HashSet<>(); + Set<InetAddress> endpointsToRemove = new HashSet<>(); if (logger.isDebugEnabled()) @@ -1704,7 +1715,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE { assert pieces.length >= 2; - List<Token> tokens = new ArrayList<Token>(pieces.length - 1); + List<Token> tokens = new ArrayList<>(pieces.length - 1); for (String tStr : Arrays.copyOfRange(pieces, 1, pieces.length)) tokens.add(getPartitioner().getTokenFactory().fromString(tStr)); @@ -1889,12 +1900,12 @@ public class StorageService extends NotificationBroadcasterSupport implements IE { Multimap<String, Map.Entry<InetAddress, Collection<Range<Token>>>> rangesToFetch = HashMultimap.create(); - final InetAddress myAddress = FBUtilities.getBroadcastAddress(); + InetAddress myAddress = FBUtilities.getBroadcastAddress(); for (String keyspaceName : Schema.instance.getNonSystemKeyspaces()) { Multimap<Range<Token>, InetAddress> changedRanges = getChangedRangesForLeaving(keyspaceName, endpoint); - Set<Range<Token>> myNewRanges = new HashSet<Range<Token>>(); + Set<Range<Token>> myNewRanges = new HashSet<>(); for (Map.Entry<Range<Token>, InetAddress> entry : changedRanges.entries()) { if (entry.getValue().equals(myAddress)) @@ -1908,11 +1919,11 @@ public class StorageService extends NotificationBroadcasterSupport implements IE } StreamPlan stream = new StreamPlan("Restore replica count"); - for (final String keyspaceName : rangesToFetch.keySet()) + for (String keyspaceName : rangesToFetch.keySet()) { for (Map.Entry<InetAddress, Collection<Range<Token>>> entry : rangesToFetch.get(keyspaceName)) { - final InetAddress source = entry.getKey(); + InetAddress source = entry.getKey(); Collection<Range<Token>> ranges = entry.getValue(); if (logger.isDebugEnabled()) logger.debug("Requesting from {} ranges {}", source, StringUtils.join(ranges, ", ")); @@ -1945,7 +1956,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE if (logger.isDebugEnabled()) logger.debug("Node {} ranges [{}]", endpoint, StringUtils.join(ranges, ", ")); - Map<Range<Token>, List<InetAddress>> currentReplicaEndpoints = new HashMap<Range<Token>, List<InetAddress>>(); + Map<Range<Token>, List<InetAddress>> currentReplicaEndpoints = new HashMap<>(); // Find (for each range) all nodes that store replicas for these ranges as well for (Range<Token> range : ranges) @@ -2051,7 +2062,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE public Map<String, String> getLoadMap() { - Map<String, String> map = new HashMap<String, String>(); + Map<String, String> map = new HashMap<>(); for (Map.Entry<InetAddress,Double> entry : LoadBroadcaster.instance.getLoadInfo().entrySet()) { map.put(entry.getKey().getHostAddress(), FileUtils.stringifyFileSize(entry.getValue())); @@ -2087,7 +2098,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE private List<String> getTokens(InetAddress endpoint) { - List<String> strTokens = new ArrayList<String>(); + List<String> strTokens = new ArrayList<>(); for (Token tok : getTokenMetadata().getTokens(endpoint)) strTokens.add(tok.toString()); return strTokens; @@ -2110,7 +2121,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE public List<String> getMovingNodes() { - List<String> endpoints = new ArrayList<String>(); + List<String> endpoints = new ArrayList<>(); for (Pair<Token, InetAddress> node : tokenMetadata.getMovingEndpoints()) { @@ -2155,7 +2166,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE private List<String> stringify(Iterable<InetAddress> endpoints) { - List<String> stringEndpoints = new ArrayList<String>(); + List<String> stringEndpoints = new ArrayList<>(); for (InetAddress ep : endpoints) { stringEndpoints.add(ep.getHostAddress()); @@ -2235,7 +2246,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE } else { - ArrayList<Keyspace> t = new ArrayList<Keyspace>(keyspaceNames.length); + ArrayList<Keyspace> t = new ArrayList<>(keyspaceNames.length); for (String keyspaceName : keyspaceNames) t.add(getValidKeyspace(keyspaceName)); keyspaces = t; @@ -2319,15 +2330,15 @@ public class StorageService extends NotificationBroadcasterSupport implements IE public Map<String, TabularData> getSnapshotDetails() { - final Map<String, TabularData> snapshotMap = new HashMap<>(); - for (final Keyspace keyspace : Keyspace.all()) + Map<String, TabularData> snapshotMap = new HashMap<>(); + for (Keyspace keyspace : Keyspace.all()) { if (Keyspace.SYSTEM_KS.equals(keyspace.getName())) continue; - for (final ColumnFamilyStore cfStore : keyspace.getColumnFamilyStores()) + for (ColumnFamilyStore cfStore : keyspace.getColumnFamilyStores()) { - for (final Map.Entry<String, Pair<Long,Long>> snapshotDetail : cfStore.getSnapshotDetails().entrySet()) + for (Map.Entry<String, Pair<Long,Long>> snapshotDetail : cfStore.getSnapshotDetails().entrySet()) { TabularDataSupport data = (TabularDataSupport)snapshotMap.get(snapshotDetail.getKey()); if (data == null) @@ -2346,12 +2357,12 @@ public class StorageService extends NotificationBroadcasterSupport implements IE public long trueSnapshotsSize() { long total = 0; - for (final Keyspace keyspace : Keyspace.all()) + for (Keyspace keyspace : Keyspace.all()) { if (Keyspace.SYSTEM_KS.equals(keyspace.getName())) continue; - for (final ColumnFamilyStore cfStore : keyspace.getColumnFamilyStores()) + for (ColumnFamilyStore cfStore : keyspace.getColumnFamilyStores()) { total += cfStore.trueSnapshotsSize(); } @@ -2419,7 +2430,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE } if (idxName != null) { - Collection< SecondaryIndex > indexes = cfStore.indexManager.getIndexesByNames(new HashSet<String>(Arrays.asList(cfName))); + Collection< SecondaryIndex > indexes = cfStore.indexManager.getIndexesByNames(new HashSet<>(Arrays.asList(cfName))); if (indexes.isEmpty()) logger.warn(String.format("Invalid column family index specified: %s/%s. Proceeding with others.", baseCfName, idxName)); else @@ -2449,7 +2460,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE * @param columnFamilies * @throws IOException */ - public void forceKeyspaceFlush(final String keyspaceName, final String... columnFamilies) throws IOException + public void forceKeyspaceFlush(String keyspaceName, String... columnFamilies) throws IOException { for (ColumnFamilyStore cfStore : getValidColumnFamilies(true, false, keyspaceName, columnFamilies)) { @@ -2472,19 +2483,19 @@ public class StorageService extends NotificationBroadcasterSupport implements IE sendNotification(jmxNotification); } - public int forceRepairAsync(final String keyspace, final boolean isSequential, final Collection<String> dataCenters, Collection<String> hosts, final boolean primaryRange, final boolean fullRepair, final String... columnFamilies) throws IOException + public int forceRepairAsync(String keyspace, boolean isSequential, Collection<String> dataCenters, Collection<String> hosts, boolean primaryRange, boolean fullRepair, String... columnFamilies) throws IOException { - final Collection<Range<Token>> ranges = primaryRange ? getLocalPrimaryRanges(keyspace) : getLocalRanges(keyspace); + Collection<Range<Token>> ranges = primaryRange ? getLocalPrimaryRanges(keyspace) : getLocalRanges(keyspace); return forceRepairAsync(keyspace, isSequential, dataCenters, hosts, ranges, fullRepair, columnFamilies); } - public int forceRepairAsync(final String keyspace, final boolean isSequential, final Collection<String> dataCenters, Collection<String> hosts, final Collection<Range<Token>> ranges, final boolean fullRepair, final String... columnFamilies) throws IOException + public int forceRepairAsync(String keyspace, boolean isSequential, Collection<String> dataCenters, Collection<String> hosts, Collection<Range<Token>> ranges, boolean fullRepair, String... columnFamilies) { if (Keyspace.SYSTEM_KS.equals(keyspace) || ranges.isEmpty()) return 0; - final int cmd = nextRepairCommand.incrementAndGet(); + int cmd = nextRepairCommand.incrementAndGet(); if (ranges.size() > 0) { new Thread(createRepairTask(cmd, keyspace, ranges, isSequential, dataCenters, hosts, fullRepair, columnFamilies)).start(); @@ -2492,9 +2503,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE return cmd; } - public int forceRepairAsync(final String keyspace, final boolean isSequential, final boolean isLocal, final boolean primaryRange, final boolean fullRepair, final String... columnFamilies) + public int forceRepairAsync(String keyspace, boolean isSequential, boolean isLocal, boolean primaryRange, boolean fullRepair, String... columnFamilies) { - final Collection<Range<Token>> ranges = primaryRange ? getLocalPrimaryRanges(keyspace) : getLocalRanges(keyspace); + Collection<Range<Token>> ranges = primaryRange ? getLocalPrimaryRanges(keyspace) : getLocalRanges(keyspace); return forceRepairAsync(keyspace, isSequential, isLocal, ranges, fullRepair, columnFamilies); } @@ -2503,7 +2514,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE if (Keyspace.SYSTEM_KS.equals(keyspace) || ranges.isEmpty()) return 0; - final int cmd = nextRepairCommand.incrementAndGet(); + int cmd = nextRepairCommand.incrementAndGet(); if (!FBUtilities.isUnix() && isSequential) { logger.warn("Snapshot-based repair is not yet supported on Windows. Reverting to parallel repair."); @@ -2513,33 +2524,33 @@ public class StorageService extends NotificationBroadcasterSupport implements IE return cmd; } - public int forceRepairRangeAsync(String beginToken, String endToken, final String keyspaceName, boolean isSequential, Collection<String> dataCenters, Collection<String> hosts, boolean fullRepair, final String... columnFamilies) throws IOException + public int forceRepairRangeAsync(String beginToken, String endToken, String keyspaceName, boolean isSequential, Collection<String> dataCenters, Collection<String> hosts, boolean fullRepair, String... columnFamilies) throws IOException { Token parsedBeginToken = getPartitioner().getTokenFactory().fromString(beginToken); Token parsedEndToken = getPartitioner().getTokenFactory().fromString(endToken); logger.info("starting user-requested repair of range ({}, {}] for keyspace {} and column families {}", parsedBeginToken, parsedEndToken, keyspaceName, columnFamilies); - return forceRepairAsync(keyspaceName, isSequential, dataCenters, hosts, Collections.singleton(new Range<Token>(parsedBeginToken, parsedEndToken)), fullRepair, columnFamilies); + return forceRepairAsync(keyspaceName, isSequential, dataCenters, hosts, Collections.singleton(new Range<>(parsedBeginToken, parsedEndToken)), fullRepair, columnFamilies); } - public int forceRepairRangeAsync(String beginToken, String endToken, final String keyspaceName, boolean isSequential, boolean isLocal, boolean fullRepair, final String... columnFamilies) + public int forceRepairRangeAsync(String beginToken, String endToken, String keyspaceName, boolean isSequential, boolean isLocal, boolean fullRepair, String... columnFamilies) { Token parsedBeginToken = getPartitioner().getTokenFactory().fromString(beginToken); Token parsedEndToken = getPartitioner().getTokenFactory().fromString(endToken); logger.info("starting user-requested repair of range ({}, {}] for keyspace {} and column families {}", parsedBeginToken, parsedEndToken, keyspaceName, columnFamilies); - return forceRepairAsync(keyspaceName, isSequential, isLocal, Collections.singleton(new Range<Token>(parsedBeginToken, parsedEndToken)), fullRepair, columnFamilies); + return forceRepairAsync(keyspaceName, isSequential, isLocal, Collections.singleton(new Range<>(parsedBeginToken, parsedEndToken)), fullRepair, columnFamilies); } - private FutureTask<Object> createRepairTask(final int cmd, - final String keyspace, - final Collection<Range<Token>> ranges, - final boolean isSequential, - final boolean isLocal, - final boolean fullRepair, - final String... columnFamilies) + private FutureTask<Object> createRepairTask(int cmd, + String keyspace, + Collection<Range<Token>> ranges, + boolean isSequential, + boolean isLocal, + boolean fullRepair, + String... columnFamilies) { Set<String> dataCenters = null; if (isLocal) @@ -2657,9 +2668,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE } - public RepairFuture forceKeyspaceRepair(final UUID parentRepairSession, - final Range<Token> range, - final String keyspaceName, + public RepairFuture forceKeyspaceRepair(UUID parentRepairSession, + Range<Token> range, + String keyspaceName, boolean isSequential, Set<InetAddress> endpoints, String ... columnFamilies) throws IOException @@ -2698,13 +2709,13 @@ public class StorageService extends NotificationBroadcasterSupport implements IE public Collection<Range<Token>> getPrimaryRangesForEndpoint(String keyspace, InetAddress ep) { AbstractReplicationStrategy strategy = Keyspace.open(keyspace).getReplicationStrategy(); - Collection<Range<Token>> primaryRanges = new HashSet<Range<Token>>(); + Collection<Range<Token>> primaryRanges = new HashSet<>(); TokenMetadata metadata = tokenMetadata.cloneOnlyTokenMap(); for (Token token : metadata.sortedTokens()) { List<InetAddress> endpoints = strategy.calculateNaturalEndpoints(token, metadata); if (endpoints.size() > 0 && endpoints.get(0).equals(ep)) - primaryRanges.add(new Range<Token>(metadata.getPredecessor(token), token)); + primaryRanges.add(new Range<>(metadata.getPredecessor(token), token)); } return primaryRanges; } @@ -2750,13 +2761,13 @@ public class StorageService extends NotificationBroadcasterSupport implements IE if (sortedTokens.isEmpty()) return Collections.emptyList(); int size = sortedTokens.size(); - List<Range<Token>> ranges = new ArrayList<Range<Token>>(size + 1); + List<Range<Token>> ranges = new ArrayList<>(size + 1); for (int i = 1; i < size; ++i) { - Range<Token> range = new Range<Token>(sortedTokens.get(i - 1), sortedTokens.get(i)); + Range<Token> range = new Range<>(sortedTokens.get(i - 1), sortedTokens.get(i)); ranges.add(range); } - Range<Token> range = new Range<Token>(sortedTokens.get(size - 1), sortedTokens.get(0)); + Range<Token> range = new Range<>(sortedTokens.get(size - 1), sortedTokens.get(0)); ranges.add(range); return ranges; @@ -2811,7 +2822,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE public List<InetAddress> getLiveNaturalEndpoints(Keyspace keyspace, RingPosition pos) { List<InetAddress> endpoints = keyspace.getReplicationStrategy().getNaturalEndpoints(pos); - List<InetAddress> liveEps = new ArrayList<InetAddress>(endpoints.size()); + List<InetAddress> liveEps = new ArrayList<>(endpoints.size()); for (InetAddress endpoint : endpoints) { @@ -2843,9 +2854,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE long totalRowCountEstimate = cfs.estimatedKeysForRange(range); // splitCount should be much smaller than number of key samples, to avoid huge sampling error - final int minSamplesPerSplit = 4; - final int maxSplitCount = keys.size() / minSamplesPerSplit + 1; - final int splitCount = Math.max(1, Math.min(maxSplitCount, (int)(totalRowCountEstimate / keysPerSplit))); + int minSamplesPerSplit = 4; + int maxSplitCount = keys.size() / minSamplesPerSplit + 1; + int splitCount = Math.max(1, Math.min(maxSplitCount, (int)(totalRowCountEstimate / keysPerSplit))); List<Token> tokens = keysToTokens(range, keys); return getSplits(tokens, splitCount, cfs); @@ -2853,7 +2864,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE private List<Pair<Range<Token>, Long>> getSplits(List<Token> tokens, int splitCount, ColumnFamilyStore cfs) { - final double step = (double) (tokens.size() - 1) / splitCount; + double step = (double) (tokens.size() - 1) / splitCount; Token prevToken = tokens.get(0); List<Pair<Range<Token>, Long>> splits = Lists.newArrayListWithExpectedSize(splitCount); for (int i = 1; i <= splitCount; i++) @@ -2879,7 +2890,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE private List<DecoratedKey> keySamples(Iterable<ColumnFamilyStore> cfses, Range<Token> range) { - List<DecoratedKey> keys = new ArrayList<DecoratedKey>(); + List<DecoratedKey> keys = new ArrayList<>(); for (ColumnFamilyStore cfs : cfses) Iterables.addAll(keys, cfs.keySamples(range)); FBUtilities.sortSampledKeys(keys, range); @@ -2943,11 +2954,11 @@ public class StorageService extends NotificationBroadcasterSupport implements IE Uninterruptibles.sleepUninterruptibly(delay, TimeUnit.MILLISECONDS); } - private void unbootstrap(final Runnable onFinish) + private void unbootstrap(Runnable onFinish) { - Map<String, Multimap<Range<Token>, InetAddress>> rangesToStream = new HashMap<String, Multimap<Range<Token>, InetAddress>>(); + Map<String, Multimap<Range<Token>, InetAddress>> rangesToStream = new HashMap<>(); - for (final String keyspaceName : Schema.instance.getNonSystemKeyspaces()) + for (String keyspaceName : Schema.instance.getNonSystemKeyspaces()) { Multimap<Range<Token>, InetAddress> rangesMM = getChangedRangesForLeaving(keyspaceName, FBUtilities.getBroadcastAddress()); @@ -2984,7 +2995,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE return Futures.immediateFuture(null); // gather all live nodes in the cluster that aren't also leaving - List<InetAddress> candidates = new ArrayList<InetAddress>(StorageService.instance.getTokenMetadata().cloneAfterAllLeft().getAllEndpoints()); + List<InetAddress> candidates = new ArrayList<>(StorageService.instance.getTokenMetadata().cloneAfterAllLeft().getAllEndpoints()); candidates.remove(FBUtilities.getBroadcastAddress()); for (Iterator<InetAddress> iter = candidates.iterator(); iter.hasNext(); ) { @@ -3006,7 +3017,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE // stream all hints -- range list will be a singleton of "the entire ring" Token token = StorageService.getPartitioner().getMinimumToken(); - List<Range<Token>> ranges = Collections.singletonList(new Range<Token>(token, token)); + List<Range<Token>> ranges = Collections.singletonList(new Range<>(token, token)); return new StreamPlan("Hints").transferRanges(hintsDestinationHost, Keyspace.SYSTEM_KS, @@ -3097,7 +3108,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE private class RangeRelocator { - private StreamPlan streamPlan = new StreamPlan("Bootstrap"); + private final StreamPlan streamPlan = new StreamPlan("Bootstrap"); private RangeRelocator(Collection<Token> tokens, List<String> keyspaceNames) { @@ -3189,7 +3200,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE public void relocate(Collection<String> srcTokens) throws IOException { - List<Token> tokens = new ArrayList<Token>(srcTokens.size()); + List<Token> tokens = new ArrayList<>(srcTokens.size()); try { for (String srcT : srcTokens) @@ -3213,7 +3224,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE assert srcTokens != null; InetAddress localAddress = FBUtilities.getBroadcastAddress(); Collection<Token> localTokens = getTokenMetadata().getTokens(localAddress); - Set<Token> tokens = new HashSet<Token>(srcTokens); + Set<Token> tokens = new HashSet<>(srcTokens); Iterator<Token> it = tokens.iterator(); while (it.hasNext()) @@ -3469,7 +3480,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE totalCFs += keyspace.getColumnFamilyStores().size(); remainingCFs = totalCFs; // flush - List<Future<?>> flushes = new ArrayList<Future<?>>(); + List<Future<?>> flushes = new ArrayList<>(); for (Keyspace keyspace : Keyspace.nonSystem()) { for (ColumnFamilyStore cfs : keyspace.getColumnFamilyStores()) @@ -3543,7 +3554,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE List<Token> sortedTokens = tokenMetadata.sortedTokens(); // describeOwnership returns tokens in an unspecified order, let's re-order them Map<Token, Float> tokenMap = new TreeMap<Token, Float>(getPartitioner().describeOwnership(sortedTokens)); - Map<InetAddress, Float> nodeMap = new LinkedHashMap<InetAddress, Float>(); + Map<InetAddress, Float> nodeMap = new LinkedHashMap<>(); for (Map.Entry<Token, Float> entry : tokenMap.entrySet()) { InetAddress endpoint = tokenMetadata.getEndpoint(entry.getKey()); @@ -3577,9 +3588,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE if (keyspace == null) keyspace = Schema.instance.getNonSystemKeyspaces().get(0); - Collection<Collection<InetAddress>> endpointsGroupedByDc = new ArrayList<Collection<InetAddress>>(); + Collection<Collection<InetAddress>> endpointsGroupedByDc = new ArrayList<>(); // mapping of dc's to nodes, use sorted map so that we get dcs sorted - SortedMap<String, Collection<InetAddress>> sortedDcsToEndpoints = new TreeMap<String, Collection<InetAddress>>(); + SortedMap<String, Collection<InetAddress>> sortedDcsToEndpoints = new TreeMap<>(); sortedDcsToEndpoints.putAll(metadata.getTopology().getDatacenterEndpoints().asMap()); for (Collection<InetAddress> endpoints : sortedDcsToEndpoints.values()) endpointsGroupedByDc.add(endpoints); @@ -3625,7 +3636,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE public List<String> getKeyspaces() { - List<String> keyspaceNamesList = new ArrayList<String>(Schema.instance.getKeyspaces()); + List<String> keyspaceNamesList = new ArrayList<>(Schema.instance.getKeyspaces()); return Collections.unmodifiableList(keyspaceNamesList); } @@ -3668,10 +3679,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE * @param rangesToStreamByKeyspace keyspaces and data ranges with endpoints included for each * @return async Future for whether stream was success */ - private Future<StreamState> streamRanges(final Map<String, Multimap<Range<Token>, InetAddress>> rangesToStreamByKeyspace) + private Future<StreamState> streamRanges(Map<String, Multimap<Range<Token>, InetAddress>> rangesToStreamByKeyspace) { // First, we build a list of ranges to stream to each host, per table - final Map<String, Map<InetAddress, List<Range<Token>>>> sessionsToStreamByKeyspace = new HashMap<String, Map<InetAddress, List<Range<Token>>>>(); + Map<String, Map<InetAddress, List<Range<Token>>>> sessionsToStreamByKeyspace = new HashMap<>(); for (Map.Entry<String, Multimap<Range<Token>, InetAddress>> entry : rangesToStreamByKeyspace.entrySet()) { String keyspace = entry.getKey(); @@ -3680,16 +3691,16 @@ public class StorageService extends NotificationBroadcasterSupport implements IE if (rangesWithEndpoints.isEmpty()) continue; - Map<InetAddress, List<Range<Token>>> rangesPerEndpoint = new HashMap<InetAddress, List<Range<Token>>>(); - for (final Map.Entry<Range<Token>, InetAddress> endPointEntry : rangesWithEndpoints.entries()) + Map<InetAddress, List<Range<Token>>> rangesPerEndpoint = new HashMap<>(); + for (Map.Entry<Range<Token>, InetAddress> endPointEntry : rangesWithEndpoints.entries()) { - final Range<Token> range = endPointEntry.getKey(); - final InetAddress endpoint = endPointEntry.getValue(); + Range<Token> range = endPointEntry.getKey(); + InetAddress endpoint = endPointEntry.getValue(); List<Range<Token>> curRanges = rangesPerEndpoint.get(endpoint); if (curRanges == null) { - curRanges = new LinkedList<Range<Token>>(); + curRanges = new LinkedList<>(); rangesPerEndpoint.put(endpoint, curRanges); } curRanges.add(range); @@ -3701,13 +3712,13 @@ public class StorageService extends NotificationBroadcasterSupport implements IE StreamPlan streamPlan = new StreamPlan("Unbootstrap"); for (Map.Entry<String, Map<InetAddress, List<Range<Token>>>> entry : sessionsToStreamByKeyspace.entrySet()) { - final String keyspaceName = entry.getKey(); - final Map<InetAddress, List<Range<Token>>> rangesPerEndpoint = entry.getValue(); + String keyspaceName = entry.getKey(); + Map<InetAddress, List<Range<Token>>> rangesPerEndpoint = entry.getValue(); - for (final Map.Entry<InetAddress, List<Range<Token>>> rangesEntry : rangesPerEndpoint.entrySet()) + for (Map.Entry<InetAddress, List<Range<Token>>> rangesEntry : rangesPerEndpoint.entrySet()) { - final List<Range<Token>> ranges = rangesEntry.getValue(); - final InetAddress newEndpoint = rangesEntry.getKey(); + List<Range<Token>> ranges = rangesEntry.getValue(); + InetAddress newEndpoint = rangesEntry.getKey(); // TODO each call to transferRanges re-flushes, this is potentially a lot of waste streamPlan.transferRanges(newEndpoint, keyspaceName, ranges); @@ -3726,8 +3737,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE */ public Pair<Set<Range<Token>>, Set<Range<Token>>> calculateStreamAndFetchRanges(Collection<Range<Token>> current, Collection<Range<Token>> updated) { - Set<Range<Token>> toStream = new HashSet<Range<Token>>(); - Set<Range<Token>> toFetch = new HashSet<Range<Token>>(); + Set<Range<Token>> toStream = new HashSet<>(); + Set<Range<Token>> toFetch = new HashSet<>(); for (Range r1 : current) @@ -3846,14 +3857,14 @@ public class StorageService extends NotificationBroadcasterSupport implements IE */ public List<String> sampleKeyRange() // do not rename to getter - see CASSANDRA-4452 for details { - List<DecoratedKey> keys = new ArrayList<DecoratedKey>(); + List<DecoratedKey> keys = new ArrayList<>(); for (Keyspace keyspace : Keyspace.nonSystem()) { for (Range<Token> range : getPrimaryRangesForEndpoint(keyspace.getName(), FBUtilities.getBroadcastAddress())) keys.addAll(keySamples(keyspace.getColumnFamilyStores(), range)); } - List<String> sampledKeys = new ArrayList<String>(keys.size()); + List<String> sampledKeys = new ArrayList<>(keys.size()); for (DecoratedKey key : keys) sampledKeys.add(key.getToken().toString()); return sampledKeys;