Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.1 3a03d4bd5 -> a323a1a6d
  refs/heads/trunk 96addeedf -> c69f257f9
(cqlsh) COPY TO/FROM improvements patch by Mikhail Stepura; reviewed by Tyler Hobbs for CASSANDRA-7405 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a323a1a6 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a323a1a6 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a323a1a6 Branch: refs/heads/cassandra-2.1 Commit: a323a1a6d5f28ced1a51ba559055283f3eb356ff Parents: 3a03d4b Author: Mikhail Stepura <mish...@apache.org> Authored: Fri Aug 15 14:35:18 2014 -0700 Committer: Mikhail Stepura <mish...@apache.org> Committed: Tue Aug 19 13:22:24 2014 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + bin/cqlsh | 50 ++++++++-------- pylib/cqlshlib/async_insert.py | 110 ++++++++++++++++++++++++++++++++++++ 3 files changed, 135 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/a323a1a6/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index d29a2db..6facab9 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.1.1 + * (cqlsh) COPY TO/FROM improvements (CASSANDRA-7405) * Support list index operations with conditions (CASSANDRA-7499) * Add max live/tombstoned cells to nodetool cfstats output (CASSANDRA-7731) * Validate IPv6 wildcard addresses properly (CASSANDRA-7680) http://git-wip-us.apache.org/repos/asf/cassandra/blob/a323a1a6/bin/cqlsh ---------------------------------------------------------------------- diff --git a/bin/cqlsh b/bin/cqlsh index b65ae00..852f4f5 100755 --- a/bin/cqlsh +++ b/bin/cqlsh @@ -118,7 +118,7 @@ cqlshlibdir = os.path.join(CASSANDRA_PATH, 'pylib') if os.path.isdir(cqlshlibdir): sys.path.insert(0, cqlshlibdir) -from cqlshlib import cqlhandling, cql3handling, pylexotron, sslhandling +from cqlshlib import cqlhandling, cql3handling, 
pylexotron, sslhandling, async_insert from cqlshlib.displaying import (RED, BLUE, CYAN, ANSI_RESET, COLUMN_NAME_COLORS, FormattedValue, colorme) from cqlshlib.formatting import format_by_type, formatter_for, format_value_utype @@ -1348,27 +1348,33 @@ class Shell(cmd.Cmd): if header: linesource.next() table_meta = self.get_table_meta(ks, cf) - rownum = -1 reader = csv.reader(linesource, **dialect_options) - for rownum, row in enumerate(reader): - if len(row) != len(columns): - self.printerr("Record #%d (line %d) has the wrong number of fields " - "(%d instead of %d)." - % (rownum, reader.line_num, len(row), len(columns))) - return rownum - if not self.do_import_row(columns, nullval, table_meta, row): - self.printerr("Aborting import at record #%d (line %d). " - "Previously-inserted values still present." - % (rownum, reader.line_num)) - return rownum + from functools import partial + rownum, error = \ + async_insert.insert_concurrent(self.session, enumerate(reader, start=1), + partial( + self.create_insert_statement, + columns, nullval, + table_meta)) + if error: + self.printerr(str(error[0])) + self.printerr("Aborting import at record #%d. " + "Previously-inserted values still present." + % error[1]) finally: if do_close: linesource.close() elif self.tty: print - return rownum + 1 + return rownum-1 + + def create_insert_statement(self, columns, nullval, table_meta, row): + + if len(row) != len(columns): + raise ValueError( + "Record has the wrong number of fields (%d instead of %d)." 
+ % (len(row), len(columns))) - def do_import_row(self, columns, nullval, table_meta, row): rowmap = {} primary_key_columns = [col.name for col in table_meta.primary_key] for name, value in zip(columns, row): @@ -1390,9 +1396,6 @@ class Shell(cmd.Cmd): return False else: rowmap[name] = 'null' - return self.do_import_insert(table_meta, rowmap) - - def do_import_insert(self, table_meta, rowmap): # would be nice to be able to use a prepared query here, but in order # to use that interface, we'd need to have all the input as native # values already, reading them from text just like the various @@ -1406,7 +1409,8 @@ class Shell(cmd.Cmd): ) if self.debug: print 'Import using CQL: %s' % query - return self.perform_simple_statement(SimpleStatement(query)) + return SimpleStatement(query) + def perform_csv_export(self, ks, cf, columns, fname, opts): dialect_options = self.csv_dialect_defaults.copy() @@ -1460,13 +1464,7 @@ class Shell(cmd.Cmd): if columns is None: columns = self.get_column_names(ks, cf) columnlist = ', '.join(protect_names(columns)) - # this limit is pretty awful. would be better to use row-key-paging, so - # that the dump could be pretty easily aborted if necessary, but that - # can be kind of tricky with cql3. Punt for now, until the real cursor - # API is added in CASSANDRA-4415. 
- # https://datastax-oss.atlassian.net/browse/PYTHON-16 - query = 'SELECT %s FROM %s.%s LIMIT 99999999' \ - % (columnlist, protect_name(ks), protect_name(cf)) + query = 'SELECT %s FROM %s.%s' % (columnlist, protect_name(ks), protect_name(cf)) return self.session.execute(query) def do_show(self, parsed): http://git-wip-us.apache.org/repos/asf/cassandra/blob/a323a1a6/pylib/cqlshlib/async_insert.py ---------------------------------------------------------------------- diff --git a/pylib/cqlshlib/async_insert.py b/pylib/cqlshlib/async_insert.py new file mode 100644 index 0000000..145b0d6 --- /dev/null +++ b/pylib/cqlshlib/async_insert.py @@ -0,0 +1,110 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from itertools import count +from threading import Event, Condition +import sys + + +class _CountDownLatch(object): + def __init__(self, counter=1): + self._count = counter + self._lock = Condition() + + def count_down(self): + with self._lock: + self._count -= 1 + if self._count <= 0: + self._lock.notifyAll() + + def await(self): + with self._lock: + while self._count > 0: + self._lock.wait() + + +class _ChainedWriter(object): + + CONCURRENCY = 100 + + def __init__(self, session, enumerated_reader, statement_func): + self._sentinel = object() + self._session = session + self._cancellation_event = Event() + self._first_error = None + self._num_finished = count(start=1) + self._task_counter = _CountDownLatch(self.CONCURRENCY) + self._enumerated_reader = enumerated_reader + self._statement_func = statement_func + + def insert(self): + if not self._enumerated_reader: + return 0, None + + for i in xrange(self.CONCURRENCY): + self._execute_next(self._sentinel, 0) + + self._task_counter.await() + return next(self._num_finished), self._first_error + + def _abort(self, error, failed_record): + if not self._first_error: + self._first_error = error, failed_record + self._task_counter.count_down() + self._cancellation_event.set() + + def _handle_error(self, error, failed_record): + self._abort(error, failed_record) + + def _execute_next(self, result, last_completed_record): + if self._cancellation_event.is_set(): + self._task_counter.count_down() + return + + if result is not self._sentinel: + finished = next(self._num_finished) + if not finished % 1000: + sys.stdout.write('Imported %s rows\r' % finished) + sys.stdout.flush() + + try: + (current_record, row) = next(self._enumerated_reader) + except StopIteration: + self._task_counter.count_down() + return + except Exception as exc: + self._abort(exc, last_completed_record) + return + + if self._cancellation_event.is_set(): + self._task_counter.count_down() + return + + try: + statement = self._statement_func(row) + future 
= self._session.execute_async(statement) + future.add_callbacks(callback=self._execute_next, + callback_args=(current_record,), + errback=self._handle_error, + errback_args=(current_record,)) + except Exception as exc: + self._abort(exc, current_record) + return + + +def insert_concurrent(session, enumerated_reader, statement_func): + return _ChainedWriter(session, enumerated_reader, statement_func).insert() +