Don't add extraneous field with CqlStorage. Patch by Sam Tunnicliffe; reviewed by Alex Liu for CASSANDRA-6071.
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/389ff55e Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/389ff55e Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/389ff55e Branch: refs/heads/trunk Commit: 389ff55e2bbc3046a6ad1aba85bdaab0e38dc6e8 Parents: 00e871d Author: Brandon Williams <brandonwilli...@apache.org> Authored: Thu Sep 26 13:49:07 2013 -0500 Committer: Brandon Williams <brandonwilli...@apache.org> Committed: Thu Sep 26 13:49:07 2013 -0500 ---------------------------------------------------------------------- .../hadoop/pig/AbstractCassandraStorage.java | 151 ++----------------- .../cassandra/hadoop/pig/CassandraStorage.java | 2 +- .../apache/cassandra/hadoop/pig/CqlStorage.java | 144 +++++++++++++++++- 3 files changed, 153 insertions(+), 144 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/389ff55e/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java index 50671da..ce92014 100644 --- a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java +++ b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java @@ -641,7 +641,7 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store NotFoundException; /** get column meta data */ - protected List<ColumnDef> getColumnMeta(Cassandra.Client client, boolean cassandraStorage) + protected List<ColumnDef> getColumnMeta(Cassandra.Client client, boolean cassandraStorage, boolean includeCompactValueColumn) throws InvalidRequestException, UnavailableException, TimedOutException, @@ -666,9 +666,13 @@ public abstract class 
AbstractCassandraStorage extends LoadFunc implements Store List<CqlRow> rows = result.rows; List<ColumnDef> columnDefs = new ArrayList<ColumnDef>(); - if (!cassandraStorage && (rows == null || rows.isEmpty())) + if (rows == null || rows.isEmpty()) { - // check classic thrift tables + // if CassandraStorage, just return the empty list + if (cassandraStorage) + return columnDefs; + + // otherwise for CqlStorage, check metadata for classic thrift tables CFDefinition cfDefinition = getCfDefinition(keyspace, column_family, client); for (ColumnIdentifier column : cfDefinition.metadata.keySet()) { @@ -680,7 +684,9 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store cDef.validation_class = type; columnDefs.add(cDef); } - if (columnDefs.size() == 0) + // we may not need to include the value column for compact tables as we + // could have already processed it as schema_columnfamilies.value_alias + if (columnDefs.size() == 0 && includeCompactValueColumn) { String value = cfDefinition.value != null ? 
cfDefinition.value.toString() : null; if ("value".equals(value)) @@ -693,8 +699,6 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store } return columnDefs; } - else if (rows == null || rows.isEmpty()) - return columnDefs; Iterator<CqlRow> iterator = rows.iterator(); while (iterator.hasNext()) @@ -711,138 +715,6 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store return columnDefs; } - /** get keys meta data */ - protected List<ColumnDef> getKeysMeta(Cassandra.Client client) - throws Exception - { - String query = "SELECT key_aliases, " + - " column_aliases, " + - " key_validator, " + - " comparator, " + - " keyspace_name, " + - " value_alias, " + - " default_validator " + - "FROM system.schema_columnfamilies " + - "WHERE keyspace_name = '%s'" + - " AND columnfamily_name = '%s' "; - - CqlResult result = client.execute_cql3_query( - ByteBufferUtil.bytes(String.format(query, keyspace, column_family)), - Compression.NONE, - ConsistencyLevel.ONE); - - if (result == null || result.rows == null || result.rows.isEmpty()) - return null; - - Iterator<CqlRow> iteraRow = result.rows.iterator(); - List<ColumnDef> keys = new ArrayList<ColumnDef>(); - if (iteraRow.hasNext()) - { - CqlRow cqlRow = iteraRow.next(); - String name = ByteBufferUtil.string(cqlRow.columns.get(4).value); - logger.debug("Found ksDef name: {}", name); - String keyString = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(0).getValue())); - - logger.debug("partition keys: {}", keyString); - List<String> keyNames = FBUtilities.fromJsonList(keyString); - - Iterator<String> iterator = keyNames.iterator(); - while (iterator.hasNext()) - { - ColumnDef cDef = new ColumnDef(); - cDef.name = ByteBufferUtil.bytes(iterator.next()); - keys.add(cDef); - } - // classic thrift tables - if (keys.size() == 0) - { - CFDefinition cfDefinition = getCfDefinition(keyspace, column_family, client); - for (ColumnIdentifier column : cfDefinition.keys.keySet()) - 
{ - String key = column.toString(); - logger.debug("name: {} ", key); - ColumnDef cDef = new ColumnDef(); - cDef.name = ByteBufferUtil.bytes(key); - keys.add(cDef); - } - for (ColumnIdentifier column : cfDefinition.columns.keySet()) - { - String key = column.toString(); - logger.debug("name: {} ", key); - ColumnDef cDef = new ColumnDef(); - cDef.name = ByteBufferUtil.bytes(key); - keys.add(cDef); - } - } - - keyString = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(1).getValue())); - - logger.debug("cluster keys: {}", keyString); - keyNames = FBUtilities.fromJsonList(keyString); - - iterator = keyNames.iterator(); - while (iterator.hasNext()) - { - ColumnDef cDef = new ColumnDef(); - cDef.name = ByteBufferUtil.bytes(iterator.next()); - keys.add(cDef); - } - - String validator = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(2).getValue())); - logger.debug("row key validator: {}", validator); - AbstractType<?> keyValidator = parseType(validator); - - Iterator<ColumnDef> keyItera = keys.iterator(); - if (keyValidator instanceof CompositeType) - { - Iterator<AbstractType<?>> typeItera = ((CompositeType) keyValidator).types.iterator(); - while (typeItera.hasNext()) - keyItera.next().validation_class = typeItera.next().toString(); - } - else - keyItera.next().validation_class = keyValidator.toString(); - - validator = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(3).getValue())); - logger.debug("cluster key validator: {}", validator); - - if (keyItera.hasNext() && validator != null && !validator.isEmpty()) - { - AbstractType<?> clusterKeyValidator = parseType(validator); - - if (clusterKeyValidator instanceof CompositeType) - { - Iterator<AbstractType<?>> typeItera = ((CompositeType) clusterKeyValidator).types.iterator(); - while (keyItera.hasNext()) - keyItera.next().validation_class = typeItera.next().toString(); - } - else - keyItera.next().validation_class = clusterKeyValidator.toString(); - } - - // compact value_alias column - if 
(cqlRow.columns.get(5).value != null) - { - try - { - String compactValidator = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(6).getValue())); - logger.debug("default validator: {}", compactValidator); - AbstractType<?> defaultValidator = parseType(compactValidator); - - ColumnDef cDef = new ColumnDef(); - cDef.name = cqlRow.columns.get(5).value; - cDef.validation_class = defaultValidator.toString(); - keys.add(cDef); - } - catch (Exception e) - { - // no compact column at value_alias - } - } - - } - return keys; - } - /** get index type from string */ protected IndexType getIndexType(String type) { @@ -884,9 +756,8 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store return indexes; } - /** get CFDefinition of a column family */ - private CFDefinition getCfDefinition(String ks, String cf, Cassandra.Client client) + protected CFDefinition getCfDefinition(String ks, String cf, Cassandra.Client client) throws NotFoundException, InvalidRequestException, TException, http://git-wip-us.apache.org/repos/asf/cassandra/blob/389ff55e/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java index 8cf06f2..09171a0 100644 --- a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java +++ b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java @@ -711,7 +711,7 @@ public class CassandraStorage extends AbstractCassandraStorage if (cql3Table) return new ArrayList<ColumnDef>(); - return getColumnMeta(client, true); + return getColumnMeta(client, true, true); } /** convert key to a tuple */ http://git-wip-us.apache.org/repos/asf/cassandra/blob/389ff55e/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java ---------------------------------------------------------------------- diff --git 
a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java index 7780ca9..79abc2c 100644 --- a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java +++ b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java @@ -23,6 +23,8 @@ import java.nio.charset.CharacterCodingException; import java.util.*; +import org.apache.cassandra.cql3.CFDefinition; +import org.apache.cassandra.cql3.ColumnIdentifier; import org.apache.cassandra.db.IColumn; import org.apache.cassandra.db.Column; import org.apache.cassandra.db.marshal.*; @@ -31,6 +33,8 @@ import org.apache.cassandra.hadoop.*; import org.apache.cassandra.hadoop.cql3.CqlConfigHelper; import org.apache.cassandra.thrift.*; import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.FBUtilities; + import org.apache.hadoop.mapreduce.*; import org.apache.pig.Expression; import org.apache.pig.Expression.OpType; @@ -61,7 +65,8 @@ public class CqlStorage extends AbstractCassandraStorage private String columns; private String outputQuery; private String whereClause; - + private boolean hasCompactValueAlias = false; + public CqlStorage() { this(1000); @@ -450,7 +455,7 @@ public class CqlStorage extends AbstractCassandraStorage } // get other columns - List<ColumnDef> columns = getColumnMeta(client, false); + List<ColumnDef> columns = getColumnMeta(client, false, !hasCompactValueAlias); // combine all columns in a list if (keyColumns != null && columns != null) @@ -458,7 +463,140 @@ public class CqlStorage extends AbstractCassandraStorage return keyColumns; } - + + /** get keys meta data */ + protected List<ColumnDef> getKeysMeta(Cassandra.Client client) + throws Exception + { + String query = "SELECT key_aliases, " + + " column_aliases, " + + " key_validator, " + + " comparator, " + + " keyspace_name, " + + " value_alias, " + + " default_validator " + + "FROM system.schema_columnfamilies " + + "WHERE keyspace_name = '%s'" + + " AND 
columnfamily_name = '%s' "; + + CqlResult result = client.execute_cql3_query( + ByteBufferUtil.bytes(String.format(query, keyspace, column_family)), + Compression.NONE, + ConsistencyLevel.ONE); + + if (result == null || result.rows == null || result.rows.isEmpty()) + return null; + + Iterator<CqlRow> iteraRow = result.rows.iterator(); + List<ColumnDef> keys = new ArrayList<ColumnDef>(); + if (iteraRow.hasNext()) + { + CqlRow cqlRow = iteraRow.next(); + String name = ByteBufferUtil.string(cqlRow.columns.get(4).value); + logger.debug("Found ksDef name: {}", name); + String keyString = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(0).getValue())); + + logger.debug("partition keys: {}", keyString); + List<String> keyNames = FBUtilities.fromJsonList(keyString); + + Iterator<String> iterator = keyNames.iterator(); + while (iterator.hasNext()) + { + ColumnDef cDef = new ColumnDef(); + cDef.name = ByteBufferUtil.bytes(iterator.next()); + keys.add(cDef); + } + // classic thrift tables + if (keys.size() == 0) + { + CFDefinition cfDefinition = getCfDefinition(keyspace, column_family, client); + for (ColumnIdentifier column : cfDefinition.keys.keySet()) + { + String key = column.toString(); + logger.debug("name: {} ", key); + ColumnDef cDef = new ColumnDef(); + cDef.name = ByteBufferUtil.bytes(key); + keys.add(cDef); + } + for (ColumnIdentifier column : cfDefinition.columns.keySet()) + { + String key = column.toString(); + logger.debug("name: {} ", key); + ColumnDef cDef = new ColumnDef(); + cDef.name = ByteBufferUtil.bytes(key); + keys.add(cDef); + } + } + + keyString = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(1).getValue())); + + logger.debug("cluster keys: {}", keyString); + keyNames = FBUtilities.fromJsonList(keyString); + + iterator = keyNames.iterator(); + while (iterator.hasNext()) + { + ColumnDef cDef = new ColumnDef(); + cDef.name = ByteBufferUtil.bytes(iterator.next()); + keys.add(cDef); + } + + String validator = 
ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(2).getValue())); + logger.debug("row key validator: {}", validator); + AbstractType<?> keyValidator = parseType(validator); + + Iterator<ColumnDef> keyItera = keys.iterator(); + if (keyValidator instanceof CompositeType) + { + Iterator<AbstractType<?>> typeItera = ((CompositeType) keyValidator).types.iterator(); + while (typeItera.hasNext()) + keyItera.next().validation_class = typeItera.next().toString(); + } + else + keyItera.next().validation_class = keyValidator.toString(); + + validator = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(3).getValue())); + logger.debug("cluster key validator: {}", validator); + + if (keyItera.hasNext() && validator != null && !validator.isEmpty()) + { + AbstractType<?> clusterKeyValidator = parseType(validator); + + if (clusterKeyValidator instanceof CompositeType) + { + Iterator<AbstractType<?>> typeItera = ((CompositeType) clusterKeyValidator).types.iterator(); + while (keyItera.hasNext()) + keyItera.next().validation_class = typeItera.next().toString(); + } + else + keyItera.next().validation_class = clusterKeyValidator.toString(); + } + + // compact value_alias column + if (cqlRow.columns.get(5).value != null) + { + try + { + String compactValidator = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(6).getValue())); + logger.debug("default validator: {}", compactValidator); + AbstractType<?> defaultValidator = parseType(compactValidator); + + ColumnDef cDef = new ColumnDef(); + cDef.name = cqlRow.columns.get(5).value; + cDef.validation_class = defaultValidator.toString(); + keys.add(cDef); + hasCompactValueAlias = true; + } + catch (Exception e) + { + // no compact column at value_alias + } + } + + } + return keys; + } + /** cql://[username:password@]<keyspace>/<columnfamily>[?[page_size=<size>] * [&columns=<col1,col2>][&output_query=<prepared_statement_query>][&where_clause=<clause>] * 
[&split_size=<size>][&partitioner=<partitioner>][&use_secondary=true|false]] */