10835 3.0

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a7a03887
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a7a03887
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a7a03887

Branch: refs/heads/cassandra-3.0
Commit: a7a03887cdfd109d0a0150a4bce1eba9b70494de
Parents: b29275c
Author: Artem Aliev <artem.al...@gmail.com>
Authored: Thu Dec 10 12:41:15 2015 -0500
Committer: Joshua McKenzie <jmcken...@apache.org>
Committed: Thu Dec 10 12:41:15 2015 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/hadoop/ConfigHelper.java   | 26 +++++++++++++++++++-
 .../cassandra/hadoop/cql3/CqlInputFormat.java   | 17 ++++++++-----
 3 files changed, 37 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7a03887/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 8bd5892..9c01160 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -25,6 +25,7 @@
  * Keep the file open in trySkipCache (CASSANDRA-10669)
  * Updated trigger example (CASSANDRA-10257)
 Merged from 2.2:
+ * Fix regression on split size in CqlInputFormat (CASSANDRA-10835)
  * Better handling of SSL connection errors inter-node (CASSANDRA-10816)
  * Verify tables in pseudo-system keyspaces at startup (CASSANDRA-10761)
 * Fix IllegalArgumentException in DataOutputBuffer.reallocate for large buffers (CASSANDRA-10592)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7a03887/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ConfigHelper.java b/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
index a9dcc6c..a4deb4a 100644
--- a/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
+++ b/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
@@ -54,6 +54,7 @@ public class ConfigHelper
     private static final String INPUT_PREDICATE_CONFIG = "cassandra.input.predicate";
     private static final String INPUT_KEYRANGE_CONFIG = "cassandra.input.keyRange";
     private static final String INPUT_SPLIT_SIZE_CONFIG = "cassandra.input.split.size";
+    private static final String INPUT_SPLIT_SIZE_IN_MB_CONFIG = "cassandra.input.split.size_mb";
     private static final String INPUT_WIDEROWS_CONFIG = "cassandra.input.widerows";
     private static final int DEFAULT_SPLIT_SIZE = 64 * 1024;
     private static final String RANGE_BATCH_SIZE_CONFIG = "cassandra.range.batch.size";
@@ -176,7 +177,7 @@ public class ConfigHelper
      * the overhead of each map will take up the bulk of the job time.
      *
      * @param conf      Job configuration you are about to run
-     * @param splitsize Size of the input split
+     * @param splitsize Number of partitions in the input split
      */
     public static void setInputSplitSize(Configuration conf, int splitsize)
     {
@@ -189,6 +190,29 @@ public class ConfigHelper
     }
 
     /**
+     * Set the size of the input split. setInputSplitSize value is used if this is not set.
+     * This affects the number of maps created, if the number is too small
+     * the overhead of each map will take up the bulk of the job time.
+     *
+     * @param conf          Job configuration you are about to run
+     * @param splitSizeMb   Input split size in MB
+     */
+    public static void setInputSplitSizeInMb(Configuration conf, int splitSizeMb)
+    {
+        conf.setInt(INPUT_SPLIT_SIZE_IN_MB_CONFIG, splitSizeMb);
+    }
+
+    /**
+     * cassandra.input.split.size will be used if the value is undefined or negative.
+     * @param conf  Job configuration you are about to run
+     * @return      split size in MB or -1 if it is undefined.
+     */
+    public static int getInputSplitSizeInMb(Configuration conf)
+    {
+        return conf.getInt(INPUT_SPLIT_SIZE_IN_MB_CONFIG, -1);
+    }
+
+    /**
      * Set the predicate that determines what columns will be selected from each row.
      *
      * @param conf      Job configuration you are about to run

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7a03887/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java
index 6856175..534e66d 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java
@@ -53,10 +53,12 @@ import org.apache.cassandra.hadoop.*;
  *   ConfigHelper.setInputColumnFamily
  *
  * You can also configure the number of rows per InputSplit with
- *   ConfigHelper.setInputSplitSize. The default split size is 64k rows.
+ *   1: ConfigHelper.setInputSplitSize. The default split size is 64k rows.
+ *   or
+ *   2: ConfigHelper.setInputSplitSizeInMb. InputSplit size in MB with new, more precise method
+ *   If no value is provided for InputSplitSizeInMb, we default to using InputSplitSize.
  *
- *   the number of CQL rows per page
- *   CQLConfigHelper.setInputCQLPageRowSize. The default page row size is 1000. You 
+ *   CQLConfigHelper.setInputCQLPageRowSize. The default page row size is 1000. You
  *   should set it to "as big as possible, but no bigger." It set the LIMIT for the CQL 
  *   query, so you need set it big enough to minimize the network overhead, and also
  *   not too big to avoid out of memory issue.
@@ -213,9 +215,10 @@ public class CqlInputFormat extends org.apache.hadoop.mapreduce.InputFormat<Long
     private Map<TokenRange, Long> getSubSplits(String keyspace, String cfName, TokenRange range, Configuration conf) throws IOException
     {
         int splitSize = ConfigHelper.getInputSplitSize(conf);
+        int splitSizeMb = ConfigHelper.getInputSplitSizeInMb(conf);
         try
         {
-            return describeSplits(keyspace, cfName, range, splitSize);
+            return describeSplits(keyspace, cfName, range, splitSize, splitSizeMb);
         }
         catch (Exception e)
         {
@@ -235,7 +238,7 @@ public class CqlInputFormat extends org.apache.hadoop.mapreduce.InputFormat<Long
         }
     }
 
-    private Map<TokenRange, Long> describeSplits(String keyspace, String table, TokenRange tokenRange, int splitSize)
+    private Map<TokenRange, Long> describeSplits(String keyspace, String table, TokenRange tokenRange, int splitSize, int splitSizeMb)
     {
         String query = String.format("SELECT mean_partition_size, partitions_count " +
                                      "FROM %s.%s " +
@@ -256,7 +259,9 @@ public class CqlInputFormat extends org.apache.hadoop.mapreduce.InputFormat<Long
             meanPartitionSize = row.getLong("mean_partition_size");
             partitionCount = row.getLong("partitions_count");
 
-            splitCount = (int)((meanPartitionSize * partitionCount) / splitSize);
+            splitCount = splitSizeMb > 0
+                ? (int)(meanPartitionSize * partitionCount / splitSizeMb / 1024 / 1024)
+                : (int)(partitionCount / splitSize);
+                : (int)(partitionCount / splitSize);
         }
 
         // If we have no data on this split or the size estimate is 0,
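
To make the new arithmetic concrete, here is a hedged standalone sketch of the split-count computation above (the class name and sample numbers are made up; only the two branches mirror the patched describeSplits). It also shows the fix itself: splitSize is a partition count, so the fallback now divides partitions_count directly rather than dividing total bytes by it, which appears to be the regression named in CHANGES.txt:

public class SplitCountMath
{
    static int splitCount(long meanPartitionSize, long partitionCount,
                          int splitSize, int splitSizeMb)
    {
        return splitSizeMb > 0
            // MB branch: estimated total bytes divided by splitSizeMb * 2^20.
            ? (int) (meanPartitionSize * partitionCount / splitSizeMb / 1024 / 1024)
            // Fallback branch: splitSize counts partitions, not bytes.
            : (int) (partitionCount / splitSize);
    }

    public static void main(String[] args)
    {
        // 1 KB mean partition size, 100M partitions (~95 GiB of data):
        // 256 MB splits -> 381 sub-splits; 64k-partition splits -> 1525.
        System.out.println(splitCount(1024, 100_000_000L, 64 * 1024, 256));
        System.out.println(splitCount(1024, 100_000_000L, 64 * 1024, -1));
    }
}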
