Updated Branches: refs/heads/cassandra-1.2 fd6d19bd5 -> a00477976 refs/heads/trunk 6da1cbbce -> 2e9f51d16
cleanup Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a2436f07 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a2436f07 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a2436f07 Branch: refs/heads/trunk Commit: a2436f07a45ba81cd32b0f915a32d0f83f19538e Parents: b973a33 Author: Jonathan Ellis <jbel...@apache.org> Authored: Thu Jun 6 16:47:03 2013 -0500 Committer: Jonathan Ellis <jbel...@apache.org> Committed: Thu Jun 6 16:47:03 2013 -0500 ---------------------------------------------------------------------- .../hadoop/cql3/ColumnFamilyRecordWriter.java | 43 +++++++-------- 1 files changed, 20 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/a2436f07/src/java/org/apache/cassandra/hadoop/cql3/ColumnFamilyRecordWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/cql3/ColumnFamilyRecordWriter.java b/src/java/org/apache/cassandra/hadoop/cql3/ColumnFamilyRecordWriter.java index 3939e0b..c32d9ef 100644 --- a/src/java/org/apache/cassandra/hadoop/cql3/ColumnFamilyRecordWriter.java +++ b/src/java/org/apache/cassandra/hadoop/cql3/ColumnFamilyRecordWriter.java @@ -152,16 +152,16 @@ final class ColumnFamilyRecordWriter extends AbstractColumnFamilyRecordWriter<Ma * (i.e., null), then the entire key is marked for {@link Deletion}. * </p> * - * @param keybuff + * @param keyColumns * the key to write. * @param values * the values to write. * @throws IOException */ @Override - public void write(Map<String, ByteBuffer> keys, List<ByteBuffer> values) throws IOException + public void write(Map<String, ByteBuffer> keyColumns, List<ByteBuffer> values) throws IOException { - ByteBuffer rowKey = getRowKey(keys); + ByteBuffer rowKey = getRowKey(keyColumns); Range<Token> range = ringCache.getRange(rowKey); // get the client for the given range, or create a new one @@ -180,7 +180,7 @@ final class ColumnFamilyRecordWriter extends AbstractColumnFamilyRecordWriter<Ma /** * A client that runs in a threadpool and connects to the list of endpoints for a particular - * range. Binded variable values for keys in that range are sent to this client via a queue. + * range. Bound variables for keys in that range are sent to this client via a queue. */ public class RangeClient extends AbstractRangeClient<List<ByteBuffer>> { @@ -201,10 +201,10 @@ final class ColumnFamilyRecordWriter extends AbstractColumnFamilyRecordWriter<Ma outer: while (run || !queue.isEmpty()) { - Pair<ByteBuffer, List<ByteBuffer>> bindVariables; + Pair<ByteBuffer, List<ByteBuffer>> item; try { - bindVariables = queue.take(); + item = queue.take(); } catch (InterruptedException e) { @@ -220,15 +220,16 @@ final class ColumnFamilyRecordWriter extends AbstractColumnFamilyRecordWriter<Ma { int i = 0; int itemId = preparedStatement(client); - while (bindVariables != null) + while (item != null) { - client.execute_prepared_cql3_query(itemId, bindVariables.right, ConsistencyLevel.ONE); + List<ByteBuffer> bindVariables = item.right; + client.execute_prepared_cql3_query(itemId, bindVariables, ConsistencyLevel.ONE); i++; if (i >= batchThreshold) break; - bindVariables = queue.poll(); + item = queue.poll(); } break; @@ -293,7 +294,7 @@ final class ColumnFamilyRecordWriter extends AbstractColumnFamilyRecordWriter<Ma } } - private ByteBuffer getRowKey(Map<String, ByteBuffer> keysMap) + private ByteBuffer getRowKey(Map<String, ByteBuffer> keyColumns) { //current row key ByteBuffer rowKey; @@ -301,13 +302,13 @@ final class ColumnFamilyRecordWriter extends AbstractColumnFamilyRecordWriter<Ma { ByteBuffer[] keys = new ByteBuffer[partitionkeys.length]; for (int i = 0; i< keys.length; i++) - keys[i] = keysMap.get(partitionkeys[i]); + keys[i] = keyColumns.get(partitionkeys[i]); rowKey = ((CompositeType) keyValidator).build(keys); } else { - rowKey = keysMap.get(partitionkeys[0]); + rowKey = keyColumns.get(partitionkeys[0]); } return rowKey; } @@ -331,11 +332,11 @@ final class ColumnFamilyRecordWriter extends AbstractColumnFamilyRecordWriter<Ma Column rawPartitionKeys = result.rows.get(0).columns.get(1); String keyString = ByteBufferUtil.string(ByteBuffer.wrap(rawPartitionKeys.getValue())); logger.debug("partition keys: " + keyString); - + List<String> keys = FBUtilities.fromJsonList(keyString); - partitionkeys = new String [keys.size()]; - int i=0; - for (String key: keys) + partitionkeys = new String[keys.size()]; + int i = 0; + for (String key : keys) { partitionkeys[i] = key; i++; @@ -372,13 +373,9 @@ final class ColumnFamilyRecordWriter extends AbstractColumnFamilyRecordWriter<Ma } finally { - if (client != null) - { - TTransport transport = client.getOutputProtocol().getTransport(); - if (transport.isOpen()) - transport.close(); - client = null; - } + TTransport transport = client.getOutputProtocol().getTransport(); + if (transport.isOpen()) + transport.close(); } throw new IOException("There are no endpoints"); }