Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.1 42644c324 -> a9b542205
  refs/heads/cassandra-2.2 cab3d5d12 -> 6c1ef2ba4
  refs/heads/cassandra-3.0 70eab633f -> 3efc609e0
  refs/heads/cassandra-3.5 acc2f89c1 -> c7ef7c91c
  refs/heads/trunk 5beedbc66 -> 860291872
COPY FROM on large datasets: fixed problem on single core machines patch by Stefania Alborghetti; reviewed by Adam Holmberg for CASSANDRA-11053 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a9b54220 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a9b54220 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a9b54220 Branch: refs/heads/cassandra-2.1 Commit: a9b5422057054b0ba612164d56d7cce5567e48df Parents: 42644c3 Author: Stefania Alborghetti <stefania.alborghe...@datastax.com> Authored: Fri Mar 18 13:33:21 2016 +0800 Committer: Josh McKenzie <josh.mcken...@datastax.com> Committed: Mon Mar 28 13:54:37 2016 -0400 ---------------------------------------------------------------------- pylib/cqlshlib/copyutil.py | 98 +++++++++++++++++++++++++++++------------ 1 file changed, 69 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9b54220/pylib/cqlshlib/copyutil.py ---------------------------------------------------------------------- diff --git a/pylib/cqlshlib/copyutil.py b/pylib/cqlshlib/copyutil.py index cd03765..ba2a47b 100644 --- a/pylib/cqlshlib/copyutil.py +++ b/pylib/cqlshlib/copyutil.py @@ -29,16 +29,17 @@ import re import struct import sys import time +import threading import traceback from bisect import bisect_right from calendar import timegm from collections import defaultdict, namedtuple from decimal import Decimal +from Queue import Queue from random import randrange from StringIO import StringIO from select import select -from threading import Lock from uuid import UUID from util import profile_on, profile_off @@ -161,11 +162,11 @@ class CopyTask(object): self.options = self.parse_options(opts, direction) self.num_processes = self.options.copy['numprocesses'] - if direction == 'in': - self.num_processes += 1 # add the feeder process - self.printmsg('Using 
%d child processes' % (self.num_processes,)) + if direction == 'from': + self.num_processes += 1 # add the feeder process + self.processes = [] self.inmsg = OneWayChannels(self.num_processes) self.outmsg = OneWayChannels(self.num_processes) @@ -295,17 +296,20 @@ class CopyTask(object): def get_num_processes(cap): """ Pick a reasonable number of child processes. We need to leave at - least one core for the parent process. + least one core for the parent or feeder process. """ return max(1, min(cap, CopyTask.get_num_cores() - 1)) @staticmethod def get_num_cores(): """ - Return the number of cores if available. + Return the number of cores if available. If the test environment variable + is set, then return the number carried by this variable. This is to test single-core + machine more easily. """ try: - return mp.cpu_count() + num_cores_for_testing = os.environ.get('CQLSH_COPY_TEST_NUM_CORES', '') + return int(num_cores_for_testing) if num_cores_for_testing else mp.cpu_count() except NotImplementedError: return 1 @@ -690,22 +694,20 @@ class ExportTask(CopyTask): if token_range is None and result is None: # a request has finished succeeded += 1 elif isinstance(result, Exception): # an error occurred - if token_range is None: # the entire process failed - shell.printerr('Error from worker process: %s' % (result)) - else: # only this token_range failed, retry up to max_attempts if no rows received yet, - # If rows were already received we'd risk duplicating data. - # Note that there is still a slight risk of duplicating data, even if we have - # an error with no rows received yet, it's just less likely. To avoid retrying on - # all timeouts would however mean we could risk not exporting some rows. 
- if ranges[token_range]['attempts'] < max_attempts and ranges[token_range]['rows'] == 0: - shell.printerr('Error for %s: %s (will try again later attempt %d of %d)' - % (token_range, result, ranges[token_range]['attempts'], max_attempts)) - self.send_work(ranges, [token_range]) - else: - shell.printerr('Error for %s: %s (permanently given up after %d rows and %d attempts)' - % (token_range, result, ranges[token_range]['rows'], - ranges[token_range]['attempts'])) - failed += 1 + # This token_range failed, retry up to max_attempts if no rows received yet, + # If rows were already received we'd risk duplicating data. + # Note that there is still a slight risk of duplicating data, even if we have + # an error with no rows received yet, it's just less likely. To avoid retrying on + # all timeouts would however mean we could risk not exporting some rows. + if ranges[token_range]['attempts'] < max_attempts and ranges[token_range]['rows'] == 0: + shell.printerr('Error for %s: %s (will try again later attempt %d of %d)' + % (token_range, result, ranges[token_range]['attempts'], max_attempts)) + self.send_work(ranges, [token_range]) + else: + shell.printerr('Error for %s: %s (permanently given up after %d rows and %d attempts)' + % (token_range, result, ranges[token_range]['rows'], + ranges[token_range]['attempts'])) + failed += 1 else: # partial result received data, num = result self.writer.write(data, num) @@ -1313,7 +1315,7 @@ class ExportSession(object): self.cluster = cluster self.session = session self.requests = 1 - self.lock = Lock() + self.lock = threading.Lock() self.consistency_level = export_process.consistency_level def add_request(self): @@ -1351,6 +1353,7 @@ class ExportProcess(ChildProcess): self.hosts_to_sessions = dict() self.formatters = dict() self.options = options + self.responses = None def run(self): try: @@ -1368,6 +1371,8 @@ class ExportProcess(ChildProcess): we can signal a global error by sending (None, error). 
We terminate when the inbound queue is closed. """ + self.init_feeder_thread() + while True: if self.num_requests() > self.max_requests: time.sleep(0.001) # 1 millisecond @@ -1376,6 +1381,37 @@ class ExportProcess(ChildProcess): token_range, info = self.inmsg.recv() self.start_request(token_range, info) + def init_feeder_thread(self): + """ + Start a thread to feed response messages to the parent process. + + It is not safe to write on the pipe from the main thread if the parent process is still sending work and + not receiving yet. This will in fact block the main thread on the send, which in turn won't be able to call + recv(), and will therefore block the parent process on its send(). + + It is also not safe to write on the pipe from the driver receiving thread whilst the parent process is + sending work, because if the receiving thread stops making progress, then the main thread may no longer + call recv() due to the check on the maximum number of requests in inner_run(). + + These deadlocks are easiest to reproduce with a single worker process, but may well affect multiple worker + processes too. + + It is important that the order of the responses in the queue is respected, or else the parent process may + kill off worker processes before it has received all the pages of the last token range. 
+ """ + def feed_errors(): + while True: + try: + self.outmsg.send(self.responses.get()) + except Exception, e: + self.printdebugmsg(e.message) + + self.responses = Queue() + + thread = threading.Thread(target=feed_errors) + thread.setDaemon(True) + thread.start() + @staticmethod def get_error_message(err, print_traceback=False): if isinstance(err, str): @@ -1388,10 +1424,13 @@ class ExportProcess(ChildProcess): msg = str(err) return msg - def report_error(self, err, token_range=None): + def report_error(self, err, token_range): msg = self.get_error_message(err, print_traceback=self.debug) self.printdebugmsg(msg) - self.outmsg.send((token_range, Exception(msg))) + self.send((token_range, Exception(msg))) + + def send(self, response): + self.responses.put(response) def start_request(self, token_range, info): """ @@ -1434,7 +1473,8 @@ class ExportProcess(ChildProcess): self.printdebugmsg("Warning: failed to connect to some replicas: %s" % (errors,)) return ret - self.report_error("Failed to connect to all replicas %s for %s, errors: %s" % (hosts, token_range, errors)) + self.report_error("Failed to connect to all replicas %s for %s, errors: %s" % (hosts, token_range, errors), + token_range) return None def connect(self, host): @@ -1467,7 +1507,7 @@ class ExportProcess(ChildProcess): self.write_rows_to_csv(token_range, rows) else: self.write_rows_to_csv(token_range, rows) - self.outmsg.send((None, None)) + self.send((None, None)) session.complete_request() def err_callback(err): @@ -1488,7 +1528,7 @@ class ExportProcess(ChildProcess): writer.writerow(map(self.format_value, row)) data = (output.getvalue(), len(rows)) - self.outmsg.send((token_range, data)) + self.send((token_range, data)) output.close() except Exception, e: