10835 3.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a7a03887 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a7a03887 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a7a03887 Branch: refs/heads/cassandra-3.0 Commit: a7a03887cdfd109d0a0150a4bce1eba9b70494de Parents: b29275c Author: Artem Aliev <artem.al...@gmail.com> Authored: Thu Dec 10 12:41:15 2015 -0500 Committer: Joshua McKenzie <jmcken...@apache.org> Committed: Thu Dec 10 12:41:15 2015 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/hadoop/ConfigHelper.java | 26 +++++++++++++++++++- .../cassandra/hadoop/cql3/CqlInputFormat.java | 17 ++++++++----- 3 files changed, 37 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7a03887/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 8bd5892..9c01160 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -25,6 +25,7 @@ * Keep the file open in trySkipCache (CASSANDRA-10669) * Updated trigger example (CASSANDRA-10257) Merged from 2.2: + * Fix regression on split size in CqlInputFormat (CASSANDRA-10835) * Better handling of SSL connection errors inter-node (CASSANDRA-10816) * Verify tables in pseudo-system keyspaces at startup (CASSANDRA-10761) * Fix IllegalArgumentException in DataOutputBuffer.reallocate for large buffers (CASSANDRA-10592) http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7a03887/src/java/org/apache/cassandra/hadoop/ConfigHelper.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/ConfigHelper.java b/src/java/org/apache/cassandra/hadoop/ConfigHelper.java index a9dcc6c..a4deb4a 100644 --- a/src/java/org/apache/cassandra/hadoop/ConfigHelper.java 
+++ b/src/java/org/apache/cassandra/hadoop/ConfigHelper.java @@ -54,6 +54,7 @@ public class ConfigHelper private static final String INPUT_PREDICATE_CONFIG = "cassandra.input.predicate"; private static final String INPUT_KEYRANGE_CONFIG = "cassandra.input.keyRange"; private static final String INPUT_SPLIT_SIZE_CONFIG = "cassandra.input.split.size"; + private static final String INPUT_SPLIT_SIZE_IN_MB_CONFIG = "cassandra.input.split.size_mb"; private static final String INPUT_WIDEROWS_CONFIG = "cassandra.input.widerows"; private static final int DEFAULT_SPLIT_SIZE = 64 * 1024; private static final String RANGE_BATCH_SIZE_CONFIG = "cassandra.range.batch.size"; @@ -176,7 +177,7 @@ public class ConfigHelper * the overhead of each map will take up the bulk of the job time. * * @param conf Job configuration you are about to run - * @param splitsize Size of the input split + * @param splitsize Number of partitions in the input split */ public static void setInputSplitSize(Configuration conf, int splitsize) { @@ -189,6 +190,29 @@ public class ConfigHelper } /** + * Set the size of the input split. setInputSplitSize value is used if this is not set. + * This affects the number of maps created, if the number is too small + * the overhead of each map will take up the bulk of the job time. + * + * @param conf Job configuration you are about to run + * @param splitSizeMb Input split size in MB + */ + public static void setInputSplitSizeInMb(Configuration conf, int splitSizeMb) + { + conf.setInt(INPUT_SPLIT_SIZE_IN_MB_CONFIG, splitSizeMb); + } + + /** + * cassandra.input.split.size will be used if the value is undefined or negative. + * @param conf Job configuration you are about to run + * @return split size in MB or -1 if it is undefined. + */ + public static int getInputSplitSizeInMb(Configuration conf) + { + return conf.getInt(INPUT_SPLIT_SIZE_IN_MB_CONFIG, -1); + } + + /** * Set the predicate that determines what columns will be selected from each row. 
* * @param conf Job configuration you are about to run http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7a03887/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java index 6856175..534e66d 100644 --- a/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java +++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java @@ -53,10 +53,12 @@ import org.apache.cassandra.hadoop.*; * ConfigHelper.setInputColumnFamily * * You can also configure the number of rows per InputSplit with - * ConfigHelper.setInputSplitSize. The default split size is 64k rows. + * 1: ConfigHelper.setInputSplitSize. The default split size is 64k rows. + * or + * 2: ConfigHelper.setInputSplitSizeInMb. InputSplit size in MB with a new, more precise method + * If no value is provided for InputSplitSizeInMb, we default to using InputSplitSize. - * the number of CQL rows per page - * CQLConfigHelper.setInputCQLPageRowSize. The default page row size is 1000. You + * CQLConfigHelper.setInputCQLPageRowSize. The default page row size is 1000. You * should set it to "as big as possible, but no bigger." It set the LIMIT for the CQL * query, so you need set it big enough to minimize the network overhead, and also * not too big to avoid out of memory issue. 
@@ -213,9 +215,10 @@ public class CqlInputFormat extends org.apache.hadoop.mapreduce.InputFormat<Long private Map<TokenRange, Long> getSubSplits(String keyspace, String cfName, TokenRange range, Configuration conf) throws IOException { int splitSize = ConfigHelper.getInputSplitSize(conf); + int splitSizeMb = ConfigHelper.getInputSplitSizeInMb(conf); try { - return describeSplits(keyspace, cfName, range, splitSize); + return describeSplits(keyspace, cfName, range, splitSize, splitSizeMb); } catch (Exception e) { @@ -235,7 +238,7 @@ public class CqlInputFormat extends org.apache.hadoop.mapreduce.InputFormat<Long } } - private Map<TokenRange, Long> describeSplits(String keyspace, String table, TokenRange tokenRange, int splitSize) + private Map<TokenRange, Long> describeSplits(String keyspace, String table, TokenRange tokenRange, int splitSize, int splitSizeMb) { String query = String.format("SELECT mean_partition_size, partitions_count " + "FROM %s.%s " + @@ -256,7 +259,9 @@ public class CqlInputFormat extends org.apache.hadoop.mapreduce.InputFormat<Long meanPartitionSize = row.getLong("mean_partition_size"); partitionCount = row.getLong("partitions_count"); - splitCount = (int)((meanPartitionSize * partitionCount) / splitSize); + splitCount = splitSizeMb > 0 + ? (int)(meanPartitionSize * partitionCount / splitSizeMb / 1024 / 1024) + : (int)(partitionCount / splitSize); } // If we have no data on this split or the size estimate is 0,