Give CRR (CqlRecordReader) a default input_cql statement

Patch by Mike Adamson, reviewed by brandonwilliams for CASSANDRA-7226
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/52df514d Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/52df514d Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/52df514d Branch: refs/heads/cassandra-2.1.0 Commit: 52df514dd1a95d4fc4d699d6ffa9d3bf7e844854 Parents: bd0bb6d Author: Brandon Williams <brandonwilli...@apache.org> Authored: Mon Aug 11 13:00:39 2014 -0500 Committer: Brandon Williams <brandonwilli...@apache.org> Committed: Mon Aug 11 13:02:34 2014 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/hadoop/cql3/CqlConfigHelper.java | 2 +- .../cassandra/hadoop/cql3/CqlRecordReader.java | 138 ++++++++++++++++++- .../cassandra/hadoop/pig/CqlNativeStorage.java | 15 +- .../apache/cassandra/hadoop/pig/CqlStorage.java | 4 +- 5 files changed, 150 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/52df514d/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index cd51e04..ddf4627 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.0.10 + * Give CRR a default input_cql Statement (CASSANDRA-7226) * Better error message when adding a collection with the same name than a previously dropped one (CASSANDRA-6276) * Fix validation when adding static columns (CASSANDRA-7730) http://git-wip-us.apache.org/repos/asf/cassandra/blob/52df514d/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java index 7a5fd47..b2c8fbf 100644 --- 
a/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java +++ 
b/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java @@ -60,7 +60,7 @@ import com.google.common.collect.Sets; public class CqlConfigHelper { - private static final String INPUT_CQL_COLUMNS_CONFIG = "cassandra.input.columnfamily.columns"; // separate by colon , + private static final String INPUT_CQL_COLUMNS_CONFIG = "cassandra.input.columnfamily.columns"; private static final String INPUT_CQL_PAGE_ROW_SIZE_CONFIG = "cassandra.input.page.row.size"; private static final String INPUT_CQL_WHERE_CLAUSE_CONFIG = "cassandra.input.where.clause"; private static final String INPUT_CQL = "cassandra.input.cql"; http://git-wip-us.apache.org/repos/asf/cassandra/blob/52df514d/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java index 88c5c33..74310cf 100644 --- a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java +++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java @@ -24,9 +24,17 @@ import java.net.InetAddress; import java.nio.ByteBuffer; import java.util.*; +import com.google.common.base.Function; +import com.google.common.base.Joiner; +import com.google.common.base.Optional; +import com.google.common.base.Splitter; import com.google.common.collect.AbstractIterator; +import com.google.common.collect.Iterables; import com.google.common.collect.Maps; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang.StringUtils; + import org.apache.cassandra.hadoop.HadoopCompat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -63,6 +71,8 @@ public class CqlRecordReader extends RecordReader<Long, Row> { private static final Logger logger = LoggerFactory.getLogger(CqlRecordReader.class); + public static final int DEFAULT_CQL_PAGE_LIMIT = 1000; + private ColumnFamilySplit split; private 
RowIterator rowIterator; @@ -74,6 +84,12 @@ public class CqlRecordReader extends RecordReader<Long, Row> private Cluster cluster; private Session session; private IPartitioner partitioner; + private String inputColumns; + private String userDefinedWhereClauses; + private int pageRowSize; + + private List<String> partitionKeys = new ArrayList<>(); + private List<String> clusteringKeys = new ArrayList<>(); // partition keys -- key aliases private LinkedHashMap<String, Boolean> partitionBoundColumns = Maps.newLinkedHashMap(); @@ -92,8 +108,18 @@ public class CqlRecordReader extends RecordReader<Long, Row> : ConfigHelper.getInputSplitSize(conf); cfName = quote(ConfigHelper.getInputColumnFamily(conf)); keyspace = quote(ConfigHelper.getInputKeyspace(conf)); - cqlQuery = CqlConfigHelper.getInputCql(conf); - partitioner = ConfigHelper.getInputPartitioner(HadoopCompat.getConfiguration(context)); + partitioner = ConfigHelper.getInputPartitioner(conf); + inputColumns = CqlConfigHelper.getInputcolumns(conf); + userDefinedWhereClauses = CqlConfigHelper.getInputWhereClauses(conf); + try // the lookup presumably parses the configured string, so it must sit inside the try for the NumberFormatException fallback to apply + { + Optional<Integer> pageRowSizeOptional = CqlConfigHelper.getInputPageRowSize(conf); + pageRowSize = pageRowSizeOptional.isPresent() ? 
pageRowSizeOptional.get() : DEFAULT_CQL_PAGE_LIMIT; + } + catch(NumberFormatException e) + { + pageRowSize = DEFAULT_CQL_PAGE_LIMIT; + } try { if (cluster != null) @@ -125,6 +151,20 @@ public class CqlRecordReader extends RecordReader<Long, Row> if (cluster != null) session = cluster.connect(keyspace); + + if (session == null) + throw new RuntimeException("Can't create connection session"); + + // If the user provides a CQL query then we will use it without validation + // otherwise we will fall back to building a query using the: + // inputColumns + // whereClauses + // pageRowSize + cqlQuery = CqlConfigHelper.getInputCql(conf); + if (StringUtils.isEmpty(cqlQuery)) + cqlQuery = buildQuery(); + logger.debug("cqlQuery {}", cqlQuery); + rowIterator = new RowIterator(); logger.debug("created {}", rowIterator); } @@ -224,9 +264,6 @@ public class CqlRecordReader extends RecordReader<Long, Row> public RowIterator() { - if (session == null) - throw new RuntimeException("Can't create connection session"); - AbstractType type = partitioner.getTokenValidator(); ResultSet rs = session.execute(cqlQuery, type.compose(type.fromString(split.getStartToken())), type.compose(type.fromString(split.getEndToken())) ); for (ColumnMetadata meta : cluster.getMetadata().getKeyspace(keyspace).getTable(cfName).getPartitionKey()) @@ -487,6 +524,97 @@ public class CqlRecordReader extends RecordReader<Long, Row> } } + /** + * Build a query for the reader of the form: + * + * SELECT * FROM ks.cf WHERE token(pk1,...pkn)>? AND token(pk1,...pkn)<=? [AND user where clauses] + * LIMIT pageRowSize [ALLOW FILTERING, appended only when user where clauses are set] + */ + private String buildQuery() + { + fetchKeys(); + + String selectColumnList = makeColumnList(getSelectColumns()); + String partitionKeyList = makeColumnList(partitionKeys); + + return String.format("SELECT %s FROM %s.%s WHERE token(%s)>? AND token(%s)<=?" 
+ getAdditionalWhereClauses(), + selectColumnList, keyspace, cfName, partitionKeyList, partitionKeyList); + } + + private String getAdditionalWhereClauses() + { + String whereClause = ""; + if (StringUtils.isNotEmpty(userDefinedWhereClauses)) + whereClause += " AND " + userDefinedWhereClauses; + whereClause += " LIMIT " + pageRowSize; + if (StringUtils.isNotEmpty(userDefinedWhereClauses)) + whereClause += " ALLOW FILTERING"; + return whereClause; + } + + private List<String> getSelectColumns() + { + List<String> selectColumns = new ArrayList<>(); + + if (StringUtils.isEmpty(inputColumns)) + selectColumns.add("*"); + else + { + // We must select all the partition keys plus any other columns the user wants (comma-separated; surrounding whitespace and empty entries are ignored) + selectColumns.addAll(partitionKeys); + for (String column : Splitter.on(',').trimResults().omitEmptyStrings().split(inputColumns)) + { + if (!partitionKeys.contains(column)) + selectColumns.add(column); + } + } + return selectColumns; + } + + private String makeColumnList(Collection<String> columns) + { + return Joiner.on(',').join(Iterables.transform(columns, new Function<String, String>() + { + public String apply(String column) + { + return quote(column); + } + })); + } + + private void fetchKeys() + { + String query = "SELECT column_name, component_index, type FROM system.schema_columns WHERE keyspace_name=? and columnfamily_name=?"; + // keyspace and cfName hold quote()d identifiers, but schema_columns stores the raw names: strip the quoting and bind them as values + List<Row> rows = session.execute(query, keyspace.substring(1, keyspace.length() - 1).replace("\"\"", "\""), cfName.substring(1, cfName.length() - 1).replace("\"\"", "\"")).all(); + if (CollectionUtils.isEmpty(rows)) + { + throw new RuntimeException("No table metadata found for " + keyspace + "." 
+ cfName); + } + int numberOfPartitionKeys = 0; + for (Row row : rows) + if (row.getString(2).equals("partition_key")) + numberOfPartitionKeys++; + String[] partitionKeyArray = new String[numberOfPartitionKeys]; + for (Row row : rows) + { + String type = row.getString(2); + String column = row.getString(0); + if (type.equals("partition_key")) + { + int componentIndex = row.isNull(1) ? 
0 : row.getInt(1); + partitionKeyArray[componentIndex] = column; + } + else if (type.equals("clustering_key")) + { + clusteringKeys.add(column); + } + } + partitionKeys.addAll(Arrays.asList(partitionKeyArray)); + } + + + private String quote(String identifier) { return "\"" + identifier.replaceAll("\"", "\"\"") + "\""; http://git-wip-us.apache.org/repos/asf/cassandra/blob/52df514d/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java index 1e48bf4..eea5d4e 100644 --- a/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java +++ b/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java @@ -177,7 +177,13 @@ public class CqlNativeStorage extends CqlStorage setConnectionInformation(); CqlConfigHelper.setInputCQLPageRowSize(conf, String.valueOf(pageSize)); - CqlConfigHelper.setInputCql(conf, inputCql); + if (inputCql != null) + CqlConfigHelper.setInputCql(conf, inputCql); + if (columns != null) + CqlConfigHelper.setInputColumns(conf, columns); + if (whereClause != null) + CqlConfigHelper.setInputWhereClauses(conf, whereClause); + if (System.getenv(PIG_INPUT_SPLIT_SIZE) != null) { try @@ -270,6 +276,10 @@ public class CqlNativeStorage extends CqlStorage nativeSSLCipherSuites = urlQuery.get("cipher_suites"); if (urlQuery.containsKey("input_cql")) inputCql = urlQuery.get("input_cql"); + if (urlQuery.containsKey("columns")) + columns = urlQuery.get("columns"); + if (urlQuery.containsKey("where_clause")) + whereClause = urlQuery.get("where_clause"); if (urlQuery.containsKey("rpc_port")) rpcPort = urlQuery.get("rpc_port"); } @@ -299,7 +309,8 @@ public class CqlNativeStorage extends CqlStorage "[&send_buff_size=<send_buff_size>][&solinger=<solinger>][&tcp_nodelay=<tcp_nodelay>][&reuse_address=<reuse_address>]" + 
"[&keep_alive=<keep_alive>][&auth_provider=<auth_provider>][&trust_store_path=<trust_store_path>]" + "[&key_store_path=<key_store_path>][&trust_store_password=<trust_store_password>]" + - "[&key_store_password=<key_store_password>][&cipher_suites=<cipher_suites>][&input_cql=<input_cql>]]': " + e.getMessage()); + "[&key_store_password=<key_store_password>][&cipher_suites=<cipher_suites>][&input_cql=<input_cql>]" + + "[&columns=<columns>][&where_clause=<where_clause>]]': " + e.getMessage()); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/52df514d/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java index 02a6d98..53f3900 100644 --- a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java +++ b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java @@ -59,9 +59,9 @@ public class CqlStorage extends AbstractCassandraStorage protected RecordWriter<Map<String, ByteBuffer>, List<ByteBuffer>> writer; protected int pageSize = 1000; - private String columns; + protected String columns; protected String outputQuery; - private String whereClause; + protected String whereClause; private boolean hasCompactValueAlias = false; public CqlStorage()