Updated Branches: refs/heads/cassandra-1.2 5d08a2518 -> 308a3e41e refs/heads/cassandra-2.0 643dc0068 -> 8845ad609 refs/heads/trunk d5f7ba5c0 -> fa2453558
Fix NPE in pig with tables created from thrift Patch by Alex Liu, reviewed by brandonwilliams for CASSANDRA-6072 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/308a3e41 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/308a3e41 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/308a3e41 Branch: refs/heads/cassandra-1.2 Commit: 308a3e41e7a70be90ab3335601f3a67aa93d8927 Parents: 5d08a25 Author: Brandon Williams <brandonwilli...@apache.org> Authored: Wed Sep 25 16:07:27 2013 -0500 Committer: Brandon Williams <brandonwilli...@apache.org> Committed: Wed Sep 25 16:08:27 2013 -0500 ---------------------------------------------------------------------- .../hadoop/pig/AbstractCassandraStorage.java | 97 ++++++++------------ 1 file changed, 36 insertions(+), 61 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/308a3e41/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java index 68e18c8..74702e3 100644 --- a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java +++ b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java @@ -578,8 +578,7 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store " subcomparator," + " default_validator," + " key_validator," + - " key_aliases," + - " key_alias " + + " key_aliases " + "FROM system.schema_columnfamilies " + "WHERE keyspace_name = '%s' " + " AND columnfamily_name = '%s' "; @@ -608,32 +607,22 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store cfDef.subcomparator_type = ByteBufferUtil.string(subComparator); cfDef.default_validation_class = ByteBufferUtil.string(cqlRow.columns.get(3).value); cfDef.key_validation_class = ByteBufferUtil.string(cqlRow.columns.get(4).value); - List<String> keys = null; - if (cqlRow.columns.get(5).value != null) + String keyAliases = ByteBufferUtil.string(cqlRow.columns.get(5).value); + List<String> keys = FBUtilities.fromJsonList(keyAliases); + // classis thrift tables + if (keys.size() == 0) { - String keyAliases = ByteBufferUtil.string(cqlRow.columns.get(5).value); - keys = FBUtilities.fromJsonList(keyAliases); - // classis thrift tables - if (keys.size() == 0 && cqlRow.columns.get(6).value == null) + CFDefinition cfDefinition = getCfDefinition(keyspace, column_family, client); + for (ColumnIdentifier column : cfDefinition.keys.keySet()) { - CFDefinition cfDefinition = getCfDefinition(keyspace, column_family, client); - for (ColumnIdentifier column : cfDefinition.keys.keySet()) - { - String key = column.toString(); - String type = cfDefinition.keys.get(column).type.toString(); - logger.debug("name: {}, type: {} ", key, type); - keys.add(key); - } + String key = column.toString(); + String type = cfDefinition.keys.get(column).type.toString(); + logger.debug("name: {}, type: {} ", key, type); + keys.add(key); } - else - cql3Table = true; } else - { - String keyAlias = ByteBufferUtil.string(cqlRow.columns.get(6).value); - keys = new ArrayList<String>(1); - keys.add(keyAlias); - } + cql3Table = true; } cfDef.column_metadata = getColumnMetadata(client, cql3Table); return cfDef; @@ -732,8 +721,7 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store " comparator, " + " keyspace_name, " + " value_alias, " + - " default_validator," + - " key_alias " + + " default_validator " + "FROM system.schema_columnfamilies " + "WHERE keyspace_name = '%s'" + " AND columnfamily_name = '%s' "; @@ -746,60 +734,47 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store if (result == null || result.rows == null || result.rows.isEmpty()) return null; - List<CqlRow> rows = result.rows; - Iterator<CqlRow> iteraRow = rows.iterator(); + Iterator<CqlRow> iteraRow = result.rows.iterator(); List<ColumnDef> keys = new ArrayList<ColumnDef>(); if (iteraRow.hasNext()) { CqlRow cqlRow = iteraRow.next(); String name = ByteBufferUtil.string(cqlRow.columns.get(4).value); logger.debug("Found ksDef name: {}", name); - String keyString; - List<String> keyNames; - Iterator<String> iterator; - if (cqlRow.columns.get(0).getValue() == null) + String keyString = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(0).getValue())); + + logger.debug("partition keys: {}", keyString); + List<String> keyNames = FBUtilities.fromJsonList(keyString); + + Iterator<String> iterator = keyNames.iterator(); + while (iterator.hasNext()) { ColumnDef cDef = new ColumnDef(); - cDef.name = ByteBuffer.wrap(result.rows.get(0).columns.get(7).getValue()); + cDef.name = ByteBufferUtil.bytes(iterator.next()); keys.add(cDef); } - else + // classic thrift tables + if (keys.size() == 0) { - keyString = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(0).getValue())); - - logger.debug("partition keys: {}", keyString); - keyNames = FBUtilities.fromJsonList(keyString); - - iterator = keyNames.iterator(); - while (iterator.hasNext()) + CFDefinition cfDefinition = getCfDefinition(keyspace, column_family, client); + for (ColumnIdentifier column : cfDefinition.keys.keySet()) { + String key = column.toString(); + logger.debug("name: {} ", key); ColumnDef cDef = new ColumnDef(); - cDef.name = ByteBufferUtil.bytes(iterator.next()); + cDef.name = ByteBufferUtil.bytes(key); keys.add(cDef); } - // classic thrift tables - if (keys.size() == 0) + for (ColumnIdentifier column : cfDefinition.columns.keySet()) { - CFDefinition cfDefinition = getCfDefinition(keyspace, column_family, client); - for (ColumnIdentifier column : cfDefinition.keys.keySet()) - { - String key = column.toString(); - logger.debug("name: {} ", key); - ColumnDef cDef = new ColumnDef(); - cDef.name = ByteBufferUtil.bytes(key); - keys.add(cDef); - } - for (ColumnIdentifier column : cfDefinition.columns.keySet()) - { - String key = column.toString(); - logger.debug("name: {} ", key); - ColumnDef cDef = new ColumnDef(); - cDef.name = ByteBufferUtil.bytes(key); - keys.add(cDef); - } + String key = column.toString(); + logger.debug("name: {} ", key); + ColumnDef cDef = new ColumnDef(); + cDef.name = ByteBufferUtil.bytes(key); + keys.add(cDef); } - } + keyString = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(1).getValue())); logger.debug("cluster keys: {}", keyString);