Merge branch 'cassandra-2.2' into cassandra-3.0

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c7724e6b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c7724e6b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c7724e6b

Branch: refs/heads/cassandra-3.1
Commit: c7724e6b356ed0f3cee1236db52ec2ee425f2495
Parents: 7b430ee f8fc031
Author: Robert Stupp <sn...@snazy.de>
Authored: Fri Nov 27 11:41:21 2015 +0100
Committer: Robert Stupp <sn...@snazy.de>
Committed: Fri Nov 27 11:41:21 2015 +0100

----------------------------------------------------------------------
 .../cassandra/hadoop/cql3/CqlRecordWriter.java  | 160 +++++++++++--------
 1 file changed, 92 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/c7724e6b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
index 23beba3,84102a5..96815ef
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
@@@ -108,14 -109,14 +108,14 @@@ class CqlRecordWriter extends RecordWri
      CqlRecordWriter(Configuration conf)
      {
          this.conf = conf;
 -        this.queueSize = conf.getInt(ColumnFamilyOutputFormat.QUEUE_SIZE, 32 
* FBUtilities.getAvailableProcessors());
 -        batchThreshold = 
conf.getLong(ColumnFamilyOutputFormat.BATCH_THRESHOLD, 32);
 +        this.queueSize = conf.getInt(CqlOutputFormat.QUEUE_SIZE, 32 * 
FBUtilities.getAvailableProcessors());
 +        batchThreshold = conf.getLong(CqlOutputFormat.BATCH_THRESHOLD, 32);
          this.clients = new HashMap<>();
+         String keyspace = ConfigHelper.getOutputKeyspace(conf);
  
-         try (Cluster cluster = 
CqlConfigHelper.getOutputCluster(ConfigHelper.getOutputInitialAddress(conf), 
conf))
+         try (Cluster cluster = 
CqlConfigHelper.getOutputCluster(ConfigHelper.getOutputInitialAddress(conf), 
conf);
+              Session client = cluster.connect(keyspace))
          {
-             String keyspace = ConfigHelper.getOutputKeyspace(conf);
-             Session client = cluster.connect(keyspace);
              ringCache = new NativeRingCache(conf);
              if (client != null)
              {
@@@ -279,45 -295,67 +294,68 @@@
          public void run()
          {
              Session session = null;
-             outer:
-             while (run || !queue.isEmpty())
++
+             try
              {
-                 List<ByteBuffer> bindVariables;
-                 try
+                 outer:
+                 while (run || !queue.isEmpty())
                  {
-                     bindVariables = queue.take();
-                 }
-                 catch (InterruptedException e)
-                 {
-                     // re-check loop condition after interrupt
-                     continue;
-                 }
+                     List<ByteBuffer> bindVariables;
+                     try
+                     {
+                         bindVariables = queue.take();
+                     }
+                     catch (InterruptedException e)
+                     {
+                         // re-check loop condition after interrupt
+                         continue;
+                     }
  
-                 ListIterator<InetAddress> iter = endpoints.listIterator();
-                 while (true)
-                 {
-                     // send the mutation to the last-used endpoint.  first 
time through, this will NPE harmlessly.
-                     if (session != null)
+                     ListIterator<InetAddress> iter = endpoints.listIterator();
+                     while (true)
                      {
-                         try
+                         // send the mutation to the last-used endpoint.  
first time through, this will NPE harmlessly.
+                         if (session != null)
                          {
-                             int i = 0;
-                             PreparedStatement statement = 
preparedStatement(session);
-                             while (bindVariables != null)
+                             try
                              {
-                                 BoundStatement boundStatement = new 
BoundStatement(statement);
-                                 for (int columnPosition = 0; columnPosition < 
bindVariables.size(); columnPosition++)
+                                 int i = 0;
+                                 PreparedStatement statement = 
preparedStatement(session);
+                                 while (bindVariables != null)
                                  {
-                                     
boundStatement.setBytesUnsafe(columnPosition, 
bindVariables.get(columnPosition));
+                                     BoundStatement boundStatement = new 
BoundStatement(statement);
+                                     for (int columnPosition = 0; 
columnPosition < bindVariables.size(); columnPosition++)
+                                     {
+                                         
boundStatement.setBytesUnsafe(columnPosition, 
bindVariables.get(columnPosition));
+                                     }
+                                     session.execute(boundStatement);
+                                     i++;
+ 
+                                     if (i >= batchThreshold)
+                                         break;
+                                     bindVariables = queue.poll();
                                  }
-                                 session.execute(boundStatement);
-                                 i++;
- 
-                                 if (i >= batchThreshold)
-                                     break;
-                                 bindVariables = queue.poll();
+                                 break;
                              }
-                             break;
+                             catch (Exception e)
+                             {
+                                 closeInternal();
+                                 if (!iter.hasNext())
+                                 {
+                                     lastException = new IOException(e);
+                                     break outer;
+                                 }
+                             }
+                         }
+ 
+                         // attempt to connect to a different endpoint
+                         try
+                         {
+                             InetAddress address = iter.next();
+                             String host = address.getHostName();
+                             cluster = CqlConfigHelper.getOutputCluster(host, 
conf);
+                             closeSession(session);
+                             session = cluster.connect();
                          }
                          catch (Exception e)
                          {
@@@ -329,37 -378,13 +378,12 @@@
                              }
                          }
                      }
- 
-                     // attempt to connect to a different endpoint
-                     try
-                     {
-                         InetAddress address = iter.next();
-                         String host = address.getHostName();
-                         cluster = CqlConfigHelper.getOutputCluster(host, 
conf);
-                         session = cluster.connect();
-                     }
-                     catch (Exception e)
-                     {
-                         //If connection died due to Interrupt, just try 
connecting to the endpoint again.
-                         //There are too many ways for the 
Thread.interrupted() state to be cleared, so
-                         //we can't rely on that here. Until the java driver 
gives us a better way of knowing
-                         //that this exception came from an 
InterruptedException, this is the best solution.
-                         if (canRetryDriverConnection(e))
-                         {
-                             iter.previous();
-                         }
-                         closeInternal();
- 
-                         // Most exceptions mean something unexpected went 
wrong to that endpoint, so
-                         // we should try again to another.  Other exceptions 
(auth or invalid request) are fatal.
-                         if ((e instanceof AuthenticationException || e 
instanceof InvalidQueryException) || !iter.hasNext())
-                         {
-                             lastException = new IOException(e);
-                             break outer;
-                         }
-                     }
                  }
              }
+             finally
+             {
+                 closeSession(session);
+             }
 -
              // close all our connections once we are done.
              closeInternal();
          }

Reply via email to