Pig: disable split combination, add split size param. Patch by Alex Liu; reviewed by brandonwilliams for CASSANDRA-5544.
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6d7404bc Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6d7404bc Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6d7404bc Branch: refs/heads/trunk Commit: 6d7404bc2eec2c34feb6e2b9db938f9e0e5ae208 Parents: 27e8f87 Author: Brandon Williams <brandonwilli...@apache.org> Authored: Wed May 29 12:53:52 2013 -0500 Committer: Brandon Williams <brandonwilli...@apache.org> Committed: Wed May 29 12:53:52 2013 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + examples/pig/README.txt | 4 +++ .../cassandra/hadoop/pig/CassandraStorage.java | 21 +++++++++++++++ 3 files changed, 26 insertions(+), 0 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d7404bc/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 34e5b52..61bd4b7 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -12,6 +12,7 @@ Merged from 1.1: * Remove buggy thrift max message length option (CASSANDRA-5529) * Fix NPE in Pig's widerow mode (CASSANDRA-5488) + * Add split size parameter to Pig and disable split combination (CASSANDRA-5544) 1.2.5 http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d7404bc/examples/pig/README.txt ---------------------------------------------------------------------- diff --git a/examples/pig/README.txt b/examples/pig/README.txt index 3ef7858..e3d9af6 100644 --- a/examples/pig/README.txt +++ b/examples/pig/README.txt @@ -88,3 +88,7 @@ PIG_USE_SECONDARY: this allows easy use of secondary indexes within your can also be set in the LOAD url by adding the 'use_secondary=true' parameter. +PIG_INPUT_SPLIT_SIZE: this sets the split size passed to Hadoop, controlling + the amount of mapper tasks created. 
This can also be set in the LOAD url by + adding the 'split_size=X' parameter, where X is an integer amount for the size. + http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d7404bc/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java index 0854758..6490d05 100644 --- a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java +++ b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java @@ -77,6 +77,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo public final static String PIG_ALLOW_DELETES = "PIG_ALLOW_DELETES"; public final static String PIG_WIDEROW_INPUT = "PIG_WIDEROW_INPUT"; public final static String PIG_USE_SECONDARY = "PIG_USE_SECONDARY"; + public final static String PIG_INPUT_SPLIT_SIZE = "PIG_INPUT_SPLIT_SIZE"; private final static String DEFAULT_INPUT_FORMAT = "org.apache.cassandra.hadoop.ColumnFamilyInputFormat"; private final static String DEFAULT_OUTPUT_FORMAT = "org.apache.cassandra.hadoop.ColumnFamilyOutputFormat"; @@ -105,6 +106,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo private int limit; private boolean widerows = false; private boolean usePartitionFilter = false; + private int splitSize = 64 * 1024; // wide row hacks private ByteBuffer lastKey; private Map<ByteBuffer,IColumn> lastRow; @@ -516,6 +518,8 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo widerows = Boolean.parseBoolean(urlQuery.get("widerows")); if (urlQuery.containsKey("use_secondary")) usePartitionFilter = Boolean.parseBoolean(urlQuery.get("use_secondary")); + if (urlQuery.containsKey("split_size")) + splitSize = Integer.parseInt(urlQuery.get("split_size")); } String[] parts = urlParts[0].split("/+"); String[] credentialsAndKeyspace 
= parts[1].split("@"); @@ -591,6 +595,9 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo public void setLocation(String location, Job job) throws IOException { conf = job.getConfiguration(); + + // don't combine mappers to a single mapper per node + conf.setBoolean("pig.noSplitCombination", true); setLocationFromUri(location); if (ConfigHelper.getInputSlicePredicate(conf) == null) @@ -603,12 +610,26 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo widerows = Boolean.valueOf(System.getenv(PIG_WIDEROW_INPUT)); if (System.getenv(PIG_USE_SECONDARY) != null) usePartitionFilter = Boolean.valueOf(System.getenv(PIG_USE_SECONDARY)); + if (System.getenv(PIG_INPUT_SPLIT_SIZE) != null) + { + try + { + ConfigHelper.setInputSplitSize(conf, Integer.valueOf(System.getenv(PIG_INPUT_SPLIT_SIZE))); + } + catch(NumberFormatException e) + { + throw new RuntimeException("PIG_INPUT_SPLIT_SIZE is not a number", e); + } + } if (usePartitionFilter && getIndexExpressions() != null) ConfigHelper.setInputRange(conf, getIndexExpressions()); if (username != null && password != null) ConfigHelper.setInputKeyspaceUserNameAndPassword(conf, username, password); + + if (splitSize > 0) + ConfigHelper.setInputSplitSize(conf, splitSize); ConfigHelper.setInputColumnFamily(conf, keyspace, column_family, widerows); setConnectionInformation();