Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.1 3a03d4bd5 -> a323a1a6d
  refs/heads/trunk 96addeedf -> c69f257f9


(cqlsh) COPY TO/FROM improvements

patch by Mikhail Stepura; reviewed by Tyler Hobbs for CASSANDRA-7405


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a323a1a6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a323a1a6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a323a1a6

Branch: refs/heads/cassandra-2.1
Commit: a323a1a6d5f28ced1a51ba559055283f3eb356ff
Parents: 3a03d4b
Author: Mikhail Stepura <mish...@apache.org>
Authored: Fri Aug 15 14:35:18 2014 -0700
Committer: Mikhail Stepura <mish...@apache.org>
Committed: Tue Aug 19 13:22:24 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt                    |   1 +
 bin/cqlsh                      |  50 ++++++++--------
 pylib/cqlshlib/async_insert.py | 110 ++++++++++++++++++++++++++++++++++++
 3 files changed, 135 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/a323a1a6/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d29a2db..6facab9 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.1
+ * (cqlsh) COPY TO/FROM improvements (CASSANDRA-7405)
  * Support list index operations with conditions (CASSANDRA-7499)
  * Add max live/tombstoned cells to nodetool cfstats output (CASSANDRA-7731)
  * Validate IPv6 wildcard addresses properly (CASSANDRA-7680)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a323a1a6/bin/cqlsh
----------------------------------------------------------------------
diff --git a/bin/cqlsh b/bin/cqlsh
index b65ae00..852f4f5 100755
--- a/bin/cqlsh
+++ b/bin/cqlsh
@@ -118,7 +118,7 @@ cqlshlibdir = os.path.join(CASSANDRA_PATH, 'pylib')
 if os.path.isdir(cqlshlibdir):
     sys.path.insert(0, cqlshlibdir)
 
-from cqlshlib import cqlhandling, cql3handling, pylexotron, sslhandling
+from cqlshlib import cqlhandling, cql3handling, pylexotron, sslhandling, 
async_insert
 from cqlshlib.displaying import (RED, BLUE, CYAN, ANSI_RESET, 
COLUMN_NAME_COLORS,
                                  FormattedValue, colorme)
 from cqlshlib.formatting import format_by_type, formatter_for, 
format_value_utype
@@ -1348,27 +1348,33 @@ class Shell(cmd.Cmd):
             if header:
                 linesource.next()
             table_meta = self.get_table_meta(ks, cf)
-            rownum = -1
             reader = csv.reader(linesource, **dialect_options)
-            for rownum, row in enumerate(reader):
-                if len(row) != len(columns):
-                    self.printerr("Record #%d (line %d) has the wrong number 
of fields "
-                                  "(%d instead of %d)."
-                                  % (rownum, reader.line_num, len(row), 
len(columns)))
-                    return rownum
-                if not self.do_import_row(columns, nullval, table_meta, row):
-                    self.printerr("Aborting import at record #%d (line %d). "
-                                  "Previously-inserted values still present."
-                                  % (rownum, reader.line_num))
-                    return rownum
+            from functools import partial
+            rownum, error = \
+                async_insert.insert_concurrent(self.session, enumerate(reader, 
start=1),
+                                               partial(
+                                                   
self.create_insert_statement,
+                                                   columns, nullval,
+                                                   table_meta))
+            if error:
+                self.printerr(str(error[0]))
+                self.printerr("Aborting import at record #%d. "
+                              "Previously-inserted values still present."
+                               % error[1])
         finally:
             if do_close:
                 linesource.close()
             elif self.tty:
                 print
-        return rownum + 1
+        return rownum-1
+
+    def create_insert_statement(self, columns, nullval, table_meta, row):
+
+        if len(row) != len(columns):
+            raise ValueError(
+                "Record has the wrong number of fields (%d instead of %d)."
+                % (len(row), len(columns)))
 
-    def do_import_row(self, columns, nullval, table_meta, row):
         rowmap = {}
         primary_key_columns = [col.name for col in table_meta.primary_key]
         for name, value in zip(columns, row):
@@ -1390,9 +1396,6 @@ class Shell(cmd.Cmd):
                 return False
             else:
                 rowmap[name] = 'null'
-        return self.do_import_insert(table_meta, rowmap)
-
-    def do_import_insert(self, table_meta, rowmap):
         # would be nice to be able to use a prepared query here, but in order
         # to use that interface, we'd need to have all the input as native
         # values already, reading them from text just like the various
@@ -1406,7 +1409,8 @@ class Shell(cmd.Cmd):
         )
         if self.debug:
             print 'Import using CQL: %s' % query
-        return self.perform_simple_statement(SimpleStatement(query))
+        return SimpleStatement(query)
+
 
     def perform_csv_export(self, ks, cf, columns, fname, opts):
         dialect_options = self.csv_dialect_defaults.copy()
@@ -1460,13 +1464,7 @@ class Shell(cmd.Cmd):
         if columns is None:
             columns = self.get_column_names(ks, cf)
         columnlist = ', '.join(protect_names(columns))
