cqlsh COPY FROM: shut down parent cluster after forking, to avoid corrupting SSL connections
patch by Stefania Alborghetti; reviewed by Tyler Hobbs for CASSANDRA-11749 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/68319f7c Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/68319f7c Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/68319f7c Branch: refs/heads/cassandra-2.2 Commit: 68319f7c3be232a58e68ca91206283076aa3dedb Parents: 06bb6b9 Author: Stefania Alborghetti <stefania.alborghe...@datastax.com> Authored: Fri May 27 11:00:27 2016 +0800 Committer: Stefania Alborghetti <stefania.alborghe...@datastax.com> Committed: Fri Jun 10 15:49:51 2016 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + pylib/cqlshlib/copyutil.py | 38 +++++++++++++++++++++++++++++++++++--- 2 files changed, 36 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/68319f7c/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 619dc61..af641e1 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.1.15 + * cqlsh COPY FROM: shutdown parent cluster after forking, to avoid corrupting SSL connections (CASSANDRA-11749) * Updated cqlsh Python driver to fix DESCRIBE problem for legacy tables (CASSANDRA-11055) * cqlsh: apply current keyspace to source command (CASSANDRA-11152) * Backport CASSANDRA-11578 (CASSANDRA-11750) http://git-wip-us.apache.org/repos/asf/cassandra/blob/68319f7c/pylib/cqlshlib/copyutil.py ---------------------------------------------------------------------- diff --git a/pylib/cqlshlib/copyutil.py b/pylib/cqlshlib/copyutil.py index d68812c..0016dfd 100644 --- a/pylib/cqlshlib/copyutil.py +++ b/pylib/cqlshlib/copyutil.py @@ -59,6 +59,7 @@ PROFILE_ON = False STRACE_ON = False DEBUG = False # This may be set to True when initializing the task IS_LINUX = 
platform.system() == 'Linux' +IS_WINDOWS = platform.system() == 'Windows' CopyOptions = namedtuple('CopyOptions', 'copy dialect unrecognized') @@ -421,9 +422,13 @@ class CopyTask(object): def make_params(self): """ Return a dictionary of parameters to be used by the worker processes. - On Windows this dictionary must be pickle-able. + On Windows this dictionary must be pickle-able, therefore we do not pass the + parent connection since it may not be pickle-able. Also, on Windows child + processes are spawned and not forked, and therefore we don't need to shutdown + the parent connection anyway, see CASSANDRA-11749 for more details. """ shell = self.shell + return dict(ks=self.ks, table=self.table, local_dc=self.local_dc, @@ -434,6 +439,7 @@ class CopyTask(object): port=shell.port, ssl=shell.ssl, auth_provider=shell.auth_provider, + parent_cluster=shell.conn if not IS_WINDOWS else None, cql_version=shell.conn.cql_version, config_file=self.config_file, protocol_version=self.protocol_version, @@ -1072,7 +1078,8 @@ class ImportTask(CopyTask): self.processes.append(ImportProcess(self.update_params(params, i))) feeder = FeedingProcess(self.outmsg.channels[-1], self.inmsg.channels[-1], - self.outmsg.channels[:-1], self.fname, self.options) + self.outmsg.channels[:-1], self.fname, self.options, + self.shell.conn if not IS_WINDOWS else None) self.processes.append(feeder) self.start_processes() @@ -1179,7 +1186,7 @@ class FeedingProcess(mp.Process): """ A process that reads from import sources and sends chunks to worker processes. 
""" - def __init__(self, inmsg, outmsg, worker_channels, fname, options): + def __init__(self, inmsg, outmsg, worker_channels, fname, options, parent_cluster): mp.Process.__init__(self, target=self.run) self.inmsg = inmsg self.outmsg = outmsg @@ -1189,6 +1196,15 @@ class FeedingProcess(mp.Process): self.ingest_rate = options.copy['ingestrate'] self.num_worker_processes = options.copy['numprocesses'] self.chunk_id = 0 + self.parent_cluster = parent_cluster + + def on_fork(self): + """ + Release any parent connections after forking, see CASSANDRA-11749 for details. + """ + if self.parent_cluster: + printdebugmsg("Closing parent cluster sockets") + self.parent_cluster.shutdown() def run(self): pr = profile_on() if PROFILE_ON else None @@ -1205,6 +1221,9 @@ class FeedingProcess(mp.Process): here we throttle using the ingest rate in the feeding process because of memory usage concerns. When finished we send back to the parent process the total number of rows sent. """ + + self.on_fork() + reader = self.reader reader.start() channels = self.worker_channels @@ -1268,6 +1287,7 @@ class ChildProcess(mp.Process): self.connect_timeout = params['connect_timeout'] self.cql_version = params['cql_version'] self.auth_provider = params['auth_provider'] + self.parent_cluster = params['parent_cluster'] self.ssl = params['ssl'] self.protocol_version = params['protocol_version'] self.config_file = params['config_file'] @@ -1285,6 +1305,14 @@ class ChildProcess(mp.Process): else: self.test_failures = None + def on_fork(self): + """ + Release any parent connections after forking, see CASSANDRA-11749 for details. + """ + if self.parent_cluster: + printdebugmsg("Closing parent cluster sockets") + self.parent_cluster.shutdown() + def close(self): printdebugmsg("Closing queues...") self.inmsg.close() @@ -1411,6 +1439,9 @@ class ExportProcess(ChildProcess): we can signal a global error by sending (None, error). We terminate when the inbound queue is closed. 
""" + + self.on_fork() + while True: if self.num_requests() > self.max_requests: time.sleep(0.001) # 1 millisecond @@ -2059,6 +2090,7 @@ class ImportProcess(ChildProcess): try: pr = profile_on() if PROFILE_ON else None + self.on_fork() self.inner_run(*self.make_params()) if pr: