Support thrift tables in Pig CqlStorage. Patch by Alex Liu; reviewed by brandonwilliams for CASSANDRA-5847.
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f5618e36 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f5618e36 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f5618e36 Branch: refs/heads/cassandra-2.0 Commit: f5618e36dcec78c0fb791327defad14b4488b235 Parents: 8bedb57 Author: Brandon Williams <brandonwilli...@apache.org> Authored: Wed Sep 11 10:16:19 2013 -0500 Committer: Brandon Williams <brandonwilli...@apache.org> Committed: Wed Sep 11 10:16:19 2013 -0500 ---------------------------------------------------------------------- .../hadoop/pig/AbstractCassandraStorage.java | 182 ++++++++++++++----- .../cassandra/hadoop/pig/CassandraStorage.java | 8 +- .../apache/cassandra/hadoop/pig/CqlStorage.java | 10 +- 3 files changed, 147 insertions(+), 53 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/f5618e36/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java index 03805d2..68e18c8 100644 --- a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java +++ b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java @@ -29,6 +29,9 @@ import java.util.*; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.exceptions.SyntaxException; import org.apache.cassandra.auth.IAuthenticator; +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.cql3.CFDefinition; +import org.apache.cassandra.cql3.ColumnIdentifier; import org.apache.cassandra.db.Column; import org.apache.cassandra.db.IColumn; import org.apache.cassandra.db.marshal.*; @@ -205,6 +208,8 @@ public abstract 
class AbstractCassandraStorage extends LoadFunc implements Store try { validator = TypeParser.parse(cd.getValidation_class()); + if (validator instanceof CounterColumnType) + validator = LongType.instance; validators.put(cd.name, validator); } catch (ConfigurationException e) @@ -515,27 +520,7 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store column_family, keyspace)); } - catch (TException e) - { - throw new RuntimeException(e); - } - catch (InvalidRequestException e) - { - throw new RuntimeException(e); - } - catch (IOException e) - { - throw new RuntimeException(e); - } - catch (UnavailableException e) - { - throw new RuntimeException(e); - } - catch (TimedOutException e) - { - throw new RuntimeException(e); - } - catch (SchemaDisagreementException e) + catch (Exception e) { throw new RuntimeException(e); } @@ -582,15 +567,19 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store TimedOutException, SchemaDisagreementException, TException, - CharacterCodingException + CharacterCodingException, + NotFoundException, + org.apache.cassandra.exceptions.InvalidRequestException, + ConfigurationException { // get CF meta data - String query = "SELECT type, " + + String query = "SELECT type," + " comparator," + " subcomparator," + - " default_validator, " + + " default_validator," + " key_validator," + - " key_aliases " + + " key_aliases," + + " key_alias " + "FROM system.schema_columnfamilies " + "WHERE keyspace_name = '%s' " + " AND columnfamily_name = '%s' "; @@ -624,10 +613,27 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store { String keyAliases = ByteBufferUtil.string(cqlRow.columns.get(5).value); keys = FBUtilities.fromJsonList(keyAliases); + // classis thrift tables + if (keys.size() == 0 && cqlRow.columns.get(6).value == null) + { + CFDefinition cfDefinition = getCfDefinition(keyspace, column_family, client); + for (ColumnIdentifier column : cfDefinition.keys.keySet()) 
+ { + String key = column.toString(); + String type = cfDefinition.keys.get(column).type.toString(); + logger.debug("name: {}, type: {} ", key, type); + keys.add(key); + } + } + else + cql3Table = true; + } + else + { + String keyAlias = ByteBufferUtil.string(cqlRow.columns.get(6).value); + keys = new ArrayList<String>(1); + keys.add(keyAlias); } - // get column meta data - if (keys != null && keys.size() > 0) - cql3Table = true; } cfDef.column_metadata = getColumnMetadata(client, cql3Table); return cfDef; @@ -640,16 +646,22 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store TimedOutException, SchemaDisagreementException, TException, - CharacterCodingException; + CharacterCodingException, + org.apache.cassandra.exceptions.InvalidRequestException, + ConfigurationException, + NotFoundException; /** get column meta data */ - protected List<ColumnDef> getColumnMeta(Cassandra.Client client) + protected List<ColumnDef> getColumnMeta(Cassandra.Client client, boolean cassandraStorage) throws InvalidRequestException, UnavailableException, TimedOutException, SchemaDisagreementException, TException, - CharacterCodingException + CharacterCodingException, + org.apache.cassandra.exceptions.InvalidRequestException, + ConfigurationException, + NotFoundException { String query = "SELECT column_name, " + " validator, " + @@ -665,7 +677,34 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store List<CqlRow> rows = result.rows; List<ColumnDef> columnDefs = new ArrayList<ColumnDef>(); - if (rows == null || rows.isEmpty()) + if (!cassandraStorage && (rows == null || rows.isEmpty())) + { + // check classic thrift tables + CFDefinition cfDefinition = getCfDefinition(keyspace, column_family, client); + for (ColumnIdentifier column : cfDefinition.metadata.keySet()) + { + ColumnDef cDef = new ColumnDef(); + String columnName = column.toString(); + String type = cfDefinition.metadata.get(column).type.toString(); + 
logger.debug("name: {}, type: {} ", columnName, type); + cDef.name = ByteBufferUtil.bytes(columnName); + cDef.validation_class = type; + columnDefs.add(cDef); + } + if (columnDefs.size() == 0) + { + String value = cfDefinition.value != null ? cfDefinition.value.toString() : null; + if ("value".equals(value)) + { + ColumnDef cDef = new ColumnDef(); + cDef.name = ByteBufferUtil.bytes(value); + cDef.validation_class = cfDefinition.value.type.toString(); + columnDefs.add(cDef); + } + } + return columnDefs; + } + else if (rows == null || rows.isEmpty()) return columnDefs; Iterator<CqlRow> iterator = rows.iterator(); @@ -683,14 +722,9 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store return columnDefs; } - /** get keys meta data */ + /** get keys meta data */ protected List<ColumnDef> getKeysMeta(Cassandra.Client client) - throws InvalidRequestException, - UnavailableException, - TimedOutException, - SchemaDisagreementException, - TException, - IOException + throws Exception { String query = "SELECT key_aliases, " + " column_aliases, " + @@ -698,7 +732,8 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store " comparator, " + " keyspace_name, " + " value_alias, " + - " default_validator " + + " default_validator," + + " key_alias " + "FROM system.schema_columnfamilies " + "WHERE keyspace_name = '%s'" + " AND columnfamily_name = '%s' "; @@ -719,19 +754,52 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store CqlRow cqlRow = iteraRow.next(); String name = ByteBufferUtil.string(cqlRow.columns.get(4).value); logger.debug("Found ksDef name: {}", name); - String keyString = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(0).getValue())); - - logger.debug("partition keys: {}", keyString); - List<String> keyNames = FBUtilities.fromJsonList(keyString); - - Iterator<String> iterator = keyNames.iterator(); - while (iterator.hasNext()) + String keyString; + List<String> keyNames; + 
Iterator<String> iterator; + if (cqlRow.columns.get(0).getValue() == null) { ColumnDef cDef = new ColumnDef(); - cDef.name = ByteBufferUtil.bytes(iterator.next()); + cDef.name = ByteBuffer.wrap(result.rows.get(0).columns.get(7).getValue()); keys.add(cDef); } + else + { + keyString = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(0).getValue())); + + logger.debug("partition keys: {}", keyString); + keyNames = FBUtilities.fromJsonList(keyString); + + iterator = keyNames.iterator(); + while (iterator.hasNext()) + { + ColumnDef cDef = new ColumnDef(); + cDef.name = ByteBufferUtil.bytes(iterator.next()); + keys.add(cDef); + } + // classic thrift tables + if (keys.size() == 0) + { + CFDefinition cfDefinition = getCfDefinition(keyspace, column_family, client); + for (ColumnIdentifier column : cfDefinition.keys.keySet()) + { + String key = column.toString(); + logger.debug("name: {} ", key); + ColumnDef cDef = new ColumnDef(); + cDef.name = ByteBufferUtil.bytes(key); + keys.add(cDef); + } + for (ColumnIdentifier column : cfDefinition.columns.keySet()) + { + String key = column.toString(); + logger.debug("name: {} ", key); + ColumnDef cDef = new ColumnDef(); + cDef.name = ByteBufferUtil.bytes(key); + keys.add(cDef); + } + } + } keyString = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(1).getValue())); logger.debug("cluster keys: {}", keyString); @@ -840,5 +908,23 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store } return indexes; } + + + /** get CFDefinition of a column family */ + private CFDefinition getCfDefinition(String ks, String cf, Cassandra.Client client) + throws NotFoundException, + InvalidRequestException, + TException, + org.apache.cassandra.exceptions.InvalidRequestException, + ConfigurationException + { + KsDef ksDef = client.describe_keyspace(ks); + for (CfDef cfDef : ksDef.cf_defs) + { + if (cfDef.name.equalsIgnoreCase(cf)) + return new CFDefinition(CFMetaData.fromThrift(cfDef)); + } + return null; + } 
} http://git-wip-us.apache.org/repos/asf/cassandra/blob/f5618e36/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java index add4395..dbdd5e9 100644 --- a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java +++ b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java @@ -25,6 +25,7 @@ import java.util.*; import org.apache.cassandra.db.IColumn; import org.apache.cassandra.db.marshal.*; +import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.hadoop.*; import org.apache.cassandra.thrift.*; import org.apache.cassandra.utils.ByteBufferUtil; @@ -702,12 +703,15 @@ public class CassandraStorage extends AbstractCassandraStorage TimedOutException, SchemaDisagreementException, TException, - CharacterCodingException + CharacterCodingException, + org.apache.cassandra.exceptions.InvalidRequestException, + ConfigurationException, + NotFoundException { if (cql3Table) return new ArrayList<ColumnDef>(); - return getColumnMeta(client); + return getColumnMeta(client, true); } /** convert key to a tuple */ http://git-wip-us.apache.org/repos/asf/cassandra/blob/f5618e36/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java index a73e5a5..b35e13a 100644 --- a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java +++ b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java @@ -26,6 +26,7 @@ import java.util.*; import org.apache.cassandra.db.IColumn; import org.apache.cassandra.db.Column; import org.apache.cassandra.db.marshal.*; +import org.apache.cassandra.exceptions.ConfigurationException; import 
org.apache.cassandra.hadoop.*; import org.apache.cassandra.hadoop.cql3.CqlConfigHelper; import org.apache.cassandra.thrift.*; @@ -432,7 +433,10 @@ public class CqlStorage extends AbstractCassandraStorage TimedOutException, SchemaDisagreementException, TException, - CharacterCodingException + CharacterCodingException, + org.apache.cassandra.exceptions.InvalidRequestException, + ConfigurationException, + NotFoundException { List<ColumnDef> keyColumns = null; // get key columns @@ -440,13 +444,13 @@ public class CqlStorage extends AbstractCassandraStorage { keyColumns = getKeysMeta(client); } - catch(IOException e) + catch(Exception e) { logger.error("Error in retrieving key columns" , e); } // get other columns - List<ColumnDef> columns = getColumnMeta(client); + List<ColumnDef> columns = getColumnMeta(client, false); // combine all columns in a list if (keyColumns != null && columns != null)