Updated Branches:
  refs/heads/cassandra-1.2 cd7b05ffb -> 8e7d7285c
  refs/heads/cassandra-2.0 d38ca3252 -> 3a4d6beb4
  refs/heads/trunk 5909b5f87 -> 2d6ab6fb5
Make CPRR more robust to failures.

Patch by brandonwilliams, reviewed by jbellis for CASSANDRA-6302

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8e7d7285
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8e7d7285
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8e7d7285

Branch: refs/heads/cassandra-1.2
Commit: 8e7d7285cdeac4f2527c933280d595bbddd26935
Parents: cd7b05f
Author: Brandon Williams <[email protected]>
Authored: Tue Nov 5 15:26:07 2013 -0600
Committer: Brandon Williams <[email protected]>
Committed: Tue Nov 5 15:26:07 2013 -0600

----------------------------------------------------------------------
 .../hadoop/cql3/CqlPagingRecordReader.java | 32 ++++++++++++++------
 1 file changed, 23 insertions(+), 9 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8e7d7285/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingRecordReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingRecordReader.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingRecordReader.java
index d1a089f..b6e793c 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingRecordReader.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingRecordReader.java
@@ -135,10 +135,24 @@ public class CqlPagingRecordReader extends RecordReader<Map<String, ByteBuffer>,
             return;
 
         // create connection using thrift
-        String location = getLocation();
-
-        int port = ConfigHelper.getInputRpcPort(conf);
-        client = CqlPagingInputFormat.createAuthenticatedClient(location, port, conf);
+        String[] locations = split.getLocations();
+        Exception lastException = null;
+        for (String location : locations)
+        {
+            int port = ConfigHelper.getInputRpcPort(conf);
+            try
+            {
+                client = CqlPagingInputFormat.createAuthenticatedClient(location, port, conf);
+                break;
+            }
+            catch (Exception e)
+            {
+                lastException = e;
+                logger.warn("Failed to create authenticated client to {}:{}", location , port);
+            }
+        }
+        if (client == null && lastException != null)
+            throw lastException;
 
         // retrieve partition keys and cluster keys from system.schema_columnfamilies table
         retrieveKeys();
@@ -210,7 +224,7 @@ public class CqlPagingRecordReader extends RecordReader<Map<String, ByteBuffer>,
 
     // we don't use endpointsnitch since we are trying to support hadoop nodes that are
     // not necessarily on Cassandra machines, too. This should be adequate for single-DC clusters, at least.
-    private String getLocation()
+    private String[] getLocations()
     {
         Collection<InetAddress> localAddresses = FBUtilities.getAllLocalAddresses();
 
@@ -229,11 +243,11 @@ public class CqlPagingRecordReader extends RecordReader<Map<String, ByteBuffer>,
                 }
                 if (address.equals(locationAddress))
                 {
-                    return location;
+                    return new String[] { location };
                 }
             }
         }
-        return split.getLocations()[0];
+        return split.getLocations();
     }
 
     // Because the old Hadoop API wants us to write to the key and value
@@ -434,8 +448,8 @@ public class CqlPagingRecordReader extends RecordReader<Map<String, ByteBuffer>,
 
             columns = withoutKeyColumns(columns);
             columns = (clusterKey == null || "".equals(clusterKey))
-                    ? partitionKey + "," + columns
-                    : partitionKey + "," + clusterKey + "," + columns;
+                    ? partitionKey + (columns != null ? ("," + columns) : "")
+                    : partitionKey + "," + clusterKey + (columns != null ? ("," + columns) : "");
         }
 
         String whereStr = userDefinedWhereClauses == null ? "" : " AND " + userDefinedWhereClauses;
