Fix regression in split size on CqlInputFormat. Patch by aaliev; reviewed by jmckenzie for CASSANDRA-10835
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4f3e47bf Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4f3e47bf Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4f3e47bf Branch: refs/heads/cassandra-3.0 Commit: 4f3e47bf9edbfefec2f85ecd915ac8fbbc81de8b Parents: c8493c4 Author: Artem Aliev <artem.al...@gmail.com> Authored: Thu Dec 10 12:33:58 2015 -0500 Committer: Joshua McKenzie <jmcken...@apache.org> Committed: Thu Dec 10 12:33:58 2015 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../hadoop/AbstractColumnFamilyInputFormat.java | 36 ++++++++++++++------ .../apache/cassandra/hadoop/ConfigHelper.java | 26 +++++++++++++- .../cassandra/hadoop/cql3/CqlInputFormat.java | 8 +++-- 4 files changed, 57 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/4f3e47bf/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index cd6b92e..30a76a9 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.2.5 + * Fix regression in split size on CqlInputFormat (CASSANDRA-10835) * Better handling of SSL connection errors inter-node (CASSANDRA-10816) * Disable reloading of GossipingPropertyFileSnitch (CASSANDRA-9474) * Verify tables in pseudo-system keyspaces at startup (CASSANDRA-10761) http://git-wip-us.apache.org/repos/asf/cassandra/blob/4f3e47bf/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java index 3c088c2..d55f205 100644 --- a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java +++ 
b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java @@ -18,8 +18,19 @@ package org.apache.cassandra.hadoop; import java.io.IOException; -import java.util.*; -import java.util.concurrent.*; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,18 +42,19 @@ import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.Row; import com.datastax.driver.core.Session; import com.datastax.driver.core.TokenRange; - import org.apache.cassandra.db.SystemKeyspace; -import org.apache.cassandra.dht.ByteOrderedPartitioner; import org.apache.cassandra.dht.IPartitioner; -import org.apache.cassandra.dht.OrderPreservingPartitioner; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; -import org.apache.cassandra.hadoop.cql3.*; +import org.apache.cassandra.hadoop.cql3.CqlConfigHelper; import org.apache.cassandra.thrift.KeyRange; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapreduce.*; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; public abstract class AbstractColumnFamilyInputFormat<K, Y> extends InputFormat<K, Y> implements org.apache.hadoop.mapred.InputFormat<K, Y> { @@ -230,9 +242,10 @@ public abstract class AbstractColumnFamilyInputFormat<K, Y> extends InputFormat< private Map<TokenRange, Long> 
getSubSplits(String keyspace, String cfName, TokenRange range, Configuration conf, Session session) throws IOException { int splitSize = ConfigHelper.getInputSplitSize(conf); + int splitSizeMb = ConfigHelper.getInputSplitSizeInMb(conf); try { - return describeSplits(keyspace, cfName, range, splitSize, session); + return describeSplits(keyspace, cfName, range, splitSize, splitSizeMb, session); } catch (Exception e) { @@ -252,7 +265,7 @@ public abstract class AbstractColumnFamilyInputFormat<K, Y> extends InputFormat< } } - private Map<TokenRange, Long> describeSplits(String keyspace, String table, TokenRange tokenRange, int splitSize, Session session) + private Map<TokenRange, Long> describeSplits(String keyspace, String table, TokenRange tokenRange, int splitSize, int splitSizeMb, Session session) { String query = String.format("SELECT mean_partition_size, partitions_count " + "FROM %s.%s " + @@ -275,7 +288,10 @@ public abstract class AbstractColumnFamilyInputFormat<K, Y> extends InputFormat< long meanPartitionSize = row.getLong("mean_partition_size"); long partitionCount = row.getLong("partitions_count"); - int splitCount = (int)((meanPartitionSize * partitionCount) / splitSize); + int splitCount = splitSizeMb > 0 + ? 
(int)(meanPartitionSize * partitionCount / splitSizeMb / 1024 / 1024) + : (int)(partitionCount / splitSize); + if (splitCount <= 0) splitCount = 1; List<TokenRange> splitRanges = tokenRange.splitEvenly(splitCount); Map<TokenRange, Long> rangesWithLength = new HashMap<>(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/4f3e47bf/src/java/org/apache/cassandra/hadoop/ConfigHelper.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/ConfigHelper.java b/src/java/org/apache/cassandra/hadoop/ConfigHelper.java index e81860d..376c250 100644 --- a/src/java/org/apache/cassandra/hadoop/ConfigHelper.java +++ b/src/java/org/apache/cassandra/hadoop/ConfigHelper.java @@ -54,6 +54,7 @@ public class ConfigHelper private static final String INPUT_PREDICATE_CONFIG = "cassandra.input.predicate"; private static final String INPUT_KEYRANGE_CONFIG = "cassandra.input.keyRange"; private static final String INPUT_SPLIT_SIZE_CONFIG = "cassandra.input.split.size"; + private static final String INPUT_SPLIT_SIZE_IN_MB_CONFIG = "cassandra.input.split.size_mb"; private static final String INPUT_WIDEROWS_CONFIG = "cassandra.input.widerows"; private static final int DEFAULT_SPLIT_SIZE = 64 * 1024; private static final String RANGE_BATCH_SIZE_CONFIG = "cassandra.range.batch.size"; @@ -176,7 +177,7 @@ public class ConfigHelper * the overhead of each map will take up the bulk of the job time. * * @param conf Job configuration you are about to run - * @param splitsize Size of the input split + * @param splitsize Number of partitions in the input split */ public static void setInputSplitSize(Configuration conf, int splitsize) { @@ -189,6 +190,29 @@ public class ConfigHelper } /** + * Set the size of the input split. getInputSplitSize value is used if this is not set. + * This affects the number of maps created, if the number is too small + * the overhead of each map will take up the bulk of the job time. 
+ * + * @param conf Job configuration you are about to run + * @param splitSizeMb Input split size in MB + */ + public static void setInputSplitSizeInMb(Configuration conf, int splitSizeMb) + { + conf.setInt(INPUT_SPLIT_SIZE_IN_MB_CONFIG, splitSizeMb); + } + + /** + * cassandra.input.split.size will be used if the value is undefined or negative. + * @param conf Job configuration you are about to run + * @return split size in MB or -1 if it is undefined. + */ + public static int getInputSplitSizeInMb(Configuration conf) + { + return conf.getInt(INPUT_SPLIT_SIZE_IN_MB_CONFIG, -1); + } + + /** * Set the predicate that determines what columns will be selected from each row. * * @param conf Job configuration you are about to run http://git-wip-us.apache.org/repos/asf/cassandra/blob/4f3e47bf/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java index 36da92d..c46ceb8 100644 --- a/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java +++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java @@ -40,10 +40,12 @@ import com.datastax.driver.core.Row; * ConfigHelper.setInputColumnFamily * * You can also configure the number of rows per InputSplit with - * ConfigHelper.setInputSplitSize. The default split size is 64k rows. + * 1: ConfigHelper.setInputSplitSize. The default split size is 64k rows. + * or + * 2: ConfigHelper.setInputSplitSizeInMb. InputSplit size in MB with new, more precise method + * If no value is provided for InputSplitSizeInMb, InputSplitSize will be used. * - * the number of CQL rows per page - * CQLConfigHelper.setInputCQLPageRowSize. The default page row size is 1000. You + * CQLConfigHelper.setInputCQLPageRowSize. The default page row size is 1000. You * should set it to "as big as possible, but no bigger." 
It set the LIMIT for the CQL * query, so you need set it big enough to minimize the network overhead, and also * not too big to avoid out of memory issue.