test_bulk_round_trip_blogposts is failing occasionally. Patch by Stefania Alborghetti; reviewed by Paulo Motta for CASSANDRA-10938.
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/165f586e Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/165f586e Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/165f586e Branch: refs/heads/cassandra-2.2 Commit: 165f586e6f5e7e5d08f0b85e5b00dbe1f68e3e8f Parents: 3b794f0 Author: Stefania Alborghetti <stefania.alborghe...@datastax.com> Authored: Mon Jan 11 09:31:36 2016 +0000 Committer: Sylvain Lebresne <sylv...@datastax.com> Committed: Tue Feb 2 14:51:19 2016 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + pylib/cqlshlib/copyutil.py | 87 +++++++++++++------- .../cassandra/transport/ServerConnection.java | 5 +- 3 files changed, 58 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/165f586e/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index e3e53d8..1793c32 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.1.13 + * test_bulk_round_trip_blogposts is failing occasionally (CASSANDRA-10938) * Fix isJoined return true only after becoming cluster member (CASANDRA-11007) * Fix bad gossip generation seen in long-running clusters (CASSANDRA-10969) * Avoid NPE when incremental repair fails (CASSANDRA-10909) http://git-wip-us.apache.org/repos/asf/cassandra/blob/165f586e/pylib/cqlshlib/copyutil.py ---------------------------------------------------------------------- diff --git a/pylib/cqlshlib/copyutil.py b/pylib/cqlshlib/copyutil.py index b015a77..f9e4a85 100644 --- a/pylib/cqlshlib/copyutil.py +++ b/pylib/cqlshlib/copyutil.py @@ -1106,7 +1106,7 @@ class ExportSession(object): session.default_timeout = export_process.options.copy['pagetimeout'] export_process.printdebugmsg("Created connection to %s with page size %d and timeout %d seconds per 
page" - % (session.hosts, session.default_fetch_size, session.default_timeout)) + % (cluster.contact_points, session.default_fetch_size, session.default_timeout)) self.cluster = cluster self.session = session @@ -1175,16 +1175,20 @@ class ExportProcess(ChildProcess): token_range, info = self.inmsg.get() self.start_request(token_range, info) - def report_error(self, err, token_range=None): + @staticmethod + def get_error_message(err, print_traceback=False): if isinstance(err, str): msg = err elif isinstance(err, BaseException): msg = "%s - %s" % (err.__class__.__name__, err) - if self.debug: + if print_traceback: traceback.print_exc(err) else: msg = str(err) + return msg + def report_error(self, err, token_range=None): + msg = self.get_error_message(err, print_traceback=self.debug) self.printdebugmsg(msg) self.outmsg.put((token_range, Exception(msg))) @@ -1193,48 +1197,67 @@ class ExportProcess(ChildProcess): Begin querying a range by executing an async query that will later on invoke the callbacks attached in attach_callbacks. """ - session = self.get_session(info['hosts']) - metadata = session.cluster.metadata.keyspaces[self.ks].tables[self.table] - query = self.prepare_query(metadata.partition_key, token_range, info['attempts']) - future = session.execute_async(query) - self.attach_callbacks(token_range, future, session) + session = self.get_session(info['hosts'], token_range) + if session: + metadata = session.cluster.metadata.keyspaces[self.ks].tables[self.table] + query = self.prepare_query(metadata.partition_key, token_range, info['attempts']) + future = session.execute_async(query) + self.attach_callbacks(token_range, future, session) def num_requests(self): return sum(session.num_requests() for session in self.hosts_to_sessions.values()) - def get_session(self, hosts): + def get_session(self, hosts, token_range): """ - We select a host to connect to. 
If we have no connections to one of the hosts - yet then we select this host, else we pick the one with the smallest number - of requests. + We return a session connected to one of the hosts passed in, which are valid replicas for + the token range. We sort replicas by favouring those without any active requests yet or with the + smallest number of requests. If we fail to connect we report an error so that the token will + be retried again later. :return: An ExportSession connected to the chosen host. """ - new_hosts = [h for h in hosts if h not in self.hosts_to_sessions] - if new_hosts: - host = new_hosts[0] - new_cluster = Cluster( - contact_points=(host,), - port=self.port, - cql_version=self.cql_version, - protocol_version=self.protocol_version, - auth_provider=self.auth_provider, - ssl_options=ssl_settings(host, self.config_file) if self.ssl else None, - load_balancing_policy=TokenAwarePolicy(WhiteListRoundRobinPolicy(hosts)), - default_retry_policy=ExpBackoffRetryPolicy(self), - compression=None, - control_connection_timeout=self.connect_timeout, - connect_timeout=self.connect_timeout) + # sorted replicas favouring those with no connections yet + hosts = sorted(hosts, + key=lambda hh: 0 if hh not in self.hosts_to_sessions else self.hosts_to_sessions[hh].requests) - session = ExportSession(new_cluster, self) - self.hosts_to_sessions[host] = session - return session - else: - host = min(hosts, key=lambda hh: self.hosts_to_sessions[hh].requests) + errors = [] + ret = None + for host in hosts: + try: + ret = self.connect(host) + except Exception, e: + errors.append(self.get_error_message(e)) + + if ret: + if errors: + self.printdebugmsg("Warning: failed to connect to some replicas: %s" % (errors,)) + return ret + + self.report_error("Failed to connect to all replicas %s for %s, errors: %s" % (hosts, token_range, errors)) + return None + + def connect(self, host): + if host in self.hosts_to_sessions.keys(): session = self.hosts_to_sessions[host] 
session.add_request() return session + new_cluster = Cluster( + contact_points=(host,), + port=self.port, + cql_version=self.cql_version, + protocol_version=self.protocol_version, + auth_provider=self.auth_provider, + ssl_options=ssl_settings(host, self.config_file) if self.ssl else None, + load_balancing_policy=WhiteListRoundRobinPolicy([host]), + default_retry_policy=ExpBackoffRetryPolicy(self), + compression=None, + control_connection_timeout=self.connect_timeout, + connect_timeout=self.connect_timeout) + session = ExportSession(new_cluster, self) + self.hosts_to_sessions[host] = session + return session + def attach_callbacks(self, token_range, future, session): def result_callback(rows): if future.has_more_pages: http://git-wip-us.apache.org/repos/asf/cassandra/blob/165f586e/src/java/org/apache/cassandra/transport/ServerConnection.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/ServerConnection.java b/src/java/org/apache/cassandra/transport/ServerConnection.java index 5991b33..ce4d164 100644 --- a/src/java/org/apache/cassandra/transport/ServerConnection.java +++ b/src/java/org/apache/cassandra/transport/ServerConnection.java @@ -17,6 +17,7 @@ */ package org.apache.cassandra.transport; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import io.netty.channel.Channel; @@ -28,8 +29,6 @@ import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.service.QueryState; -import org.cliffc.high_scale_lib.NonBlockingHashMap; - public class ServerConnection extends Connection { private enum State { UNINITIALIZED, AUTHENTICATION, READY } @@ -38,7 +37,7 @@ public class ServerConnection extends Connection private final ClientState clientState; private volatile State state; - private final ConcurrentMap<Integer, QueryState> queryStates = new NonBlockingHashMap<>(); + private final 
ConcurrentMap<Integer, QueryState> queryStates = new ConcurrentHashMap<>(); public ServerConnection(Channel channel, int version, Connection.Tracker tracker) {