Repository: cassandra Updated Branches: refs/heads/cassandra-3.0 039ff1f88 -> 8a0615b4a
Try next replica if not possible to connect to primary replica on ColumnFamilyRecordReader patch by pauloricardomg; reviewed by pkolaczk for CASSANDRA-2388 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/882adf0a Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/882adf0a Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/882adf0a Branch: refs/heads/cassandra-3.0 Commit: 882adf0ae638c69928e14ed7e34de7364ed65ba1 Parents: 851aed7 Author: Paulo Motta <pauloricard...@gmail.com> Authored: Mon Aug 24 17:21:57 2015 -0300 Committer: Sylvain Lebresne <sylv...@datastax.com> Committed: Fri Nov 20 13:59:46 2015 +0100 ---------------------------------------------------------------------- CHANGES.txt | 2 ++ .../hadoop/ColumnFamilyRecordReader.java | 35 +++++++++++--------- 2 files changed, 21 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/882adf0a/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 66423c7..9e2869e 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,6 @@ 2.1.12 + * Try next replica if not possible to connect to primary replica on + ColumnFamilyRecordReader (CASSANDRA-2388) * Limit window size in DTCS (CASSANDRA-10280) * sstableloader does not use MAX_HEAP_SIZE env parameter (CASSANDRA-10188) * (cqlsh) Improve COPY TO performance and error handling (CASSANDRA-9304) http://git-wip-us.apache.org/repos/asf/cassandra/blob/882adf0a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java index 0b52904..dc44a43 100644 --- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java +++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java @@ -151,22 +151,25 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap if (batchSize < 2) throw new IllegalArgumentException("Minimum batchSize is 2. Suggested batchSize is 100 or more"); - try - { - if (client != null) - return; - - // create connection using thrift - String location = getLocation(); + String[] locations = getLocations(); + int port = ConfigHelper.getInputRpcPort(conf); - int port = ConfigHelper.getInputRpcPort(conf); - client = ColumnFamilyInputFormat.createAuthenticatedClient(location, port, conf); - - } - catch (Exception e) + Exception lastException = null; + for (String location : locations) { - throw new RuntimeException(e); + try + { + client = ColumnFamilyInputFormat.createAuthenticatedClient(location, port, conf); + break; + } + catch (Exception e) + { + lastException = e; + logger.warn("Failed to create authenticated client to {}:{}", location , port); + } } + if (client == null && lastException != null) + throw new RuntimeException(lastException); iter = widerows ? new WideRowIterator() : new StaticRowIterator(); logger.debug("created {}", iter); @@ -186,7 +189,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap // we don't use endpointsnitch since we are trying to support hadoop nodes that are // not necessarily on Cassandra machines, too. This should be adequate for single-DC clusters, at least. - private String getLocation() + private String[] getLocations() { Collection<InetAddress> localAddresses = FBUtilities.getAllLocalAddresses(); @@ -205,11 +208,11 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap } if (address.equals(locationAddress)) { - return location; + return new String[]{location}; } } } - return split.getLocations()[0]; + return split.getLocations(); } private abstract class RowIterator extends AbstractIterator<Pair<ByteBuffer, SortedMap<ByteBuffer, Cell>>>