test_bulk_round_trip_blogposts is failing occasionally

Patch by Stefania Alborghetti; reviewed by Paulo Motta for CASSANDRA-10938


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/165f586e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/165f586e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/165f586e

Branch: refs/heads/cassandra-2.2
Commit: 165f586e6f5e7e5d08f0b85e5b00dbe1f68e3e8f
Parents: 3b794f0
Author: Stefania Alborghetti <stefania.alborghe...@datastax.com>
Authored: Mon Jan 11 09:31:36 2016 +0000
Committer: Sylvain Lebresne <sylv...@datastax.com>
Committed: Tue Feb 2 14:51:19 2016 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 pylib/cqlshlib/copyutil.py                      | 87 +++++++++++++-------
 .../cassandra/transport/ServerConnection.java   |  5 +-
 3 files changed, 58 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/165f586e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e3e53d8..1793c32 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.13
+ * test_bulk_round_trip_blogposts is failing occasionally (CASSANDRA-10938)
  * Fix isJoined return true only after becoming cluster member (CASANDRA-11007)
  * Fix bad gossip generation seen in long-running clusters (CASSANDRA-10969)
  * Avoid NPE when incremental repair fails (CASSANDRA-10909)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/165f586e/pylib/cqlshlib/copyutil.py
----------------------------------------------------------------------
diff --git a/pylib/cqlshlib/copyutil.py b/pylib/cqlshlib/copyutil.py
index b015a77..f9e4a85 100644
--- a/pylib/cqlshlib/copyutil.py
+++ b/pylib/cqlshlib/copyutil.py
@@ -1106,7 +1106,7 @@ class ExportSession(object):
         session.default_timeout = export_process.options.copy['pagetimeout']
 
         export_process.printdebugmsg("Created connection to %s with page size %d and timeout %d seconds per page"
-                                     % (session.hosts, session.default_fetch_size, session.default_timeout))
+                                     % (cluster.contact_points, session.default_fetch_size, session.default_timeout))
 
         self.cluster = cluster
         self.session = session
@@ -1175,16 +1175,20 @@ class ExportProcess(ChildProcess):
             token_range, info = self.inmsg.get()
             self.start_request(token_range, info)
 
-    def report_error(self, err, token_range=None):
+    @staticmethod
+    def get_error_message(err, print_traceback=False):
         if isinstance(err, str):
             msg = err
         elif isinstance(err, BaseException):
             msg = "%s - %s" % (err.__class__.__name__, err)
-            if self.debug:
+            if print_traceback:
                 traceback.print_exc(err)
         else:
             msg = str(err)
+        return msg
 
+    def report_error(self, err, token_range=None):
+        msg = self.get_error_message(err, print_traceback=self.debug)
         self.printdebugmsg(msg)
         self.outmsg.put((token_range, Exception(msg)))
 
@@ -1193,48 +1197,67 @@ class ExportProcess(ChildProcess):
         Begin querying a range by executing an async query that
         will later on invoke the callbacks attached in attach_callbacks.
         """
-        session = self.get_session(info['hosts'])
-        metadata = session.cluster.metadata.keyspaces[self.ks].tables[self.table]
-        query = self.prepare_query(metadata.partition_key, token_range, info['attempts'])
-        future = session.execute_async(query)
-        self.attach_callbacks(token_range, future, session)
+        session = self.get_session(info['hosts'], token_range)
+        if session:
+            metadata = session.cluster.metadata.keyspaces[self.ks].tables[self.table]
+            query = self.prepare_query(metadata.partition_key, token_range, info['attempts'])
+            future = session.execute_async(query)
+            self.attach_callbacks(token_range, future, session)
 
     def num_requests(self):
         return sum(session.num_requests() for session in self.hosts_to_sessions.values())
 
-    def get_session(self, hosts):
+    def get_session(self, hosts, token_range):
         """
