This is an automated email from the ASF dual-hosted git repository. aleksey pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
     new a8327eb  Fix CqlInputFormat regression from the switch to system.size_estimates
a8327eb is described below

commit a8327eb8868c8d9d03c253a88509ce64d2ac227b
Author: David Capwell <dcapw...@gmail.com>
AuthorDate: Thu Mar 12 10:29:37 2020 -0700

    Fix CqlInputFormat regression from the switch to system.size_estimates

    patch by David Capwell; reviewed by Aleksey Yeschenko and Brandon Williams for CASSANDRA-15637
---
 CHANGES.txt                                        |   1 +
 .../apache/cassandra/db/SizeEstimatesRecorder.java |  48 +++-
 .../org/apache/cassandra/db/SystemKeyspace.java    |  79 +++++-
 .../cassandra/hadoop/cql3/CqlClientHelper.java     |  91 +++++++
 .../cassandra/hadoop/cql3/CqlInputFormat.java      | 298 ++++++++++++++++-----
 .../LimitedLocalNodeFirstLocalBalancingPolicy.java |  18 +-
 .../apache/cassandra/service/StorageService.java   |  30 ++-
 .../service/StorageServiceServerTest.java          |  39 +++
 8 files changed, 508 insertions(+), 96 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index d7e764c..ab8a7eb 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0-alpha4
+ * Fix CqlInputFormat regression from the switch to system.size_estimates (CASSANDRA-15637)
 * Allow sending Entire SSTables over SSL (CASSANDRA-15740)
 * Fix CQLSH UTF-8 encoding issue for Python 2/3 compatibility (CASSANDRA-15739)
 * Fix batch statement preparation when multiple tables and parameters are used (CASSANDRA-15730)

diff --git a/src/java/org/apache/cassandra/db/SizeEstimatesRecorder.java b/src/java/org/apache/cassandra/db/SizeEstimatesRecorder.java
index d61c297..fe38d64 100644
--- a/src/java/org/apache/cassandra/db/SizeEstimatesRecorder.java
+++ b/src/java/org/apache/cassandra/db/SizeEstimatesRecorder.java
@@ -70,12 +70,39 @@ public class SizeEstimatesRecorder extends SchemaChangeListener implements Runna

         for (Keyspace keyspace : Keyspace.nonLocalStrategy())
         {
-            Collection<Range<Token>> localRanges = StorageService.instance.getPrimaryRangesForEndpoint(keyspace.getName(),
-                                                                                                       FBUtilities.getBroadcastAddressAndPort());
+            // In tools, the call to describe_splits_ex() used to be coupled with the call to describe_local_ring(),
+            // so most access was for the local primary range; after creating the size_estimates table this was
+            // changed to be the primary range.
+            // In a multi-DC setup it's not uncommon for the local ring to be offset by 1 for the next DC; example:
+            // DC1: [0, 10, 20, 30]
+            // DC2: [1, 11, 21, 31]
+            // DC3: [2, 12, 22, 32]
+            // When working with the primary ring we have:
+            // [0, 1, 2, 10, 11, 12, 20, 21, 22, 30, 31, 32]
+            // This then leads to primary ranges with a single token in them, which causes the estimates to be less
+            // useful. Since only one range was published, some tools assume a single range; for this reason we can't
+            // publish all ranges (including the replica ranges), nor can we keep backwards compatibility and publish
+            // the primary range. If we publish multiple ranges, downstream integrations may start to see duplicate data.
+            // See CASSANDRA-15637
+            Collection<Range<Token>> primaryRanges = StorageService.instance.getPrimaryRanges(keyspace.getName());
+            Collection<Range<Token>> localPrimaryRanges = StorageService.instance.getLocalPrimaryRange();
+            boolean rangesAreEqual = primaryRanges.equals(localPrimaryRanges);
             for (ColumnFamilyStore table : keyspace.getColumnFamilyStores())
             {
                 long start = System.nanoTime();
-                recordSizeEstimates(table, localRanges);
+
+                // compute estimates for primary ranges for backwards compatibility
+                Map<Range<Token>, Pair<Long, Long>> estimates = computeSizeEstimates(table, primaryRanges);
+                SystemKeyspace.updateSizeEstimates(table.metadata.keyspace, table.metadata.name, estimates);
+                SystemKeyspace.updateTableEstimates(table.metadata.keyspace, table.metadata.name, SystemKeyspace.TABLE_ESTIMATES_TYPE_PRIMARY, estimates);
+
+                if (!rangesAreEqual)
+                {
+                    // compute estimate for local primary range
+                    estimates = computeSizeEstimates(table, localPrimaryRanges);
+                }
+                SystemKeyspace.updateTableEstimates(table.metadata.keyspace, table.metadata.name, SystemKeyspace.TABLE_ESTIMATES_TYPE_LOCAL_PRIMARY, estimates);
+
                 long passed = System.nanoTime() - start;
                 if (logger.isTraceEnabled())
                     logger.trace("Spent {} milliseconds on estimating {}.{} size",
@@ -87,11 +114,11 @@ public class SizeEstimatesRecorder extends SchemaChangeListener implements Runna
     }

     @SuppressWarnings("resource")
-    private void recordSizeEstimates(ColumnFamilyStore table, Collection<Range<Token>> localRanges)
+    private static Map<Range<Token>, Pair<Long, Long>> computeSizeEstimates(ColumnFamilyStore table, Collection<Range<Token>> ranges)
     {
         // for each local primary range, estimate (crudely) mean partition size and partitions count.
-        Map<Range<Token>, Pair<Long, Long>> estimates = new HashMap<>(localRanges.size());
-        for (Range<Token> localRange : localRanges)
+        Map<Range<Token>, Pair<Long, Long>> estimates = new HashMap<>(ranges.size());
+        for (Range<Token> localRange : ranges)
         {
             for (Range<Token> unwrappedRange : localRange.unwrap())
             {
@@ -124,11 +151,10 @@ public class SizeEstimatesRecorder extends SchemaChangeListener implements Runna
             }
         }

-        // atomically update the estimates.
-        SystemKeyspace.updateSizeEstimates(table.metadata.keyspace, table.metadata.name, estimates);
+        return estimates;
     }

-    private long estimatePartitionsCount(Collection<SSTableReader> sstables, Range<Token> range)
+    private static long estimatePartitionsCount(Collection<SSTableReader> sstables, Range<Token> range)
     {
         long count = 0;
         for (SSTableReader sstable : sstables)
@@ -136,7 +162,7 @@ public class SizeEstimatesRecorder extends SchemaChangeListener implements Runna
         return count;
     }

-    private long estimateMeanPartitionSize(Collection<SSTableReader> sstables)
+    private static long estimateMeanPartitionSize(Collection<SSTableReader> sstables)
     {
         long sum = 0, count = 0;
         for (SSTableReader sstable : sstables)
@@ -151,6 +177,6 @@ public class SizeEstimatesRecorder extends SchemaChangeListener implements Runna
     @Override
     public void onDropTable(String keyspace, String table)
     {
-        SystemKeyspace.clearSizeEstimates(keyspace, table);
+        SystemKeyspace.clearEstimates(keyspace, table);
    }
 }
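For concreteness, here is a minimal standalone sketch (illustrative only, not part of the patch; token values are the ones from the comment above) showing why interleaved per-DC tokens make full-ring primary ranges tiny, while DC-local primary ranges stay wide:

    import java.util.*;

    public class PrimaryRangeDemo
    {
        public static void main(String[] args)
        {
            // token -> datacenter, using the layout from the comment above
            TreeMap<Long, String> ring = new TreeMap<>();
            for (long t : new long[]{ 0, 10, 20, 30 }) ring.put(t, "DC1");
            for (long t : new long[]{ 1, 11, 21, 31 }) ring.put(t, "DC2");
            for (long t : new long[]{ 2, 12, 22, 32 }) ring.put(t, "DC3");

            // Full-ring primary range of token t is (predecessor(t), t]; for DC2's
            // token 1 that is just (0, 1] because DC1's token 0 sits right before it.
            List<Long> tokens = new ArrayList<>(ring.keySet());
            for (int i = 0; i < tokens.size(); i++)
            {
                long end = tokens.get(i);
                long start = tokens.get((i - 1 + tokens.size()) % tokens.size());
                System.out.printf("%s token %d: full-ring primary range (%d, %d]%n",
                                  ring.get(end), end, start, end);
            }

            // DC-local primary range: the predecessor is computed among DC2's own
            // tokens only, so token 1 owns the much wider, wrapping (31, 1].
            List<Long> dc2 = new ArrayList<>();
            for (Map.Entry<Long, String> e : ring.entrySet())
                if ("DC2".equals(e.getValue()))
                    dc2.add(e.getKey());
            for (int i = 0; i < dc2.size(); i++)
            {
                long end = dc2.get(i);
                long start = dc2.get((i - 1 + dc2.size()) % dc2.size());
                System.out.printf("DC2 token %d: local primary range (%d, %d]%n", end, start, end);
            }
        }
    }

This is the distinction the patch encodes as the 'primary' vs. 'local_primary' range_type below.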
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index c427c8f..eb31f2d 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -100,7 +100,9 @@ public final class SystemKeyspace
     public static final String PEER_EVENTS_V2 = "peer_events_v2";
     public static final String COMPACTION_HISTORY = "compaction_history";
     public static final String SSTABLE_ACTIVITY = "sstable_activity";
-    public static final String SIZE_ESTIMATES = "size_estimates";
+    public static final String TABLE_ESTIMATES = "table_estimates";
+    public static final String TABLE_ESTIMATES_TYPE_PRIMARY = "primary";
+    public static final String TABLE_ESTIMATES_TYPE_LOCAL_PRIMARY = "local_primary";
     public static final String AVAILABLE_RANGES_V2 = "available_ranges_v2";
     public static final String TRANSFERRED_RANGES_V2 = "transferred_ranges_v2";
     public static final String VIEW_BUILDS_IN_PROGRESS = "view_builds_in_progress";
@@ -112,6 +114,7 @@ public final class SystemKeyspace
     @Deprecated public static final String LEGACY_PEER_EVENTS = "peer_events";
     @Deprecated public static final String LEGACY_TRANSFERRED_RANGES = "transferred_ranges";
     @Deprecated public static final String LEGACY_AVAILABLE_RANGES = "available_ranges";
+    @Deprecated public static final String LEGACY_SIZE_ESTIMATES = "size_estimates";

     public static final TableMetadata Batches =
@@ -237,9 +240,10 @@ public final class SystemKeyspace
               + "PRIMARY KEY ((keyspace_name, columnfamily_name, generation)))")
               .build();

-    private static final TableMetadata SizeEstimates =
-        parse(SIZE_ESTIMATES,
-              "per-table primary range size estimates",
+    @Deprecated
+    private static final TableMetadata LegacySizeEstimates =
+        parse(LEGACY_SIZE_ESTIMATES,
+              "per-table primary range size estimates, table is deprecated in favor of " + TABLE_ESTIMATES,
               "CREATE TABLE %s ("
               + "keyspace_name text,"
               + "table_name text,"
               + "range_start text,"
               + "range_end text,"
               + "mean_partition_size bigint,"
               + "partitions_count bigint,"
               + "PRIMARY KEY ((keyspace_name), table_name, range_start, range_end))")
               .build();

+    private static final TableMetadata TableEstimates =
+        parse(TABLE_ESTIMATES,
+              "per-table range size estimates",
+              "CREATE TABLE %s ("
+              + "keyspace_name text,"
+              + "table_name text,"
+              + "range_type text,"
+              + "range_start text,"
+              + "range_end text,"
+              + "mean_partition_size bigint,"
+              + "partitions_count bigint,"
+              + "PRIMARY KEY ((keyspace_name), table_name, range_type, range_start, range_end))")
+              .build();
+
     private static final TableMetadata AvailableRangesV2 =
         parse(AVAILABLE_RANGES_V2,
               "available keyspace/ranges during bootstrap/replace that are ready to be served",
@@ -397,7 +415,8 @@ public final class SystemKeyspace
                          LegacyPeerEvents,
                          CompactionHistory,
                          SSTableActivity,
-                         SizeEstimates,
+                         LegacySizeEstimates,
+                         TableEstimates,
                          AvailableRangesV2,
                          LegacyAvailableRanges,
                          TransferredRangesV2,
@@ -1248,40 +1267,74 @@ public final class SystemKeyspace
     public static void updateSizeEstimates(String keyspace, String table, Map<Range<Token>, Pair<Long, Long>> estimates)
     {
         long timestamp = FBUtilities.timestampMicros();
-        PartitionUpdate.Builder update = new PartitionUpdate.Builder(SizeEstimates, UTF8Type.instance.decompose(keyspace), SizeEstimates.regularAndStaticColumns(), estimates.size());
+        int nowInSec = FBUtilities.nowInSeconds();
+        PartitionUpdate.Builder update = new PartitionUpdate.Builder(LegacySizeEstimates, UTF8Type.instance.decompose(keyspace), LegacySizeEstimates.regularAndStaticColumns(), estimates.size());
         // delete all previous values with a single range tombstone.
+        update.add(new RangeTombstone(Slice.make(LegacySizeEstimates.comparator, table), new DeletionTime(timestamp - 1, nowInSec)));
+
+        // add a CQL row for each primary token range.
+        for (Map.Entry<Range<Token>, Pair<Long, Long>> entry : estimates.entrySet())
+        {
+            Range<Token> range = entry.getKey();
+            Pair<Long, Long> values = entry.getValue();
+            update.add(Rows.simpleBuilder(LegacySizeEstimates, table, range.left.toString(), range.right.toString())
+                           .timestamp(timestamp)
+                           .add("partitions_count", values.left)
+                           .add("mean_partition_size", values.right)
+                           .build());
+        }
+        new Mutation(update.build()).apply();
+    }
+
+    /**
+     * Writes the current partition count and size estimates into table_estimates
+     */
+    public static void updateTableEstimates(String keyspace, String table, String type, Map<Range<Token>, Pair<Long, Long>> estimates)
+    {
+        long timestamp = FBUtilities.timestampMicros();
         int nowInSec = FBUtilities.nowInSeconds();
-        update.add(new RangeTombstone(Slice.make(SizeEstimates.comparator, table), new DeletionTime(timestamp - 1, nowInSec)));
+        PartitionUpdate.Builder update = new PartitionUpdate.Builder(TableEstimates, UTF8Type.instance.decompose(keyspace), TableEstimates.regularAndStaticColumns(), estimates.size());
+
+        // delete all previous values with a single range tombstone.
+        update.add(new RangeTombstone(Slice.make(TableEstimates.comparator, table, type), new DeletionTime(timestamp - 1, nowInSec)));

         // add a CQL row for each primary token range.
         for (Map.Entry<Range<Token>, Pair<Long, Long>> entry : estimates.entrySet())
         {
             Range<Token> range = entry.getKey();
             Pair<Long, Long> values = entry.getValue();
-            update.add(Rows.simpleBuilder(SizeEstimates, table, range.left.toString(), range.right.toString())
+            update.add(Rows.simpleBuilder(TableEstimates, table, type, range.left.toString(), range.right.toString())
                            .timestamp(timestamp)
                            .add("partitions_count", values.left)
                            .add("mean_partition_size", values.right)
                            .build());
         }
+
         new Mutation(update.build()).apply();
     }

+
     /**
      * Clears size estimates for a table (on table drop)
      */
-    public static void clearSizeEstimates(String keyspace, String table)
+    public static void clearEstimates(String keyspace, String table)
     {
-        String cql = format("DELETE FROM %s WHERE keyspace_name = ? AND table_name = ?", SizeEstimates.toString());
+        String cqlFormat = "DELETE FROM %s WHERE keyspace_name = ? AND table_name = ?";
+        String cql = format(cqlFormat, LegacySizeEstimates.toString());
+        executeInternal(cql, keyspace, table);
+        cql = String.format(cqlFormat, TableEstimates.toString());
         executeInternal(cql, keyspace, table);
     }

     /**
      * Clears size estimates for a keyspace (used to manually clean when we miss a keyspace drop)
      */
-    public static void clearSizeEstimates(String keyspace)
+    public static void clearEstimates(String keyspace)
     {
-        String cql = String.format("DELETE FROM %s.%s WHERE keyspace_name = ?", SchemaConstants.SYSTEM_KEYSPACE_NAME, SIZE_ESTIMATES);
+        String cqlFormat = "DELETE FROM %s WHERE keyspace_name = ?";
+        String cql = String.format(cqlFormat, LegacySizeEstimates.toString());
+        executeInternal(cql, keyspace);
+        cql = String.format(cqlFormat, TableEstimates.toString());
         executeInternal(cql, keyspace);
     }

@@ -1291,7 +1344,7 @@ public final class SystemKeyspace
     public static synchronized SetMultimap<String, String> getTablesWithSizeEstimates()
     {
         SetMultimap<String, String> keyspaceTableMap = HashMultimap.create();
-        String cql = String.format("SELECT keyspace_name, table_name FROM %s.%s", SchemaConstants.SYSTEM_KEYSPACE_NAME, SIZE_ESTIMATES);
+        String cql = String.format("SELECT keyspace_name, table_name FROM %s", TableEstimates.toString(), TABLE_ESTIMATES_TYPE_PRIMARY);
         UntypedResultSet rs = executeInternal(cql);
         for (UntypedResultSet.Row row : rs)
         {
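After this change a node keeps both estimate flavours per table, keyed by the new range_type clustering column. A minimal sketch for eyeballing the new table from a client (assumptions: a node reachable at 127.0.0.1, a hypothetical keyspace "ks" and table "tbl", and the DataStax 3.x driver already used by the Hadoop code in this patch):

    import com.datastax.driver.core.Cluster;
    import com.datastax.driver.core.ResultSet;
    import com.datastax.driver.core.Row;
    import com.datastax.driver.core.Session;

    public class TableEstimatesReader
    {
        public static void main(String[] args)
        {
            try (Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
                 Session session = cluster.connect())
            {
                // both 'primary' and 'local_primary' rows come back for the table
                ResultSet rs = session.execute(
                    "SELECT range_type, range_start, range_end, partitions_count, mean_partition_size " +
                    "FROM system.table_estimates WHERE keyspace_name = ? AND table_name = ?",
                    "ks", "tbl");
                for (Row row : rs)
                    System.out.printf("%s (%s, %s]: %d partitions, mean size %d%n",
                                      row.getString("range_type"),
                                      row.getString("range_start"),
                                      row.getString("range_end"),
                                      row.getLong("partitions_count"),
                                      row.getLong("mean_partition_size"));
            }
        }
    }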
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlClientHelper.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlClientHelper.java
new file mode 100644
index 0000000..d154243
--- /dev/null
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlClientHelper.java
@@ -0,0 +1,91 @@
+package org.apache.cassandra.hadoop.cql3;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import com.datastax.driver.core.Host;
+import com.datastax.driver.core.Metadata;
+import com.datastax.driver.core.Token;
+import com.datastax.driver.core.TokenRange;
+
+public class CqlClientHelper
+{
+    private CqlClientHelper()
+    {
+    }
+
+    public static Map<TokenRange, List<Host>> getLocalPrimaryRangeForDC(String keyspace, Metadata metadata, String targetDC)
+    {
+        Objects.requireNonNull(keyspace, "keyspace");
+        Objects.requireNonNull(metadata, "metadata");
+        Objects.requireNonNull(targetDC, "targetDC");
+
+        // In 2.1 the logic was to have a set of nodes used as a seed; they were used to query
+        // client.describe_local_ring(keyspace) -> List<TokenRange>, which should include all nodes in the local DC.
+        // TokenRange contained the endpoints in order, so .endpoints.get(0) is the primary owner.
+        // The client does not have a similar API; instead it returns Set<Host>. To replicate this we first need
+        // to compute the primary owners, then add in the replicas.
+
+        List<Token> tokens = new ArrayList<>();
+        Map<Token, Host> tokenToHost = new HashMap<>();
+        for (Host host : metadata.getAllHosts())
+        {
+            if (!targetDC.equals(host.getDatacenter()))
+                continue;
+
+            for (Token token : host.getTokens())
+            {
+                Host previous = tokenToHost.putIfAbsent(token, host);
+                if (previous != null)
+                    throw new IllegalStateException("Two hosts share the same token; hosts " + host.getHostId() + ":"
+                                                    + host.getTokens() + ", " + previous.getHostId() + ":" + previous.getTokens());
+                tokens.add(token);
+            }
+        }
+        Collections.sort(tokens);
+
+        Map<TokenRange, List<Host>> rangeToReplicas = new HashMap<>();
+
+        // The first token in the ring uses the last token as its 'start'; handle this here to simplify the loop
+        Token start = tokens.get(tokens.size() - 1);
+        Token end = tokens.get(0);
+
+        addRange(keyspace, metadata, tokenToHost, rangeToReplicas, start, end);
+        for (int i = 1; i < tokens.size(); i++)
+        {
+            start = tokens.get(i - 1);
+            end = tokens.get(i);
+
+            addRange(keyspace, metadata, tokenToHost, rangeToReplicas, start, end);
+        }
+
+        return rangeToReplicas;
+    }
+
+    private static void addRange(String keyspace,
+                                 Metadata metadata,
+                                 Map<Token, Host> tokenToHost,
+                                 Map<TokenRange, List<Host>> rangeToReplicas,
+                                 Token start, Token end)
+    {
+        Host host = tokenToHost.get(end);
+        String dc = host.getDatacenter();
+
+        TokenRange range = metadata.newTokenRange(start, end);
+        List<Host> replicas = new ArrayList<>();
+        replicas.add(host);
+        // get all the replicas for the specific DC
+        for (Host replica : metadata.getReplicas(keyspace, range))
+        {
+            if (dc.equals(replica.getDatacenter()) && !host.equals(replica))
+                replicas.add(replica);
+        }
+        List<Host> previous = rangeToReplicas.put(range, replicas);
+        if (previous != null)
+            throw new IllegalStateException("Two hosts (" + host + ", " + previous + ") map to the same token range: " + range);
+    }
+}
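For orientation, a minimal usage sketch of the helper above (illustrative only; assumes a reachable contact point, a keyspace named "ks", and that the target datacenter is "DC1"). Per addRange, the first Host in each list is the primary owner:

    import java.util.List;
    import java.util.Map;

    import com.datastax.driver.core.Cluster;
    import com.datastax.driver.core.Host;
    import com.datastax.driver.core.TokenRange;

    public class LocalPrimaryRangeUsage
    {
        public static void main(String[] args)
        {
            // getMetadata() initializes the cluster connection in driver 3.x
            try (Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1").build())
            {
                Map<TokenRange, List<Host>> ranges =
                    CqlClientHelper.getLocalPrimaryRangeForDC("ks", cluster.getMetadata(), "DC1");
                for (Map.Entry<TokenRange, List<Host>> e : ranges.entrySet())
                    System.out.println(e.getKey() + " -> primary " + e.getValue().get(0)
                                       + ", dc-local replicas " + e.getValue());
            }
        }
    }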
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java
index eae1fa2..262965f 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java
@@ -18,6 +18,7 @@
 package org.apache.cassandra.hadoop.cql3;

 import java.io.IOException;
+import java.net.InetAddress;
 import java.util.*;
 import java.util.concurrent.*;

@@ -27,9 +28,16 @@ import com.datastax.driver.core.Metadata;
 import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.Row;
 import com.datastax.driver.core.Session;
+import com.datastax.driver.core.SimpleStatement;
+import com.datastax.driver.core.Statement;
 import com.datastax.driver.core.TokenRange;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
+import com.datastax.driver.core.exceptions.InvalidQueryException;
 import org.apache.cassandra.schema.SchemaConstants;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.InputSplit;
@@ -130,10 +138,15 @@ public class CqlInputFormat extends org.apache.hadoop.mapreduce.InputFormat<Long
         ExecutorService executor = new ThreadPoolExecutor(0, 128, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
         List<org.apache.hadoop.mapreduce.InputSplit> splits = new ArrayList<>();

-        try (Cluster cluster = CqlConfigHelper.getInputCluster(ConfigHelper.getInputInitialAddress(conf).split(","), conf);
+        String[] inputInitialAddress = ConfigHelper.getInputInitialAddress(conf).split(",");
+        try (Cluster cluster = CqlConfigHelper.getInputCluster(inputInitialAddress, conf);
              Session session = cluster.connect())
         {
-            List<Future<List<org.apache.hadoop.mapreduce.InputSplit>>> splitfutures = new ArrayList<>();
+            List<SplitFuture> splitfutures = new ArrayList<>();
+            // TODO: if the job range is defined and does not perfectly match tokens, then the logic will be unable
+            // to get estimates since they are pre-computed
+            // tokens: [0, 10, 20]
+            // job range: [0, 10) - able to get estimate
+            // job range: [5, 15) - unable to get estimate
             Pair<String, String> jobKeyRange = ConfigHelper.getInputKeyRange(conf);
             Range<Token> jobRange = null;
             if (jobKeyRange != null)
@@ -145,14 +158,18 @@ public class CqlInputFormat extends org.apache.hadoop.mapreduce.InputFormat<Long
             Metadata metadata = cluster.getMetadata();

             // canonical ranges and nodes holding replicas
-            Map<TokenRange, Set<Host>> masterRangeNodes = getRangeMap(keyspace, metadata);
-
+            Map<TokenRange, List<Host>> masterRangeNodes = getRangeMap(keyspace, metadata, getTargetDC(metadata, inputInitialAddress));
             for (TokenRange range : masterRangeNodes.keySet())
             {
                 if (jobRange == null)
                 {
-                    // for each tokenRange, pick a live owner and ask it to compute bite-sized splits
-                    splitfutures.add(executor.submit(new SplitCallable(range, masterRangeNodes.get(range), conf, session)));
+                    for (TokenRange unwrapped : range.unwrap())
+                    {
+                        // for each tokenRange, pick a live owner and ask it for the byte-sized splits
+                        SplitFuture task = new SplitFuture(new SplitCallable(unwrapped, masterRangeNodes.get(range), conf, session));
+                        executor.submit(task);
+                        splitfutures.add(task);
+                    }
                 }
                 else
                 {
@@ -161,24 +178,61 @@ public class CqlInputFormat extends org.apache.hadoop.mapreduce.InputFormat<Long
                     {
                         for (TokenRange intersection: range.intersectWith(jobTokenRange))
                         {
-                            // for each tokenRange, pick a live owner and ask it to compute bite-sized splits
-                            splitfutures.add(executor.submit(new SplitCallable(intersection, masterRangeNodes.get(range), conf, session)));
+                            for (TokenRange unwrapped : intersection.unwrap())
+                            {
+                                // for each tokenRange, pick a live owner and ask it for the byte-sized splits
+                                SplitFuture task = new SplitFuture(new SplitCallable(unwrapped, masterRangeNodes.get(range), conf, session));
+                                executor.submit(task);
+                                splitfutures.add(task);
+                            }
                         }
                     }
                 }
             }

             // wait until we have all the results back
-            for (Future<List<org.apache.hadoop.mapreduce.InputSplit>> futureInputSplits : splitfutures)
+            List<SplitFuture> failedTasks = new ArrayList<>();
+            int maxSplits = 0;
+            long expectedPartitionsForFailedRanges = 0;
+            for (SplitFuture task : splitfutures)
             {
                 try
                 {
-                    splits.addAll(futureInputSplits.get());
+                    List<ColumnFamilySplit> tokenRangeSplits = task.get();
+                    if (tokenRangeSplits.size() > maxSplits)
+                    {
+                        maxSplits = tokenRangeSplits.size();
+                        expectedPartitionsForFailedRanges = tokenRangeSplits.get(0).getLength();
+                    }
                 }
                 catch (Exception e)
                 {
-                    throw new IOException("Could not get input splits", e);
+                    failedTasks.add(task);
+                }
+            }
+            // The estimate is only stored on a single host; if that host is down then we cannot get the estimate.
+            // It's more than likely that a single host could be "too large" for one split, but there is no way of
+            // knowing!
+            // This logic attempts to guess the estimate from all the successful ranges
+            if (!failedTasks.isEmpty())
+            {
+                // if every split failed this will be 0
+                if (maxSplits == 0)
+                    throwAllSplitsFailed(failedTasks);
+                for (SplitFuture task : failedTasks)
+                {
+                    try
+                    {
+                        // the task failed, so this should throw
+                        task.get();
+                    }
+                    catch (Exception cause)
+                    {
+                        logger.warn("Unable to get estimate for {}, the host {} had an exception; falling back to default estimate", task.splitCallable.tokenRange, task.splitCallable.hosts.get(0), cause);
+                    }
+                }
+                for (SplitFuture task : failedTasks)
+                    splits.addAll(toSplit(task.splitCallable.hosts, splitTokenRange(task.splitCallable.tokenRange, maxSplits, expectedPartitionsForFailedRanges)));
             }
         }
         finally
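The fallback above reuses the shape of the most finely split successful range. A standalone sketch of that guess (hypothetical names; the real code splits driver TokenRanges, not longs):

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class FallbackEstimate
    {
        // When a range's owner is down we cannot read its stored estimate, so
        // reuse the largest split count seen on the successful ranges and the
        // per-split partition count those splits reported.
        static Map<long[], Long> guess(long[] failedRange, int maxSplits, long expectedPartitions)
        {
            Map<long[], Long> splits = new LinkedHashMap<>();
            long start = failedRange[0], end = failedRange[1];
            long width = (end - start) / maxSplits;   // assumes a non-wrapping range
            for (int i = 0; i < maxSplits; i++)
            {
                long s = start + i * width;
                long e = (i == maxSplits - 1) ? end : s + width;
                splits.put(new long[]{ s, e }, expectedPartitions);
            }
            return splits;
        }

        public static void main(String[] args)
        {
            guess(new long[]{ 0, 100 }, 4, 128L).forEach(
                (r, p) -> System.out.printf("(%d, %d] -> %d partitions%n", r[0], r[1], p));
        }
    }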
@@ -191,19 +245,73 @@ public class CqlInputFormat extends org.apache.hadoop.mapreduce.InputFormat<Long
         return splits;
     }

-    private TokenRange rangeToTokenRange(Metadata metadata, Range<Token> range)
+    private static IllegalStateException throwAllSplitsFailed(List<SplitFuture> failedTasks)
     {
-        return metadata.newTokenRange(metadata.newToken(partitioner.getTokenFactory().toString(range.left)),
-                                      metadata.newToken(partitioner.getTokenFactory().toString(range.right)));
+        IllegalStateException exception = new IllegalStateException("No successful tasks found");
+        for (SplitFuture task : failedTasks)
+        {
+            try
+            {
+                // the task failed, so this should throw
+                task.get();
+            }
+            catch (Exception cause)
+            {
+                exception.addSuppressed(cause);
+            }
+        }
+        throw exception;
     }

-    private Map<TokenRange, Long> getSubSplits(String keyspace, String cfName, TokenRange range, Configuration conf, Session session)
+    private static String getTargetDC(Metadata metadata, String[] inputInitialAddress)
+    {
+        BiMultiValMap<InetAddress, String> addressToDc = new BiMultiValMap<>();
+        Multimap<String, InetAddress> dcToAddresses = addressToDc.inverse();
+
+        // the only way to match is off the broadcast addresses, so for all hosts do an existence check
+        Set<InetAddress> addresses = new HashSet<>(inputInitialAddress.length);
+        for (String inputAddress : inputInitialAddress)
+            addresses.addAll(parseAddress(inputAddress));
+
+        for (Host host : metadata.getAllHosts())
+        {
+            InetAddress address = host.getBroadcastAddress();
+            if (addresses.contains(address))
+                addressToDc.put(address, host.getDatacenter());
+        }
+
+        switch (dcToAddresses.keySet().size())
+        {
+            case 1:
+                return Iterables.getOnlyElement(dcToAddresses.keySet());
+            case 0:
+                throw new IllegalStateException("Input addresses could not be used to find DC; none match the client metadata");
+            default:
+                // Multiple DCs found; attempt to pick the first based off the address list. This mimics the 2.1
+                // behavior, which connected in order, and the first node it successfully connected to defined the
+                // local DC to use; since the client abstracts this, we rely on existence as a proxy for connect.
+                for (String inputAddress : inputInitialAddress)
+                {
+                    for (InetAddress add : parseAddress(inputAddress))
+                    {
+                        String dc = addressToDc.get(add);
+                        // it is possible the address isn't in the cluster and the client dropped it, so ignore null
+                        if (dc != null)
+                            return dc;
+                    }
+                }
+                // somehow we were able to connect to the cluster, find multiple DCs using matching, and yet couldn't
+                // match again...
+                throw new AssertionError("Unable to infer datacenter from initial addresses; multiple datacenters found "
+                                         + dcToAddresses.keySet() + ", should only use addresses from one datacenter");
+        }
+    }
+
+    private static List<InetAddress> parseAddress(String str)
     {
-        int splitSize = ConfigHelper.getInputSplitSize(conf);
-        int splitSizeMb = ConfigHelper.getInputSplitSizeInMb(conf);
         try
         {
-            return describeSplits(keyspace, cfName, range, splitSize, splitSizeMb, session);
+            return Arrays.asList(InetAddress.getAllByName(str));
         }
         catch (Exception e)
         {
@@ -211,22 +319,35 @@ public class CqlInputFormat extends org.apache.hadoop.mapreduce.InputFormat<Long
         }
     }

-    private Map<TokenRange, Set<Host>> getRangeMap(String keyspace, Metadata metadata)
+    private TokenRange rangeToTokenRange(Metadata metadata, Range<Token> range)
+    {
+        return metadata.newTokenRange(metadata.newToken(partitioner.getTokenFactory().toString(range.left)),
+                                      metadata.newToken(partitioner.getTokenFactory().toString(range.right)));
+    }
+
+    private Map<TokenRange, Long> getSubSplits(String keyspace, String cfName, TokenRange range, Host host, Configuration conf, Session session)
     {
-        return metadata.getTokenRanges()
-                       .stream()
-                       .collect(toMap(p -> p, p -> metadata.getReplicas('"' + keyspace + '"', p)));
+        int splitSize = ConfigHelper.getInputSplitSize(conf);
+        int splitSizeMb = ConfigHelper.getInputSplitSizeInMb(conf);
+        return describeSplits(keyspace, cfName, range, host, splitSize, splitSizeMb, session);
     }

-    private Map<TokenRange, Long> describeSplits(String keyspace, String table, TokenRange tokenRange, int splitSize, int splitSizeMb, Session session)
+    private static Map<TokenRange, List<Host>> getRangeMap(String keyspace, Metadata metadata, String targetDC)
     {
-        String query = String.format("SELECT mean_partition_size, partitions_count " +
-                                     "FROM %s.%s " +
-                                     "WHERE keyspace_name = ? AND table_name = ? AND range_start = ? AND range_end = ?",
-                                     SchemaConstants.SYSTEM_KEYSPACE_NAME,
-                                     SystemKeyspace.SIZE_ESTIMATES);
+        return CqlClientHelper.getLocalPrimaryRangeForDC(keyspace, metadata, targetDC);
+    }

-        ResultSet resultSet = session.execute(query, keyspace, table, tokenRange.getStart().toString(), tokenRange.getEnd().toString());
+    private Map<TokenRange, Long> describeSplits(String keyspace, String table, TokenRange tokenRange, Host host, int splitSize, int splitSizeMb, Session session)
+    {
+        // In 2.1 the host list was walked in order (only moving to the next host on IOException), calling
+        // org.apache.cassandra.service.StorageService.getSplits(java.lang.String, java.lang.String, org.apache.cassandra.dht.Range<org.apache.cassandra.dht.Token>, int);
+        // that call computes totalRowCountEstimate (used to compute #splits), then splits the ring based on those estimates.
+        //
+        // The main difference is that the estimates in 2.1 were computed from the data, so replicas could answer
+        // the estimate query. In 3.0 we rely on the CQL query below, which is local and only computes estimates for
+        // the primary range; this puts us in a tricky spot: if the node fails, what do we do? 3.0 behavior only
+        // matches 2.1 iff all nodes are up and healthy.
+        ResultSet resultSet = queryTableEstimates(session, host, keyspace, table, tokenRange);

         Row row = resultSet.one();
@@ -250,14 +371,47 @@ public class CqlInputFormat extends org.apache.hadoop.mapreduce.InputFormat<Long
         if (splitCount == 0)
         {
             Map<TokenRange, Long> wrappedTokenRange = new HashMap<>();
-            wrappedTokenRange.put(tokenRange, (long) 128);
+            wrappedTokenRange.put(tokenRange, partitionCount == 0 ? 128L : partitionCount);
             return wrappedTokenRange;
         }

+        return splitTokenRange(tokenRange, splitCount, partitionCount / splitCount);
+    }
+
+    private static ResultSet queryTableEstimates(Session session, Host host, String keyspace, String table, TokenRange tokenRange)
+    {
+        try
+        {
+            String query = String.format("SELECT mean_partition_size, partitions_count " +
+                                         "FROM %s.%s " +
+                                         "WHERE keyspace_name = ? AND table_name = ? AND range_type = '%s' AND range_start = ? AND range_end = ?",
+                                         SchemaConstants.SYSTEM_KEYSPACE_NAME,
+                                         SystemKeyspace.TABLE_ESTIMATES,
+                                         SystemKeyspace.TABLE_ESTIMATES_TYPE_LOCAL_PRIMARY);
+            Statement stmt = new SimpleStatement(query, keyspace, table, tokenRange.getStart().toString(), tokenRange.getEnd().toString()).setHost(host);
+            return session.execute(stmt);
+        }
+        catch (InvalidQueryException e)
+        {
+            // if the table doesn't exist, fall back to the old table. This is likely to return no records in a
+            // multi-DC setup, but should work fine in a single-DC setup.
+            String query = String.format("SELECT mean_partition_size, partitions_count " +
+                                         "FROM %s.%s " +
+                                         "WHERE keyspace_name = ? AND table_name = ? AND range_start = ? AND range_end = ?",
+                                         SchemaConstants.SYSTEM_KEYSPACE_NAME,
+                                         SystemKeyspace.LEGACY_SIZE_ESTIMATES);
+
+            Statement stmt = new SimpleStatement(query, keyspace, table, tokenRange.getStart().toString(), tokenRange.getEnd().toString()).setHost(host);
+            return session.execute(stmt);
+        }
+    }
+
+    private static Map<TokenRange, Long> splitTokenRange(TokenRange tokenRange, int splitCount, long partitionCount)
+    {
         List<TokenRange> splitRanges = tokenRange.splitEvenly(splitCount);
         Map<TokenRange, Long> rangesWithLength = Maps.newHashMapWithExpectedSize(splitRanges.size());
         for (TokenRange range : splitRanges)
-            rangesWithLength.put(range, partitionCount/splitCount);
+            rangesWithLength.put(range, partitionCount);

         return rangesWithLength;
     }
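A minimal sketch of the schema-fallback pattern queryTableEstimates uses above (hypothetical names; it omits the per-host statement routing the patch relies on): prefer the 4.0 system.table_estimates table and, when a node predates it, fall back to the legacy system.size_estimates table.

    import com.datastax.driver.core.ResultSet;
    import com.datastax.driver.core.Row;
    import com.datastax.driver.core.Session;
    import com.datastax.driver.core.exceptions.InvalidQueryException;

    public final class EstimateQuery
    {
        static Row fetchEstimate(Session session, String ks, String table, String start, String end)
        {
            try
            {
                ResultSet rs = session.execute(
                    "SELECT mean_partition_size, partitions_count FROM system.table_estimates " +
                    "WHERE keyspace_name = ? AND table_name = ? AND range_type = 'local_primary' " +
                    "AND range_start = ? AND range_end = ?",
                    ks, table, start, end);
                return rs.one();
            }
            catch (InvalidQueryException e)   // table_estimates does not exist on this node
            {
                ResultSet rs = session.execute(
                    "SELECT mean_partition_size, partitions_count FROM system.size_estimates " +
                    "WHERE keyspace_name = ? AND table_name = ? AND range_start = ? AND range_end = ?",
                    ks, table, start, end);
                return rs.one();
            }
        }
    }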
@@ -277,56 +431,70 @@ public class CqlInputFormat extends org.apache.hadoop.mapreduce.InputFormat<Long
      * Gets a token tokenRange and splits it up according to the suggested
      * size into input splits that Hadoop can use.
      */
-    class SplitCallable implements Callable<List<org.apache.hadoop.mapreduce.InputSplit>>
+    class SplitCallable implements Callable<List<ColumnFamilySplit>>
     {
         private final TokenRange tokenRange;
-        private final Set<Host> hosts;
+        private final List<Host> hosts;
         private final Configuration conf;
         private final Session session;

-        public SplitCallable(TokenRange tr, Set<Host> hosts, Configuration conf, Session session)
+        public SplitCallable(TokenRange tokenRange, List<Host> hosts, Configuration conf, Session session)
         {
-            this.tokenRange = tr;
+            Preconditions.checkArgument(!hosts.isEmpty(), "hosts list requires at least 1 host but was empty");
+            this.tokenRange = tokenRange;
             this.hosts = hosts;
             this.conf = conf;
             this.session = session;
         }

-        public List<org.apache.hadoop.mapreduce.InputSplit> call() throws Exception
+        public List<ColumnFamilySplit> call() throws Exception
         {
-            ArrayList<org.apache.hadoop.mapreduce.InputSplit> splits = new ArrayList<>();
-            Map<TokenRange, Long> subSplits;
-            subSplits = getSubSplits(keyspace, cfName, tokenRange, conf, session);
-            // turn the sub-ranges into InputSplits
-            String[] endpoints = new String[hosts.size()];
+            Map<TokenRange, Long> subSplits = getSubSplits(keyspace, cfName, tokenRange, hosts.get(0), conf, session);
+            return toSplit(hosts, subSplits);
+        }

-            // hadoop needs hostname, not ip
-            int endpointIndex = 0;
-            for (Host endpoint : hosts)
-                endpoints[endpointIndex++] = endpoint.getAddress().getHostName();
+    }

-            boolean partitionerIsOpp = partitioner instanceof OrderPreservingPartitioner || partitioner instanceof ByteOrderedPartitioner;
+    private static class SplitFuture extends FutureTask<List<ColumnFamilySplit>>
+    {
+        private final SplitCallable splitCallable;

-            for (Map.Entry<TokenRange, Long> subSplitEntry : subSplits.entrySet())
-            {
-                List<TokenRange> ranges = subSplitEntry.getKey().unwrap();
-                for (TokenRange subrange : ranges)
-                {
-                    ColumnFamilySplit split =
-                        new ColumnFamilySplit(
-                            partitionerIsOpp ?
-                                subrange.getStart().toString().substring(2) : subrange.getStart().toString(),
-                            partitionerIsOpp ?
-                                subrange.getEnd().toString().substring(2) : subrange.getEnd().toString(),
-                            subSplitEntry.getValue(),
-                            endpoints);
-
-                    logger.trace("adding {}", split);
-                    splits.add(split);
-                }
-            }
-            return splits;
+        SplitFuture(SplitCallable splitCallable)
+        {
+            super(splitCallable);
+            this.splitCallable = splitCallable;
         }
     }
+
+    private List<ColumnFamilySplit> toSplit(List<Host> hosts, Map<TokenRange, Long> subSplits)
+    {
+        // turn the sub-ranges into InputSplits
+        String[] endpoints = new String[hosts.size()];
+
+        // hadoop needs hostname, not ip
+        int endpointIndex = 0;
+        for (Host endpoint : hosts)
+            endpoints[endpointIndex++] = endpoint.getAddress().getHostName();
+
+        boolean partitionerIsOpp = partitioner instanceof OrderPreservingPartitioner || partitioner instanceof ByteOrderedPartitioner;
+
+        ArrayList<ColumnFamilySplit> splits = new ArrayList<>();
+        for (Map.Entry<TokenRange, Long> subSplitEntry : subSplits.entrySet())
+        {
+            TokenRange subrange = subSplitEntry.getKey();
+            ColumnFamilySplit split =
+                new ColumnFamilySplit(
+                    partitionerIsOpp ?
+                        subrange.getStart().toString().substring(2) : subrange.getStart().toString(),
+                    partitionerIsOpp ?
+                        subrange.getEnd().toString().substring(2) : subrange.getEnd().toString(),
+                    subSplitEntry.getValue(),
+                    endpoints);

+            logger.trace("adding {}", split);
+            splits.add(split);
+        }
+        return splits;
+    }
 }
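SplitFuture's only job is to keep a handle on its SplitCallable so the inputs (token range, hosts) are still known after a failure. A generic, self-contained sketch of that pattern (hypothetical class, not from the patch):

    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.FutureTask;

    public class TracedFuture<T> extends FutureTask<T>
    {
        // FutureTask normally hides its Callable; retaining a reference lets the
        // submitter recover the task's inputs after get() throws.
        final Callable<T> task;

        public TracedFuture(Callable<T> task)
        {
            super(task);
            this.task = task;
        }

        public static void main(String[] args) throws Exception
        {
            ExecutorService executor = Executors.newFixedThreadPool(1);
            TracedFuture<String> f = new TracedFuture<>(() -> { throw new IllegalStateException("node down"); });
            executor.execute(f);   // FutureTask is a Runnable
            try
            {
                f.get();
            }
            catch (ExecutionException e)
            {
                // the retained callable identifies which input failed
                System.out.println("failed task: " + f.task + " cause: " + e.getCause());
            }
            executor.shutdown();
        }
    }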
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/LimitedLocalNodeFirstLocalBalancingPolicy.java b/src/java/org/apache/cassandra/hadoop/cql3/LimitedLocalNodeFirstLocalBalancingPolicy.java
index 256da2d..59b4eca 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/LimitedLocalNodeFirstLocalBalancingPolicy.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/LimitedLocalNodeFirstLocalBalancingPolicy.java
@@ -56,6 +56,7 @@ class LimitedLocalNodeFirstLocalBalancingPolicy implements LoadBalancingPolicy

     private final CopyOnWriteArraySet<Host> liveReplicaHosts = new CopyOnWriteArraySet<>();
     private final Set<InetAddress> replicaAddresses = new HashSet<>();
+    private final Set<String> allowedDCs = new CopyOnWriteArraySet<>();

     public LimitedLocalNodeFirstLocalBalancingPolicy(String[] replicas)
     {
@@ -78,15 +79,22 @@ class LimitedLocalNodeFirstLocalBalancingPolicy implements LoadBalancingPolicy
     @Override
     public void init(Cluster cluster, Collection<Host> hosts)
     {
-        List<Host> replicaHosts = new ArrayList<>();
+        // first find which DCs the user defined
+        Set<String> dcs = new HashSet<>();
         for (Host host : hosts)
         {
             if (replicaAddresses.contains(host.getAddress()))
-            {
+                dcs.add(host.getDatacenter());
+        }
+        // filter to all nodes within the targeted DCs
+        List<Host> replicaHosts = new ArrayList<>();
+        for (Host host : hosts)
+        {
+            if (dcs.contains(host.getDatacenter()))
                 replicaHosts.add(host);
-            }
         }
         liveReplicaHosts.addAll(replicaHosts);
+        allowedDCs.addAll(dcs);
         logger.trace("Initialized with replica hosts: {}", replicaHosts);
     }

@@ -136,7 +144,7 @@ class LimitedLocalNodeFirstLocalBalancingPolicy implements LoadBalancingPolicy
     @Override
     public void onAdd(Host host)
     {
-        if (replicaAddresses.contains(host.getAddress()))
+        if (liveReplicaHosts.contains(host))
         {
             liveReplicaHosts.add(host);
             logger.trace("Added a new host {}", host);
@@ -146,7 +154,7 @@ class LimitedLocalNodeFirstLocalBalancingPolicy implements LoadBalancingPolicy
     @Override
     public void onUp(Host host)
     {
-        if (replicaAddresses.contains(host.getAddress()))
+        if (liveReplicaHosts.contains(host))
         {
             liveReplicaHosts.add(host);
             logger.trace("The host {} is now up", host);

diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index fd350bf..08f0612 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -3677,14 +3677,14 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
             String keyspace = tablesByKeyspace.getKey();
             if (!Schema.instance.getKeyspaces().contains(keyspace))
             {
-                SystemKeyspace.clearSizeEstimates(keyspace);
+                SystemKeyspace.clearEstimates(keyspace);
             }
             else
             {
                 for (String table : tablesByKeyspace.getValue())
                 {
                     if (Schema.instance.getTableMetadataRef(keyspace, table) == null)
-                        SystemKeyspace.clearSizeEstimates(keyspace, table);
+                        SystemKeyspace.clearEstimates(keyspace, table);
                 }
             }
         }
@@ -3912,6 +3912,32 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         return localDCPrimaryRanges;
     }

+    public Collection<Range<Token>> getLocalPrimaryRange()
+    {
+        return getLocalPrimaryRangeForEndpoint(FBUtilities.getBroadcastAddressAndPort());
+    }
+
+    public Collection<Range<Token>> getLocalPrimaryRangeForEndpoint(InetAddressAndPort referenceEndpoint)
+    {
+        IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
+        TokenMetadata tokenMetadata = this.tokenMetadata.cloneOnlyTokenMap();
+        String dc = snitch.getDatacenter(referenceEndpoint);
+        Set<Token> tokens = new HashSet<>(tokenMetadata.getTokens(referenceEndpoint));
+
+        // filter tokens to the single DC
+        List<Token> filteredTokens = Lists.newArrayList();
+        for (Token token : tokenMetadata.sortedTokens())
+        {
+            InetAddressAndPort endpoint = tokenMetadata.getEndpoint(token);
+            if (dc.equals(snitch.getDatacenter(endpoint)))
+                filteredTokens.add(token);
+        }
+
+        return getAllRanges(filteredTokens).stream()
+                                           .filter(t -> tokens.contains(t.right))
+                                           .collect(Collectors.toList());
+    }
+
     /**
      * Get all ranges that span the ring given a set
      * of tokens. All ranges are in sorted order of
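Illustrative only: the expected values asserted in the test that follows can be derived with a tiny sketch of what getLocalPrimaryRangeForEndpoint computes (DC1 holds tokens A and C, DC2 holds B and D; filtering the sorted ring to one DC before taking (predecessor, token] gives (C, A] for the node holding A, rather than the full-ring (D, A]):

    import java.util.*;

    public class LocalPrimaryRangeDemo
    {
        public static void main(String[] args)
        {
            Map<String, String> tokenToDc = new TreeMap<>();
            tokenToDc.put("A", "DC1");
            tokenToDc.put("B", "DC2");
            tokenToDc.put("C", "DC1");
            tokenToDc.put("D", "DC2");

            for (String dc : new LinkedHashSet<>(tokenToDc.values()))
            {
                List<String> dcTokens = new ArrayList<>();
                for (Map.Entry<String, String> e : tokenToDc.entrySet())
                    if (dc.equals(e.getValue()))
                        dcTokens.add(e.getKey()); // already sorted by the TreeMap

                for (int i = 0; i < dcTokens.size(); i++)
                {
                    String end = dcTokens.get(i);
                    String start = dcTokens.get((i - 1 + dcTokens.size()) % dcTokens.size());
                    System.out.printf("%s: local primary range of node holding %s is (%s, %s]%n",
                                      dc, end, start, end);
                }
            }
        }
    }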
diff --git a/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java b/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java
index 8e91342..6e7704a 100644
--- a/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java
+++ b/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java
@@ -193,6 +193,45 @@ public class StorageServiceServerTest
         // no need to insert extra data, even an "empty" database will have a little information in the system keyspace
         StorageService.instance.takeSnapshot(UUID.randomUUID().toString(), SchemaConstants.SCHEMA_KEYSPACE_NAME);
     }

+    @Test
+    public void testLocalPrimaryRangeForEndpointWithNetworkTopologyStrategy() throws Exception
+    {
+        TokenMetadata metadata = StorageService.instance.getTokenMetadata();
+        metadata.clearUnsafe();
+
+        // DC1
+        metadata.updateNormalToken(new StringToken("A"), InetAddressAndPort.getByName("127.0.0.1"));
+        metadata.updateNormalToken(new StringToken("C"), InetAddressAndPort.getByName("127.0.0.2"));
+
+        // DC2
+        metadata.updateNormalToken(new StringToken("B"), InetAddressAndPort.getByName("127.0.0.4"));
+        metadata.updateNormalToken(new StringToken("D"), InetAddressAndPort.getByName("127.0.0.5"));
+
+        Map<String, String> configOptions = new HashMap<>();
+        configOptions.put("DC1", "2");
+        configOptions.put("DC2", "2");
+        configOptions.put(ReplicationParams.CLASS, "NetworkTopologyStrategy");
+
+        Keyspace.clear("Keyspace1");
+        KeyspaceMetadata meta = KeyspaceMetadata.create("Keyspace1", KeyspaceParams.create(false, configOptions));
+        Schema.instance.load(meta);
+
+        Collection<Range<Token>> primaryRanges = StorageService.instance.getLocalPrimaryRangeForEndpoint(InetAddressAndPort.getByName("127.0.0.1"));
+        assertEquals(1, primaryRanges.size());
+        assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("C"), new StringToken("A"))));
+
+        primaryRanges = StorageService.instance.getLocalPrimaryRangeForEndpoint(InetAddressAndPort.getByName("127.0.0.2"));
+        assertEquals(1, primaryRanges.size());
+        assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("A"), new StringToken("C"))));
+
+        primaryRanges = StorageService.instance.getLocalPrimaryRangeForEndpoint(InetAddressAndPort.getByName("127.0.0.4"));
+        assertEquals(1, primaryRanges.size());
+        assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("D"), new StringToken("B"))));
+
+        primaryRanges = StorageService.instance.getLocalPrimaryRangeForEndpoint(InetAddressAndPort.getByName("127.0.0.5"));
+        assertEquals(1, primaryRanges.size());
+        assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("B"), new StringToken("D"))));
+    }
+
     @Test
     public void testPrimaryRangeForEndpointWithinDCWithNetworkTopologyStrategy() throws Exception

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org