Pig: disable split combination, add split size param
Patch by Alex Liu, reviewed by brandonwilliams for CASSANDRA-5544


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6d7404bc
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6d7404bc
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6d7404bc

Branch: refs/heads/trunk
Commit: 6d7404bc2eec2c34feb6e2b9db938f9e0e5ae208
Parents: 27e8f87
Author: Brandon Williams <brandonwilli...@apache.org>
Authored: Wed May 29 12:53:52 2013 -0500
Committer: Brandon Williams <brandonwilli...@apache.org>
Committed: Wed May 29 12:53:52 2013 -0500

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 examples/pig/README.txt                            |    4 +++
 .../cassandra/hadoop/pig/CassandraStorage.java     |   21 +++++++++++++++
 3 files changed, 26 insertions(+), 0 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d7404bc/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 34e5b52..61bd4b7 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -12,6 +12,7 @@
 Merged from 1.1:
  * Remove buggy thrift max message length option (CASSANDRA-5529)
  * Fix NPE in Pig's widerow mode (CASSANDRA-5488)
+ * Add split size parameter to Pig and disable split combination (CASSANDRA-5544)
 
 
 1.2.5

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d7404bc/examples/pig/README.txt
----------------------------------------------------------------------
diff --git a/examples/pig/README.txt b/examples/pig/README.txt
index 3ef7858..e3d9af6 100644
--- a/examples/pig/README.txt
+++ b/examples/pig/README.txt
@@ -88,3 +88,7 @@ PIG_USE_SECONDARY:  this allows easy use of secondary indexes within your
                     can also be set in the LOAD url by adding the
                     'use_secondary=true' parameter.
 
+PIG_INPUT_SPLIT_SIZE: this sets the split size passed to Hadoop, controlling
+                      the number of mapper tasks created.  This can also be set in the LOAD url by
+                      adding the 'split_size=X' parameter, where X is an integer split size.
+

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d7404bc/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
index 0854758..6490d05 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
@@ -77,6 +77,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
     public final static String PIG_ALLOW_DELETES = "PIG_ALLOW_DELETES";
     public final static String PIG_WIDEROW_INPUT = "PIG_WIDEROW_INPUT";
     public final static String PIG_USE_SECONDARY = "PIG_USE_SECONDARY";
+    public final static String PIG_INPUT_SPLIT_SIZE = "PIG_INPUT_SPLIT_SIZE";
 
     private final static String DEFAULT_INPUT_FORMAT = 
"org.apache.cassandra.hadoop.ColumnFamilyInputFormat";
     private final static String DEFAULT_OUTPUT_FORMAT = 
"org.apache.cassandra.hadoop.ColumnFamilyOutputFormat";
@@ -105,6 +106,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
     private int limit;
     private boolean widerows = false;
     private boolean usePartitionFilter = false;
+    private int splitSize = 64 * 1024;
     // wide row hacks
     private ByteBuffer lastKey;
     private Map<ByteBuffer,IColumn> lastRow;
@@ -516,6 +518,8 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
                     widerows = Boolean.parseBoolean(urlQuery.get("widerows"));
                 if (urlQuery.containsKey("use_secondary"))
                     usePartitionFilter = 
Boolean.parseBoolean(urlQuery.get("use_secondary"));
+                if (urlQuery.containsKey("split_size"))
+                    splitSize = Integer.parseInt(urlQuery.get("split_size"));
             }
             String[] parts = urlParts[0].split("/+");
             String[] credentialsAndKeyspace = parts[1].split("@");
@@ -591,6 +595,9 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
     public void setLocation(String location, Job job) throws IOException
     {
         conf = job.getConfiguration();
+        
+        // don't combine mappers to a single mapper per node
+        conf.setBoolean("pig.noSplitCombination", true);
         setLocationFromUri(location);
 
         if (ConfigHelper.getInputSlicePredicate(conf) == null)
@@ -603,12 +610,26 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
             widerows = Boolean.valueOf(System.getenv(PIG_WIDEROW_INPUT));
         if (System.getenv(PIG_USE_SECONDARY) != null)
             usePartitionFilter = 
Boolean.valueOf(System.getenv(PIG_USE_SECONDARY));
+        if (System.getenv(PIG_INPUT_SPLIT_SIZE) != null)
+        {
+            try
+            {
+                ConfigHelper.setInputSplitSize(conf, 
Integer.valueOf(System.getenv(PIG_INPUT_SPLIT_SIZE)));
+            }
+            catch(NumberFormatException e)
+            {
+                throw new RuntimeException("PIG_INPUT_SPLIT_SIZE is not a 
number", e);
+            }           
+        }
 
         if (usePartitionFilter && getIndexExpressions() != null)
             ConfigHelper.setInputRange(conf, getIndexExpressions());
 
         if (username != null && password != null)
             ConfigHelper.setInputKeyspaceUserNameAndPassword(conf, username, 
password);
+        
+        if (splitSize > 0)
+            ConfigHelper.setInputSplitSize(conf, splitSize);
 
         ConfigHelper.setInputColumnFamily(conf, keyspace, column_family, 
widerows);
         setConnectionInformation();

Reply via email to