-        We select a host to connect to. If we have no connections to one of the hosts
-        yet then we select this host, else we pick the one with the smallest number
-        of requests.
+        We return a session connected to one of the hosts passed in, which are valid replicas for
+        the token range. We sort replicas by favouring those without any active requests yet or with the
+        smallest number of requests. If we fail to connect we report an error so that the token will
+        be retried again later.
 
         :return: An ExportSession connected to the chosen host.
         """
-        new_hosts = [h for h in hosts if h not in self.hosts_to_sessions]
-        if new_hosts:
-            host = new_hosts[0]
-            new_cluster = Cluster(
-                contact_points=(host,),
-                port=self.port,
-                cql_version=self.cql_version,
-                protocol_version=self.protocol_version,
-                auth_provider=self.auth_provider,
-                ssl_options=ssl_settings(host, self.config_file) if self.ssl else None,
-                load_balancing_policy=TokenAwarePolicy(WhiteListRoundRobinPolicy(hosts)),
-                default_retry_policy=ExpBackoffRetryPolicy(self),
-                compression=None,
-                control_connection_timeout=self.connect_timeout,
-                connect_timeout=self.connect_timeout)
+        # sorted replicas favouring those with no connections yet
+        hosts = sorted(hosts,
+                       key=lambda hh: 0 if hh not in self.hosts_to_sessions else self.hosts_to_sessions[hh].requests)
 
-            session = ExportSession(new_cluster, self)
-            self.hosts_to_sessions[host] = session
-            return session
-        else:
-            host = min(hosts, key=lambda hh: self.hosts_to_sessions[hh].requests)
+        errors = []
+        ret = None
+        for host in hosts:
+            try:
+                ret = self.connect(host)
+            except Exception, e:
+                errors.append(self.get_error_message(e))
+
+            if ret:
+                if errors:
+                    self.printdebugmsg("Warning: failed to connect to some replicas: %s" % (errors,))
+                return ret
+
+        self.report_error("Failed to connect to all replicas %s for %s, errors: %s" % (hosts, token_range, errors))
+        return None
+
+    def connect(self, host):
+        if host in self.hosts_to_sessions.keys():
             session = self.hosts_to_sessions[host]
             session.add_request()
             return session
 
+        new_cluster = Cluster(
+            contact_points=(host,),
+            port=self.port,
+            cql_version=self.cql_version,
+            protocol_version=self.protocol_version,
+            auth_provider=self.auth_provider,
+            ssl_options=ssl_settings(host, self.config_file) if self.ssl else None,
+            load_balancing_policy=WhiteListRoundRobinPolicy([host]),
+            default_retry_policy=ExpBackoffRetryPolicy(self),
+            compression=None,
+            control_connection_timeout=self.connect_timeout,
+            connect_timeout=self.connect_timeout)
+        session = ExportSession(new_cluster, self)
+        self.hosts_to_sessions[host] = session
+        return session
+
     def attach_callbacks(self, token_range, future, session):
         def result_callback(rows):
             if future.has_more_pages:

http://git-wip-us.apache.org/repos/asf/cassandra/blob/165f586e/src/java/org/apache/cassandra/transport/ServerConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/ServerConnection.java b/src/java/org/apache/cassandra/transport/ServerConnection.java
index 5991b33..ce4d164 100644
--- a/src/java/org/apache/cassandra/transport/ServerConnection.java
+++ b/src/java/org/apache/cassandra/transport/ServerConnection.java
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.transport;
 
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
 import io.netty.channel.Channel;
@@ -28,8 +29,6 @@ import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.QueryState;
 
-import org.cliffc.high_scale_lib.NonBlockingHashMap;
-
 public class ServerConnection extends Connection
 {
     private enum State { UNINITIALIZED, AUTHENTICATION, READY }
@@ -38,7 +37,7 @@ public class ServerConnection extends Connection
     private final ClientState clientState;
     private volatile State state;
 
-    private final ConcurrentMap<Integer, QueryState> queryStates = new NonBlockingHashMap<>();
+    private final ConcurrentMap<Integer, QueryState> queryStates = new ConcurrentHashMap<>();
 
     public ServerConnection(Channel channel, int version, Connection.Tracker tracker)
     {

Reply via email to