Fix support for Thrift tables in CqlPagingRecordReader. Patch by Alex Liu; reviewed by jbellis for CASSANDRA-5752.
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7a394210 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7a394210 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7a394210 Branch: refs/heads/trunk Commit: 7a39421074d3d14bfc1a4fa1ab986b4fa614f324 Parents: ba274ad Author: Jonathan Ellis <jbel...@apache.org> Authored: Wed Jul 31 18:15:40 2013 -0500 Committer: Jonathan Ellis <jbel...@apache.org> Committed: Wed Jul 31 18:16:23 2013 -0500 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../hadoop/cql3/CqlPagingRecordReader.java | 41 ++++++++++++++++++-- .../cassandra/hadoop/cql3/CqlRecordWriter.java | 33 ++++++++++++++++ 3 files changed, 72 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/7a394210/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index da1ec20..377b5a1 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -6,6 +6,8 @@ * Allow compacting 2Is via nodetool (CASSANDRA-5670) * Hex-encode non-String keys in OPP (CASSANDRA-5793) * nodetool history logging (CASSANDRA-5823) + * (Hadoop) fix support for Thrift tables in CqlPagingRecordReader + (CASSANDRA-5752) 1.2.8 http://git-wip-us.apache.org/repos/asf/cassandra/blob/7a394210/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingRecordReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingRecordReader.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingRecordReader.java index fc07131..db77c9e 100644 --- a/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingRecordReader.java +++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingRecordReader.java @@ -29,6 +29,9 @@ import com.google.common.collect.Iterables; 
import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.cql3.CFDefinition; +import org.apache.cassandra.cql3.ColumnIdentifier; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.db.marshal.CompositeType; import org.apache.cassandra.db.marshal.LongType; @@ -671,6 +674,11 @@ public class CqlPagingRecordReader extends RecordReader<Map<String, ByteBuffer>, for (String key : keys) partitionBoundColumns.add(new BoundColumn(key)); + if (partitionBoundColumns.size() == 0) + { + retrieveKeysForThriftTables(); + return; + } keyString = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(1).getValue())); logger.debug("cluster columns: {}", keyString); @@ -679,10 +687,35 @@ public class CqlPagingRecordReader extends RecordReader<Map<String, ByteBuffer>, for (String key : keys) clusterColumns.add(new BoundColumn(key)); - Column rawKeyValidator = cqlRow.columns.get(2); - String validator = ByteBufferUtil.string(ByteBuffer.wrap(rawKeyValidator.getValue())); - logger.debug("row key validator: {}", validator); - keyValidator = parseType(validator); + parseKeyValidators(ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(2).getValue()))); + } + + /** + * retrieve the fake partition keys and cluster keys for classic thrift table + * use CFDefinition to get keys and columns + * */ + private void retrieveKeysForThriftTables() throws Exception + { + KsDef ksDef = client.describe_keyspace(keyspace); + for (CfDef cfDef : ksDef.cf_defs) + { + if (cfDef.name.equalsIgnoreCase(cfName)) + { + CFMetaData cfMeta = CFMetaData.fromThrift(cfDef); + CFDefinition cfDefinition = new CFDefinition(cfMeta); + for (ColumnIdentifier columnIdentifier : cfDefinition.keys.keySet()) + partitionBoundColumns.add(new BoundColumn(columnIdentifier.toString())); + parseKeyValidators(cfDef.key_validation_class); + return; + } + } + } + + /** parse key validators */ + private void 
parseKeyValidators(String rowKeyValidator) throws IOException + { + logger.debug("row key validator: {} ", rowKeyValidator); + keyValidator = parseType(rowKeyValidator); if (keyValidator instanceof CompositeType) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/7a394210/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java index 612f86a..76d419e 100644 --- a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java +++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java @@ -26,6 +26,9 @@ import java.util.concurrent.ConcurrentHashMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.cql3.CFDefinition; +import org.apache.cassandra.cql3.ColumnIdentifier; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.db.marshal.CompositeType; import org.apache.cassandra.db.marshal.LongType; @@ -337,6 +340,11 @@ final class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String, logger.debug("partition keys: " + keyString); List<String> keys = FBUtilities.fromJsonList(keyString); + if (keys.size() == 0) + { + retrieveKeysForThriftTables(client); + return; + } partitionKeyColumns = new String[keys.size()]; int i = 0; for (String key : keys) @@ -352,6 +360,31 @@ final class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String, clusterColumns = FBUtilities.fromJsonList(clusterColumnString); } + /** + * retrieve the fake partition keys and cluster keys for classic thrift table + * use CFDefinition to get keys and columns + * */ + private void retrieveKeysForThriftTables(Cassandra.Client client) throws Exception + { + String keyspace = ConfigHelper.getOutputKeyspace(conf); + String cfName = 
ConfigHelper.getOutputColumnFamily(conf); + KsDef ksDef = client.describe_keyspace(keyspace); + for (CfDef cfDef : ksDef.cf_defs) + { + if (cfDef.name.equalsIgnoreCase(cfName)) + { + CFMetaData cfMeta = CFMetaData.fromThrift(cfDef); + CFDefinition cfDefinition = new CFDefinition(cfMeta); + int i = 0; + for (ColumnIdentifier column : cfDefinition.keys.keySet()) + { + partitionKeyColumns[i] = column.toString(); + i++; + } + return; + } + } + } private AbstractType<?> parseType(String type) throws ConfigurationException { try