Author: jbellis
Date: Mon Sep 27 22:30:20 2010
New Revision: 1001940

URL: http://svn.apache.org/viewvc?rev=1001940&view=rev
Log:
rewrite Hadoop ColumnFamilyRecordWriter to pool connections, retry to multiple 
Cassandra nodes, and smooth impact on the Cassandra cluster by using smaller 
batch sizes.
patch by jbellis and Stu Hood for CASSANDRA-1434
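
For readers skimming the diff, the heart of the rewrite is a bounded
producer/consumer queue per token range: write() enqueues mutations, and a
per-range thread drains them in small batches. A minimal sketch of that
pattern, with illustrative names (BatchingWriter, String payloads) standing in
for the real RangeClient and Thrift types:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;

    // Simplified stand-in for RangeClient: drains a bounded queue and
    // sends items downstream in small batches.
    class BatchingWriter extends Thread
    {
        private final BlockingQueue<String> queue = new ArrayBlockingQueue<String>(32);
        private volatile boolean run = true;

        public void put(String mutation) throws InterruptedException
        {
            queue.put(mutation); // blocks when full: back-pressure on the producer
        }

        public void stopNicely()
        {
            run = false;
            interrupt();
        }

        @Override
        public void run()
        {
            while (run || !queue.isEmpty())
            {
                String mutation;
                try
                {
                    mutation = queue.take();
                }
                catch (InterruptedException e)
                {
                    continue; // interrupted by stopNicely(): re-check the loop condition
                }
                List<String> batch = new ArrayList<String>();
                batch.add(mutation);
                while (batch.size() < 32 && (mutation = queue.poll()) != null)
                    batch.add(mutation);
                // a real client would batch_mutate() here, failing over between replicas
                System.out.println("sending batch of " + batch.size());
            }
        }
    }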

Modified:
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/src/java/org/apache/cassandra/client/RingCache.java
    cassandra/trunk/src/java/org/apache/cassandra/dht/AbstractBounds.java
    cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java
    cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java

Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1001940&r1=1001939&r2=1001940&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Mon Sep 27 22:30:20 2010
@@ -2,6 +2,9 @@ dev
  * create EndpointSnitchInfo and MBean to expose rack and DC (CASSANDRA-1491)
  * added option to contrib/word_count to output results back to Cassandra
    (CASSANDRA-1342)
+ * rewrite Hadoop ColumnFamilyRecordWriter to pool connections, retry to
+   multiple Cassandra nodes, and smooth impact on the Cassandra cluster
+   by using smaller batch sizes (CASSANDRA-1434)
 
 
 0.7-beta2

Modified: cassandra/trunk/src/java/org/apache/cassandra/client/RingCache.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/client/RingCache.java?rev=1001940&r1=1001939&r2=1001940&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/client/RingCache.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/client/RingCache.java Mon Sep 27 22:30:20 2010
@@ -40,11 +40,12 @@ import org.apache.thrift.transport.TFram
 import org.apache.thrift.transport.TSocket;
 
 import com.google.common.collect.Multimap;
-import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ArrayListMultimap;
 
 /**
- *  A class for caching the ring map at the client. For usage example, see
- *  test/unit/org.apache.cassandra.client.TestRingCache.java.
+ * A class for caching the ring map at the client. For usage example, see
+ * test/unit/org.apache.cassandra.client.TestRingCache.java.
+ * TODO: doing a naive linear search of the token map
  */
 public class RingCache
 {
@@ -55,7 +56,6 @@ public class RingCache
     private final IPartitioner partitioner_;
     private final String keyspace;
 
-    private Set<Range> rangeSet;
     private Multimap<Range, InetAddress> rangeMap;
 
     public RingCache(String keyspace, IPartitioner partitioner, String addresses, int port) throws IOException
@@ -80,26 +80,25 @@ public class RingCache
                 socket.open();
 
                 List<TokenRange> ring = client.describe_ring(keyspace);
-                rangeMap = HashMultimap.create();
+                rangeMap = ArrayListMultimap.create();
                 
                 for (TokenRange range : ring)
                 {
                     Token<?> left = partitioner_.getTokenFactory().fromString(range.start_token);
                     Token<?> right = partitioner_.getTokenFactory().fromString(range.end_token);
-                    String host = range.endpoints.get(0);
-                    
-                    try
+                    Range r = new Range(left, right, partitioner_);
+                    for (String host : range.endpoints)
                     {
-                        rangeMap.put(new Range(left, right, partitioner_), InetAddress.getByName(host));
-                    }
-                    catch (UnknownHostException e)
-                    {
-                        throw new AssertionError(e); // host strings are IPs
+                        try
+                        {
+                            rangeMap.put(r, InetAddress.getByName(host));
+                        }
+                        catch (UnknownHostException e)
+                        {
+                            throw new AssertionError(e); // host strings are IPs
+                        }
                     }
                 }
-
-                rangeSet = new HashSet(rangeMap.keySet());
-
                 break;
             }
             catch (InvalidRequestException e)
@@ -114,16 +113,25 @@ public class RingCache
         }
     }
 
-    public Collection<InetAddress> getEndpoint(byte[] key)
+    /** ListMultimap promises to return a List for get(K) */
+    @SuppressWarnings(value="unchecked")
+    public List<InetAddress> getEndpoint(Range range)
     {
-        if (rangeSet == null)
-            throw new RuntimeException("Must refresh endpoints before looking up a key.");
+        return (List<InetAddress>) rangeMap.get(range);
+    }
 
+    public List<InetAddress> getEndpoint(byte[] key)
+    {
+        return getEndpoint(getRange(key));
+    }
+
+    public Range getRange(byte[] key)
+    {
         // TODO: naive linear search of the token map
         Token<?> t = partitioner_.getToken(key);
-        for (Range range : rangeSet)
+        for (Range range : rangeMap.keySet())
             if (range.contains(t))
-                return rangeMap.get(range);
+                return range;
 
         throw new RuntimeException("Invalid token information returned by describe_ring: " + rangeMap);
     }
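
The reshaped RingCache API separates the range lookup (getRange) from the
replica lookup (getEndpoint), and returns every replica rather than just the
first. A hedged usage sketch; the keyspace name, partitioner choice, and seed
address are assumptions for illustration, not part of the patch:

    import java.net.InetAddress;
    import java.util.List;

    import org.apache.cassandra.client.RingCache;
    import org.apache.cassandra.dht.RandomPartitioner;
    import org.apache.cassandra.dht.Range;

    public class RingCacheExample
    {
        public static void main(String[] args) throws Exception
        {
            RingCache ring = new RingCache("Keyspace1", new RandomPartitioner(), "127.0.0.1", 9160);
            byte[] key = "row1".getBytes("UTF-8");

            Range range = ring.getRange(key);                     // naive linear scan, per the TODO
            List<InetAddress> replicas = ring.getEndpoint(range); // all replicas, in describe_ring order
            for (InetAddress replica : replicas)
                System.out.println(range + " -> " + replica);
        }
    }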

Modified: cassandra/trunk/src/java/org/apache/cassandra/dht/AbstractBounds.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/dht/AbstractBounds.java?rev=1001940&r1=1001939&r2=1001940&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/dht/AbstractBounds.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/dht/AbstractBounds.java Mon Sep 27 22:30:20 2010
@@ -59,7 +59,7 @@ public abstract class AbstractBounds imp
     @Override
     public int hashCode()
     {
-        return toString().hashCode();
+        return 31 * left.hashCode() + right.hashCode();
     }
 
     @Override
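
The new hashCode avoids building a String on every call and is consistent with
an equality over (left, right), which matters now that Range is used as a
HashMap/Multimap key in RingCache and the record writer. The same
31-multiplier idiom on a hypothetical two-field class:

    // Hypothetical Interval class (not part of the patch) showing the idiom.
    final class Interval
    {
        final long left, right;

        Interval(long left, long right) { this.left = left; this.right = right; }

        @Override
        public boolean equals(Object o)
        {
            if (!(o instanceof Interval))
                return false;
            Interval that = (Interval) o;
            return left == that.left && right == that.right;
        }

        @Override
        public int hashCode()
        {
            // same shape as AbstractBounds: 31 * hash(left) + hash(right)
            return 31 * Long.valueOf(left).hashCode() + Long.valueOf(right).hashCode();
        }
    }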

Modified: cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java?rev=1001940&r1=1001939&r2=1001940&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java Mon Sep 27 22:30:20 2010
@@ -74,7 +74,8 @@ public class ColumnFamilyOutputFormat ex
     private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyOutputFormat.class);
     
     public static final String BATCH_THRESHOLD = "mapreduce.output.columnfamilyoutputformat.batch.threshold";
-    
+    public static final String QUEUE_SIZE = "mapreduce.output.columnfamilyoutputformat.queue.size";
+
     /**
      * Check for validity of the output-specification for the job.
      * 
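
Both knobs are ordinary Hadoop Configuration properties. A sketch of setting
them during job setup; the values are illustrative, and the defaults noted in
the comments come from the ColumnFamilyRecordWriter constructor below:

    import org.apache.cassandra.hadoop.ColumnFamilyOutputFormat;
    import org.apache.hadoop.conf.Configuration;

    public class OutputFormatTuning
    {
        public static void tune(Configuration conf)
        {
            // per-range buffer of pending mutations; default is 32 * available processors
            conf.setInt(ColumnFamilyOutputFormat.QUEUE_SIZE, 64);
            // rows per batch_mutate call; the patch lowers the default to 32
            conf.setLong(ColumnFamilyOutputFormat.BATCH_THRESHOLD, 16);
        }
    }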

Modified: cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java?rev=1001940&r1=1001939&r2=1001940&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java Mon Sep 27 22:30:20 2010
@@ -20,39 +20,28 @@ package org.apache.cassandra.hadoop;
  * under the License.
  * 
  */
+
 import java.io.IOException;
 import java.net.InetAddress;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
+import java.util.*;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.cassandra.client.RingCache;
-import static org.apache.cassandra.io.SerDeUtils.copy;
-import org.apache.cassandra.thrift.Cassandra;
-import org.apache.cassandra.thrift.Column;
-import org.apache.cassandra.thrift.ColumnOrSuperColumn;
-import org.apache.cassandra.thrift.ConsistencyLevel;
-import org.apache.cassandra.thrift.Deletion;
-import org.apache.cassandra.thrift.Mutation;
-import org.apache.cassandra.thrift.SlicePredicate;
-import org.apache.cassandra.thrift.SliceRange;
-import org.apache.cassandra.thrift.SuperColumn;
-import org.apache.cassandra.utils.FBUtilities;
-
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.thrift.*;
+import org.apache.cassandra.utils.Pair;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.thrift.TException;
 import org.apache.thrift.transport.TSocket;
 
+import static org.apache.cassandra.io.SerDeUtils.copy;
+
 /**
  * The <code>ColumnFamilyRecordWriter</code> maps the output &lt;key, value&gt;
  * pairs to a Cassandra column family. In particular, it applies all mutations
@@ -60,12 +49,6 @@ import org.apache.thrift.transport.TSock
  * endpoint.
  * 
  * <p>
- * Note that, given that round trips to the server are fairly expensive, it
- * merely batches the mutations in-memory and periodically sends the batched
- * mutations to the server in one shot.
- * </p>
- * 
- * <p>
  * Furthermore, this writer groups the mutations by the endpoint responsible for
  * the rows being affected. This allows the mutations to be executed in parallel,
  * directly to a responsible endpoint.
@@ -76,14 +59,12 @@ import org.apache.thrift.transport.TSock
  * @see OutputFormat
  * 
  */
-final class ColumnFamilyRecordWriter extends RecordWriter<ByteBuffer,List<org.apache.cassandra.avro.Mutation>> implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<org.apache.cassandra.avro.Mutation>>
+final class ColumnFamilyRecordWriter extends RecordWriter<ByteBuffer,List<org.apache.cassandra.avro.Mutation>>
+implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<org.apache.cassandra.avro.Mutation>>
 {
     // The configuration this writer is associated with.
     private final Configuration conf;
     
-    // The batched set of mutations grouped by endpoints.
-    private Map<InetAddress,Map<byte[],Map<String,List<Mutation>>>> mutationsByEndpoint;
-    
     // The ring cache that describes the token ranges each node in the ring is
     // responsible for. This is what allows us to group the mutations by
     // the endpoints they should be targeted at. The targeted endpoint
@@ -91,11 +72,13 @@ final class ColumnFamilyRecordWriter ext
     // acts as the primary replica for the rows being affected by the mutations.
     private final RingCache ringCache;
     
-    // The number of mutations currently held in the mutations cache.
-    private long batchSize = 0L;
-    // The maximum number of mutations to hold in the mutations cache.
+    // The number of mutations to buffer per endpoint
+    private final int queueSize;
+
+    // handles for clients for each range running in the threadpool
+    private final Map<Range,RangeClient> clients;
     private final long batchThreshold;
-    
+
     /**
      * Upon construction, obtain the map that this writer will use to collect
      * mutations, and the ring cache for the given keyspace.
@@ -111,32 +94,16 @@ final class ColumnFamilyRecordWriter ext
     ColumnFamilyRecordWriter(Configuration conf) throws IOException
     {
         this.conf = conf;
-        this.mutationsByEndpoint = new HashMap<InetAddress,Map<byte[],Map<String,List<Mutation>>>>();
         this.ringCache = new RingCache(ConfigHelper.getOutputKeyspace(conf),
                                        ConfigHelper.getPartitioner(conf),
                                        ConfigHelper.getInitialAddress(conf),
                                        ConfigHelper.getRpcPort(conf));
-        this.batchThreshold = conf.getLong(ColumnFamilyOutputFormat.BATCH_THRESHOLD, Long.MAX_VALUE);
-    }
-
-    /**
-     * Return the endpoint responsible for the given key. The selected endpoint
-     * one whose token range contains the given key.
-     * 
-     * @param key
-     *            the key being mutated
-     * @return the endpoint responsible for that key
-     */
-    protected InetAddress getEndpoint(byte[] key)
-    {
-        return ringCache.getEndpoint(key).iterator().next();
+        this.queueSize = conf.getInt(ColumnFamilyOutputFormat.QUEUE_SIZE, 32 * Runtime.getRuntime().availableProcessors());
+        this.clients = new HashMap<Range,RangeClient>();
+        batchThreshold = conf.getLong(ColumnFamilyOutputFormat.BATCH_THRESHOLD, 32);
     }
 
     /**
-     * Writes a key/value pair, not to the Cassandra server, but into a
-     * in-memory cache (viz. {@link #mutationsByEndpoint}.
-     * 
-     * <p>
      * If the key is to be associated with a valid value, a mutation is created
      * for it with the given column family and columns. In the event the value
      * in the column is missing (i.e., null), then it is marked for
@@ -144,41 +111,30 @@ final class ColumnFamilyRecordWriter ext
     * (i.e., null), then the entire key is marked for {@link Deletion}.
      * </p>
      * 
-     * @param key
+     * @param keybuff
      *            the key to write.
      * @param value
      *            the value to write.
      * @throws IOException
      */
     @Override
-    public synchronized void write(ByteBuffer keybuff, List<org.apache.cassandra.avro.Mutation> value) throws IOException
+    public void write(ByteBuffer keybuff, List<org.apache.cassandra.avro.Mutation> value) throws IOException
     {
-        maybeFlush();
         byte[] key = copy(keybuff);
-        InetAddress endpoint = getEndpoint(key);
-        Map<byte[], Map<String, List<Mutation>>> mutationsByKey = mutationsByEndpoint.get(endpoint);
-        if (mutationsByKey == null)
-        {
-            mutationsByKey = new TreeMap<byte[], Map<String, List<Mutation>>>(FBUtilities.byteArrayComparator);
-            mutationsByEndpoint.put(endpoint, mutationsByKey);
-        }
+        Range range = ringCache.getRange(key);
 
-        Map<String, List<Mutation>> cfMutation = mutationsByKey.get(key);
-        if (cfMutation == null)
+        // get the client for the given range, or create a new one
+        RangeClient client = clients.get(range);
+        if (client == null)
         {
-            cfMutation = new HashMap<String, List<Mutation>>();
-            mutationsByKey.put(key, cfMutation);
-        }
-
-        List<Mutation> mutationList = cfMutation.get(ConfigHelper.getOutputColumnFamily(conf));
-        if (mutationList == null)
-        {
-            mutationList = new ArrayList<Mutation>();
-            cfMutation.put(ConfigHelper.getOutputColumnFamily(conf), mutationList);
+            // haven't seen keys for this range: create new client
+            client = new RangeClient(ringCache.getEndpoint(range));
+            client.start();
+            clients.put(range, client);
         }
 
         for (org.apache.cassandra.avro.Mutation amut : value)
-            mutationList.add(avroToThrift(amut));
+            client.put(new Pair<byte[],Mutation>(key, avroToThrift(amut)));
     }
 
     /**
@@ -200,7 +156,7 @@ final class ColumnFamilyRecordWriter ext
             {
                 // super column
                 byte[] scolname = copy(acosc.super_column.name);
-                List<Column> scolcols = new ArrayList<Column>((int)acosc.super_column.columns.size());
+                List<Column> scolcols = new ArrayList<Column>(acosc.super_column.columns.size());
                 for (org.apache.cassandra.avro.Column acol : acosc.super_column.columns)
                     scolcols.add(avroToThrift(acol));
                 cosc.setSuper_column(new SuperColumn(scolname, scolcols));
@@ -218,10 +174,10 @@ final class ColumnFamilyRecordWriter ext
             else if (apred.column_names != null)
             {
                 // column names
-                List<byte[]> colnames = new ArrayList<byte[]>((int)apred.column_names.size());
-                for (ByteBuffer acolname : apred.column_names)
-                    colnames.add(copy(acolname));
-                deletion.setPredicate(new SlicePredicate().setColumn_names(colnames));
+                List<byte[]> names = new ArrayList<byte[]>(apred.column_names.size());
+                for (ByteBuffer name : apred.column_names)
+                    names.add(copy(name));
+                deletion.setPredicate(new SlicePredicate().setColumn_names(names));
             }
             else
             {
@@ -252,58 +208,70 @@ final class ColumnFamilyRecordWriter ext
     @Override
     public void close(TaskAttemptContext context) throws IOException, InterruptedException
     {
-        flush();
+        close((org.apache.hadoop.mapred.Reporter)null);
     }
 
     /** Fills the deprecated RecordWriter interface for streaming. */
     @Deprecated
     public void close(org.apache.hadoop.mapred.Reporter reporter) throws IOException
     {
-        flush();
-    }
-
-    /**
-     * Flush the mutations cache, iff more mutations have been cached than
-     * {@link #batchThreshold}.
-     *
-     * @throws IOException
-     */
-    private void maybeFlush() throws IOException
-    {
-        if (++batchSize > batchThreshold)
+        for (RangeClient client : clients.values())
+            client.stopNicely();
+        try
+        {
+            for (RangeClient client : clients.values())
+            {
+                client.join();
+                client.close();
+            }
+        }
+        catch (InterruptedException e)
         {
-            flush();
-            batchSize = 0L;
+            throw new AssertionError(e);
         }
     }
 
     /**
-     * Send the batched mutations over to Cassandra, and then clear the
-     * mutations cache.
-     *
-     * @throws IOException
+     * A client that runs in a threadpool and connects to the list of endpoints for a particular
+     * range. Mutations for keys in that range are sent to this client via a queue.
      */
-    protected synchronized void flush() throws IOException
+    public class RangeClient extends Thread
     {
-        ExecutorService executor = Executors.newCachedThreadPool();
+        // The list of endpoints for this range
+        private final List<InetAddress> endpoints;
+        private final String columnFamily = ConfigHelper.getOutputColumnFamily(conf);
+        // A bounded queue of incoming mutations for this range
+        private final BlockingQueue<Pair<byte[], Mutation>> queue = new ArrayBlockingQueue<Pair<byte[],Mutation>>(queueSize);
 
-        try
+        private volatile boolean run = true;
+        private volatile IOException lastException;
+
+        private Cassandra.Client thriftClient;
+        private TSocket thriftSocket;
+
+        /**
+         * Constructs a {@link RangeClient} for the given endpoints.
+         * @param endpoints the possible endpoints to execute the mutations on
+         */
+        public RangeClient(List<InetAddress> endpoints)
         {
-            List<Future<?>> mutationFutures = new ArrayList<Future<?>>();
-            for (Map.Entry<InetAddress, Map<byte[], Map<String, List<Mutation>>>> entry : mutationsByEndpoint.entrySet())
-            {
-                mutationFutures.add(executor.submit(new EndpointCallable(conf, entry.getKey(), entry.getValue())));
-            }
-            // wait until we have all the results back
-            for (Future<?> mutationFuture : mutationFutures)
+            super("client-" + endpoints);
+            this.endpoints = endpoints;
+        }
+
+        /**
+         * enqueues the given value to Cassandra
+         */
+        public void put(Pair<byte[],Mutation> value) throws IOException
+        {
+            while (true)
             {
+                if (lastException != null)
+                    throw lastException;
                 try
                 {
-                    mutationFuture.get();
-                }
-                catch (ExecutionException e)
-                {
-                    throw new IOException("Could not perform endpoint mutations", e.getCause());
+                    if (queue.offer(value, 100, TimeUnit.MILLISECONDS))
+                        break;
                 }
                 catch (InterruptedException e)
                 {
@@ -311,67 +279,98 @@ final class ColumnFamilyRecordWriter ext
                 }
             }
         }
-        finally
+
+        public void stopNicely() throws IOException
         {
-            executor.shutdownNow();
-            mutationsByEndpoint.clear();
+            if (lastException != null)
+                throw lastException;
+            run = false;
+            interrupt();
         }
 
-    }
-
-    /**
-     * The <code>EndpointCallable</code> facilitates an asynchronous call to a
-     * specific node in the ring that commands it to perform a batched set of
-     * mutations. Needless to say, the given mutations are targeted at rows that
-     * the selected endpoint is responsible for (i.e., is the primary replica
-     * for).
-     */
-    public class EndpointCallable implements Callable<Void>
-    {
-        // The task attempt context associated with this callable.
-        private Configuration conf;
-        // The endpoint of the primary replica for the rows being mutated
-        private InetAddress endpoint;
-        // The mutations to be performed in the node referenced by {@link
-        // #endpoint}.
-        private Map<byte[], Map<String, List<Mutation>>> mutations;
-
-        /**
-         * Constructs an {@link EndpointCallable} for the given endpoint and set
-         * of mutations.
-         *
-         * @param conf      job configuration
-         * @param endpoint  the endpoint wherein to execute the mutations
-         * @param mutations the mutation map expected by
-         *                  {@link Cassandra.Client#batch_mutate(Map, ConsistencyLevel)}
-         */
-        public EndpointCallable(Configuration conf, InetAddress endpoint, Map<byte[], Map<String, List<Mutation>>> mutations)
+        public void close()
         {
-            this.conf = conf;
-            this.endpoint = endpoint;
-            this.mutations = mutations;
+            if (thriftSocket != null)
+            {
+                thriftSocket.close();
+                thriftSocket = null;
+                thriftClient = null;
+            }
         }
 
         /**
-         * Perform the call to
-         * {@link Cassandra.Client#batch_mutate(Map, ConsistencyLevel)}.
+         * Loops collecting mutations from the queue and sending to Cassandra
          */
-        public Void call() throws Exception
+        public void run()
         {
-            TSocket socket = null;
-            try
+            outer:
+            while (run || !queue.isEmpty())
             {
-                socket = new TSocket(endpoint.getHostName(), ConfigHelper.getRpcPort(conf));
-                Cassandra.Client client = ColumnFamilyOutputFormat.createAuthenticatedClient(socket, conf);
-                client.batch_mutate(mutations, ConsistencyLevel.ONE);
-                return null;
-            }
-            finally
-            {
-                if (socket != null)
-                    socket.close();
+                Pair<byte[], Mutation> mutation;
+                try
+                {
+                    mutation = queue.take();
+                }
+                catch (InterruptedException e)
+                {
+                    // re-check loop condition after interrupt
+                    continue;
+                }
+
+                Map<byte[], Map<String, List<Mutation>>> batch = new HashMap<byte[], Map<String, List<Mutation>>>();
+                while (batch.size() < batchThreshold)
+                {
+                    Map<String, List<Mutation>> subBatch = Collections.singletonMap(columnFamily, Arrays.asList(mutation.right));
+                    batch.put(mutation.left, subBatch);
+                    if ((mutation = queue.poll()) == null)
+                        break;
+                }
+
+                Iterator<InetAddress> iter = endpoints.iterator();
+                while (true)
+                {
+                    // send the mutation to the last-used endpoint.  first time through, this will NPE harmlessly.
+                    try
+                    {
+                        thriftClient.batch_mutate(batch, ConsistencyLevel.ONE);
+                        break;
+                    }
+                    catch (Exception e)
+                    {
+                        close();
+                        if (!iter.hasNext())
+                        {
+                            lastException = new IOException(e);
+                            break outer;
+                        }
+                    }
+
+                    // attempt to connect to a different endpoint
+                    try
+                    {
+                        InetAddress address = iter.next();
+                        thriftSocket = new TSocket(address.getHostName(), ConfigHelper.getRpcPort(conf));
+                        thriftClient = ColumnFamilyOutputFormat.createAuthenticatedClient(thriftSocket, conf);
+                    }
+                    catch (Exception e)
+                    {
+                        close();
+                        // TException means something unexpected went wrong to that endpoint, so
+                        // we should try again to another.  Other exceptions (auth or invalid request) are fatal.
+                        if ((!(e instanceof TException)) || !iter.hasNext())
+                        {
+                            lastException = new IOException(e);
+                            break outer;
+                        }
+                    }
+                }
             }
         }
-    }
 
+        @Override
+        public String toString()
+        {
+            return "#<Client for " + endpoints.toString() + ">";
+        }
+    }
 }
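
Stripped of the Thrift plumbing, the failover logic in RangeClient.run()
reduces to: try the current connection, and on failure walk the replica list
until a send succeeds or the replicas are exhausted. A standalone restatement
under illustrative types (Endpoint and FailoverSender are not part of the
patch):

    import java.util.Iterator;
    import java.util.List;

    // Illustrative stand-in for an open Thrift client.
    interface Endpoint
    {
        void send(Object batch) throws Exception;
    }

    class FailoverSender
    {
        private Endpoint current; // null until the first connection is made

        /** Try the current endpoint first; on any failure, fail over down the list. */
        void sendWithFailover(Object batch, List<Endpoint> replicas) throws Exception
        {
            Iterator<Endpoint> iter = replicas.iterator();
            while (true)
            {
                try
                {
                    // first time through this NPEs (like the patch) and falls into the catch
                    current.send(batch);
                    return;
                }
                catch (Exception e)
                {
                    current = null; // drop the broken connection
                    if (!iter.hasNext())
                        throw e;    // out of replicas: surface the failure
                }
                current = iter.next(); // "reconnect" to the next replica
            }
        }
    }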