-        # this limit is pretty awful. would be better to use row-key-paging, so
-        # that the dump could be pretty easily aborted if necessary, but that
-        # can be kind of tricky with cql3. Punt for now, until the real cursor
-        # API is added in CASSANDRA-4415.
-        # https://datastax-oss.atlassian.net/browse/PYTHON-16
-        query = 'SELECT %s FROM %s.%s LIMIT 99999999' \
-                % (columnlist, protect_name(ks), protect_name(cf))
+        query = 'SELECT %s FROM %s.%s' % (columnlist, protect_name(ks), 
protect_name(cf))
         return self.session.execute(query)
 
     def do_show(self, parsed):

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a323a1a6/pylib/cqlshlib/async_insert.py
----------------------------------------------------------------------
diff --git a/pylib/cqlshlib/async_insert.py b/pylib/cqlshlib/async_insert.py
new file mode 100644
index 0000000..145b0d6
--- /dev/null
+++ b/pylib/cqlshlib/async_insert.py
@@ -0,0 +1,110 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from itertools import count
+from threading import Event, Condition
+import sys
+
+
class _CountDownLatch(object):
    """A minimal countdown latch: await() blocks until count_down() has
    been invoked `counter` times.

    Modeled after java.util.concurrent.CountDownLatch; used here so the
    main thread can wait for every concurrent insert chain to terminate.
    """

    def __init__(self, counter=1):
        # Number of count_down() calls still outstanding.
        self._count = counter
        # Condition guards _count and lets await() sleep until it hits zero.
        self._lock = Condition()

    def count_down(self):
        """Decrement the count; wake all waiters once it reaches zero."""
        with self._lock:
            self._count -= 1
            if self._count <= 0:
                self._lock.notifyAll()

    def await(self):
        """Block the calling thread until the count has reached zero.

        NOTE(review): `await` became a reserved word in Python 3.7; this
        method name is only valid under Python 2, which this code targets.
        """
        with self._lock:
            # Loop guards against spurious wakeups from Condition.wait().
            while self._count > 0:
                self._lock.wait()
+
+
class _ChainedWriter(object):
    """Drives up to CONCURRENCY simultaneous "chains" of async INSERTs.

    Each chain executes one statement at a time; the driver's completion
    callback (_execute_next) schedules the next row, so at most
    CONCURRENCY statements are in flight at once.  A countdown latch
    tracks chain termination and an Event cancels all chains after the
    first error.
    """

    # Maximum number of insert chains (in-flight statements) at once.
    CONCURRENCY = 100

    def __init__(self, session, enumerated_reader, statement_func):
        # Unique marker distinguishing "chain start" from a real driver result.
        self._sentinel = object()
        self._session = session
        # Set after the first failure; surviving chains check it and stop.
        self._cancellation_event = Event()
        # (exception, record_number) of the first failure, or None.
        self._first_error = None
        # Monotonic counter of completed inserts; count(start=1) means the
        # value yielded is one greater than the number already finished.
        self._num_finished = count(start=1)
        # Released once every chain has counted itself down.
        self._task_counter = _CountDownLatch(self.CONCURRENCY)
        # Iterator of (record_number, row) pairs, e.g. enumerate(csv_reader).
        self._enumerated_reader = enumerated_reader
        # Maps a row to an executable statement; may raise on a bad row.
        self._statement_func = statement_func

    def insert(self):
        """Run all chains to completion.

        Returns a (counter_value, first_error) tuple, where counter_value
        is one greater than the number of rows finished (the caller is
        expected to adjust — cqlsh subtracts 1) and first_error is either
        None or an (exception, record_number) pair.
        """
        if not self._enumerated_reader:
            return 0, None

        # Kick off CONCURRENCY chains; the sentinel marks the bootstrap
        # call so it is not counted as a completed insert.
        for i in xrange(self.CONCURRENCY):
            self._execute_next(self._sentinel, 0)

        # Block until every chain has terminated (success, exhaustion,
        # error, or cancellation).
        self._task_counter.await()
        return next(self._num_finished), self._first_error

    def _abort(self, error, failed_record):
        """Record the first error, retire this chain, and cancel the rest."""
        if not self._first_error:
            # NOTE(review): not synchronized; concurrent failures could
            # race here, but at least one error is always retained.
            self._first_error = error, failed_record
        self._task_counter.count_down()
        self._cancellation_event.set()

    def _handle_error(self, error, failed_record):
        # Errback adapter for ResponseFuture.add_callbacks.
        self._abort(error, failed_record)

    def _execute_next(self, result, last_completed_record):
        """Completion callback: report progress and issue the next insert."""
        if self._cancellation_event.is_set():
            # Another chain failed — retire this chain quietly.
            self._task_counter.count_down()
            return

        # Skip progress accounting for the bootstrap (sentinel) call.
        if result is not self._sentinel:
            finished = next(self._num_finished)
            if not finished % 1000:
                # \r keeps the progress line in place on the terminal.
                sys.stdout.write('Imported %s rows\r' % finished)
                sys.stdout.flush()

        try:
            (current_record, row) = next(self._enumerated_reader)
        except StopIteration:
            # Input exhausted — this chain is done.
            self._task_counter.count_down()
            return
        except Exception as exc:
            # Reader blew up (e.g. csv parse error): abort everything.
            self._abort(exc, last_completed_record)
            return

        # Re-check cancellation after the (potentially slow) read.
        if self._cancellation_event.is_set():
            self._task_counter.count_down()
            return

        try:
            statement = self._statement_func(row)
            future = self._session.execute_async(statement)
            # Chain continues via this same method on success.
            future.add_callbacks(callback=self._execute_next,
                                 callback_args=(current_record,),
                                 errback=self._handle_error,
                                 errback_args=(current_record,))
        except Exception as exc:
            # statement_func or execute_async failed synchronously.
            self._abort(exc, current_record)
            return
+
+
def insert_concurrent(session, enumerated_reader, statement_func):
    """Insert rows concurrently over `session`.

    `enumerated_reader` yields (record_number, row) pairs and
    `statement_func` maps a row to an executable statement.  Returns the
    (counter, first_error) pair produced by the underlying writer.
    """
    writer = _ChainedWriter(session, enumerated_reader, statement_func)
    return writer.insert()
+

Reply via email to