Match cassandra-loader options in COPY FROM (2.2 version)

patch by Stefania; reviewed by pauloricardomg for CASSANDRA-9303
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/202cf9b0
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/202cf9b0
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/202cf9b0

Branch: refs/heads/cassandra-3.2
Commit: 202cf9b0bed8bbff41318f1f10043aabf3a7cd4d
Parents: 078aabe
Author: Stefania Alborghetti <stefania.alborghe...@datastax.com>
Authored: Wed Jan 6 12:10:13 2016 +0100
Committer: Sylvain Lebresne <sylv...@datastax.com>
Committed: Wed Jan 6 17:56:30 2016 +0100

----------------------------------------------------------------------
 CHANGES.txt                                    |    1 +
 NEWS.txt                                       |    7 +
 bin/cqlsh.py                                   |  135 +-
 conf/cqlshrc.sample                            |   17 +-
 pylib/cqlshlib/copyutil.py                     | 1260 +++++++++++++-----
 pylib/cqlshlib/formatting.py                   |   96 +-
 .../cql3/statements/BatchStatement.java        |   28 +-
 .../cassandra/service/ClientWarningsTest.java  |    5 +-
 tools/bin/cassandra-stress.bat                 |    2 +-
 9 files changed, 1134 insertions(+), 417 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/202cf9b0/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index fc87c7d..b12f593 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -13,6 +13,7 @@
  * Disable reloading of GossipingPropertyFileSnitch (CASSANDRA-9474)
  * Verify tables in pseudo-system keyspaces at startup (CASSANDRA-10761)
 Merged from 2.1:
+ * Match cassandra-loader options in COPY FROM (CASSANDRA-9303)
  * Fix binding to any address in CqlBulkRecordWriter (CASSANDRA-9309)
  * cqlsh fails to decode utf-8 characters for text typed columns (CASSANDRA-10875)
  * Log error when stream session fails (CASSANDRA-9294)


http://git-wip-us.apache.org/repos/asf/cassandra/blob/202cf9b0/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index 57e321e..8cbe4f7 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -269,6 +269,13 @@ Upgrading
    to exclude data centers when the global status is enabled, see CASSANDRA-9035
    for details.
 
+2.1.13
+======
+
+New features
+------------
+   - New options for cqlsh COPY FROM and COPY TO, see CASSANDRA-9303 for details.
+
 2.1.10
 =====


http://git-wip-us.apache.org/repos/asf/cassandra/blob/202cf9b0/bin/cqlsh.py
----------------------------------------------------------------------
diff --git a/bin/cqlsh.py b/bin/cqlsh.py
index 8469836..c38bc2e 100644
--- a/bin/cqlsh.py
+++ b/bin/cqlsh.py
@@ -41,7 +41,6 @@ import optparse
 import os
 import platform
 import sys
-import time
 import traceback
 import warnings
 import webbrowser
@@ -151,7 +150,8 @@ cqlshlibdir = os.path.join(CASSANDRA_PATH, 'pylib')
 if os.path.isdir(cqlshlibdir):
     sys.path.insert(0, cqlshlibdir)
 
-from cqlshlib import cql3handling, cqlhandling, copyutil, pylexotron, sslhandling
+from cqlshlib import cql3handling, cqlhandling, pylexotron, sslhandling
+from cqlshlib.copyutil import ExportTask, ImportTask
 from cqlshlib.displaying import (ANSI_RESET, BLUE, COLUMN_NAME_COLORS, CYAN,
                                  RED, FormattedValue, colorme)
 from cqlshlib.formatting import (DEFAULT_DATE_FORMAT, DEFAULT_NANOTIME_FORMAT,
@@ -456,10 +456,12 @@ def complete_copy_column_names(ctxt, cqlsh):
     return set(colnames[1:]) - set(existcols)
 
 
-COPY_COMMON_OPTIONS = ['DELIMITER', 'QUOTE', 'ESCAPE', 'HEADER', 'NULL',
-                       'MAXATTEMPTS', 'REPORTFREQUENCY']
-COPY_FROM_OPTIONS = ['CHUNKSIZE', 'INGESTRATE', 'MAXBATCHSIZE', 'MINBATCHSIZE']
-COPY_TO_OPTIONS = ['ENCODING', 'TIMEFORMAT', 'PAGESIZE', 'PAGETIMEOUT', 'MAXREQUESTS']
+COPY_COMMON_OPTIONS = ['DELIMITER', 'QUOTE', 'ESCAPE', 'HEADER', 'NULL', 'DATETIMEFORMAT',
+                       'MAXATTEMPTS', 'REPORTFREQUENCY', 'DECIMALSEP', 'THOUSANDSSEP', 'BOOLSTYLE',
+                       'NUMPROCESSES', 'CONFIGFILE', 'RATEFILE']
+COPY_FROM_OPTIONS = ['CHUNKSIZE', 'INGESTRATE', 'MAXBATCHSIZE', 'MINBATCHSIZE', 'MAXROWS',
+                     'SKIPROWS', 'SKIPCOLS', 'MAXPARSEERRORS', 'MAXINSERTERRORS', 'ERRFILE']
+COPY_TO_OPTIONS = ['ENCODING', 'PAGESIZE', 'PAGETIMEOUT', 'BEGINTOKEN', 'ENDTOKEN', 'MAXOUTPUTSIZE', 'MAXREQUESTS']
 
 
 @cqlsh_syntax_completer('copyOption', 'optnames')
@@ -575,23 +577,6 @@ warnings.showwarning = show_warning_without_quoting_line
 warnings.filterwarnings('always', category=cql3handling.UnexpectedTableStructure)
 
 
-def describe_interval(seconds):
-    desc = []
-    for length, unit in ((86400, 'day'), (3600, 'hour'), (60, 'minute')):
-        num = int(seconds) / length
-        if num > 0:
-            desc.append('%d %s' % (num, unit))
-            if num > 1:
-                desc[-1] += 's'
-        seconds %= length
-    words = '%.03f seconds' % seconds
-    if len(desc) > 1:
-        words = ', '.join(desc) + ', and ' + words
-    elif len(desc) == 1:
-        words = desc[0] + ' and ' + words
-    return words
-
-
 def insert_driver_hooks():
     extend_cql_deserialization()
     auto_format_udts()
@@ -658,8 +643,7 @@ class Shell(cmd.Cmd):
     last_hist = None
     shunted_query_out = None
     use_paging = True
-    csv_dialect_defaults = dict(delimiter=',', doublequote=False,
-                                escapechar='\\', quotechar='"')
+
     default_page_size = 100
 
     def __init__(self, hostname, port, color=False,
@@ -1711,32 +1695,67 @@ class Shell(cmd.Cmd):
           COPY x TO: Exports data from a Cassandra table in CSV format.
 
         COPY <table_name> [ ( column [, ...] ) ]
-             FROM ( '<filename>' | STDIN )
+             FROM ( '<file_pattern_1, file_pattern_2, ... file_pattern_n>' | STDIN )
              [ WITH <option>='value' [AND ...] ];
 
+        File patterns are either file names or valid python glob expressions, e.g. *.csv or folder/*.csv.
+
         COPY <table_name> [ ( column [, ...] ) ]
              TO ( '<filename>' | STDOUT )
             [ WITH <option>='value' [AND ...] ];
 
-        Available options and defaults:
+        Available common COPY options and defaults:
 
           DELIMITER=','            - character that appears between records
           QUOTE='"'                - quoting character to be used to quote fields
           ESCAPE='\'               - character to appear before the QUOTE char when quoted
           HEADER=false             - whether to ignore the first line
           NULL=''                  - string that represents a null value
-          ENCODING='utf8'          - encoding for CSV output (COPY TO)
-          TIMEFORMAT=              - timestamp strftime format (COPY TO)
+          DATETIMEFORMAT=          - timestamp strftime format
                                      '%Y-%m-%d %H:%M:%S%z'
                                      defaults to time_format value in cqlshrc
-          MAXREQUESTS=6            - the maximum number of requests each worker process can work on in parallel (COPY TO)
-          PAGESIZE=1000            - the page size for fetching results (COPY TO)
-          PAGETIMEOUT=10           - the page timeout for fetching results (COPY TO)
-          MAXATTEMPTS=5            - the maximum number of attempts for errors
-          CHUNKSIZE=1000           - the size of chunks passed to worker processes (COPY FROM)
-          INGESTRATE=100000        - an approximate ingest rate in rows per second (COPY FROM)
-          MAXBATCHSIZE=20          - the maximum size of an import batch (COPY FROM)
-          MINBATCHSIZE=2           - the minimum size of an import batch (COPY FROM)
+          MAXATTEMPTS=5            - the maximum number of attempts per batch or range
           REPORTFREQUENCY=0.25     - the frequency with which we display status updates in seconds
+          DECIMALSEP='.'           - the separator for decimal values
+          THOUSANDSSEP=''          - the separator for thousands digit groups
+          BOOLSTYLE='True,False'   - the representation for booleans, case insensitive, specify true followed by false,
+                                     for example yes,no or 1,0
+          NUMPROCESSES=n           - the number of worker processes, by default the number of cores minus one
+                                     capped at 16
+          CONFIGFILE=''            - a configuration file with the same format as .cqlshrc (see the Python ConfigParser
+                                     documentation) where you can specify WITH options under the following optional
+                                     sections: [copy], [copy-to], [copy-from], [copy:ks.table], [copy-to:ks.table],
+                                     [copy-from:ks.table], where <ks> is your keyspace name and <table> is your table
+                                     name. Options are read from these sections, in the order specified
+                                     above, and command line options always override options in configuration files.
+                                     Depending on the COPY direction, only the relevant copy-from or copy-to sections
+                                     are used. If no configfile is specified then .cqlshrc is searched instead.
+          RATEFILE=''              - an optional file where to print the output statistics
+
+        Available COPY FROM options and defaults:
+
+          CHUNKSIZE=1000           - the size of chunks passed to worker processes
+          INGESTRATE=100000        - an approximate ingest rate in rows per second
+          MINBATCHSIZE=2           - the minimum size of an import batch
+          MAXBATCHSIZE=20          - the maximum size of an import batch
+          MAXROWS=-1               - the maximum number of rows, -1 means no maximum
+          SKIPROWS=0               - the number of rows to skip
+          SKIPCOLS=''              - a comma separated list of column names to skip
+          MAXPARSEERRORS=-1        - the maximum global number of parsing errors, -1 means no maximum
+          MAXINSERTERRORS=-1       - the maximum global number of insert errors, -1 means no maximum
+          ERRFILE=''               - a file where to store all rows that could not be imported, by default this is
+                                     import_ks_table.err where <ks> is your keyspace and <table> is your table name.
+
+        Available COPY TO options and defaults:
+
+          ENCODING='utf8'          - encoding for CSV output
+          PAGESIZE='1000'          - the page size for fetching results
+          PAGETIMEOUT=10           - the page timeout in seconds for fetching results
+          BEGINTOKEN=''            - the minimum token string to consider when exporting data
+          ENDTOKEN=''              - the maximum token string to consider when exporting data
+          MAXREQUESTS=6            - the maximum number of requests each worker process can work on in parallel
+          MAXOUTPUTSIZE='-1'       - the maximum size of the output file measured in number of lines,
+                                     beyond this maximum the output file will be split into segments,
+                                     -1 means unlimited.
 
         When entering CSV data on STDIN, you can use the sequence "\."
         on a line by itself to end the data input.
@@ -1747,55 +1766,31 @@ class Shell(cmd.Cmd):
         ks = self.current_keyspace
         if ks is None:
             raise NoKeyspaceError("Not in any keyspace.")
-        cf = self.cql_unprotect_name(parsed.get_binding('cfname'))
+        table = self.cql_unprotect_name(parsed.get_binding('cfname'))
         columns = parsed.get_binding('colnames', None)
         if columns is not None:
             columns = map(self.cql_unprotect_name, columns)
         else:
             # default to all known columns
-            columns = self.get_column_names(ks, cf)
+            columns = self.get_column_names(ks, table)
+
         fname = parsed.get_binding('fname', None)
         if fname is not None:
-            fname = os.path.expanduser(self.cql_unprotect_value(fname))
+            fname = self.cql_unprotect_value(fname)
+
         copyoptnames = map(str.lower, parsed.get_binding('optnames', ()))
         copyoptvals = map(self.cql_unprotect_value, parsed.get_binding('optvals', ()))
-        cleancopyoptvals = [optval.decode('string-escape') for optval in copyoptvals]
-        opts = dict(zip(copyoptnames, cleancopyoptvals))
-
-        print "\nStarting copy of %s.%s with columns %s." % (ks, cf, columns)
-
-        timestart = time.time()
+        opts = dict(zip(copyoptnames, copyoptvals))
 
         direction = parsed.get_binding('dir').upper()
         if direction == 'FROM':
-            rows = self.perform_csv_import(ks, cf, columns, fname, opts)
-            verb = 'imported'
+            task = ImportTask(self, ks, table, columns, fname, opts, DEFAULT_PROTOCOL_VERSION, CONFIG_FILE)
         elif direction == 'TO':
-            rows = self.perform_csv_export(ks, cf, columns, fname, opts)
-            verb = 'exported'
+            task = ExportTask(self, ks, table, columns, fname, opts, DEFAULT_PROTOCOL_VERSION, CONFIG_FILE)
        else:
             raise SyntaxError("Unknown direction %s" % direction)
-        timeend = time.time()
-        print "\n%d rows %s in %s." % (rows, verb, describe_interval(timeend - timestart))
-
-    def perform_csv_import(self, ks, cf, columns, fname, opts):
-        csv_options, dialect_options, unrecognized_options = copyutil.parse_options(self, opts)
-        if unrecognized_options:
-            self.printerr('Unrecognized COPY FROM options: %s' % ', '.join(unrecognized_options.keys()))
-            return 0
-
-        return copyutil.ImportTask(self, ks, cf, columns, fname, csv_options, dialect_options,
-                                   DEFAULT_PROTOCOL_VERSION, CONFIG_FILE).run()
-
-    def perform_csv_export(self, ks, cf, columns, fname, opts):
-        csv_options, dialect_options, unrecognized_options = copyutil.parse_options(self, opts)
-        if unrecognized_options:
-            self.printerr('Unrecognized COPY TO options: %s' % ', '.join(unrecognized_options.keys()))
-            return 0
-
-        return copyutil.ExportTask(self, ks, cf, columns, fname, csv_options, dialect_options,
-                                   DEFAULT_PROTOCOL_VERSION, CONFIG_FILE).run()
+        task.run()
 
     def do_show(self, parsed):
         """


http://git-wip-us.apache.org/repos/asf/cassandra/blob/202cf9b0/conf/cqlshrc.sample
----------------------------------------------------------------------
diff --git a/conf/cqlshrc.sample b/conf/cqlshrc.sample
index 302d25f..4c66861 100644
--- a/conf/cqlshrc.sample
+++ b/conf/cqlshrc.sample
@@ -43,7 +43,7 @@ completekey = tab
 ;browser =
 
 [cql]
-version = 3.1.5
+version = 3.2.1
 
 [connection]
 hostname = 127.0.0.1
@@ -68,3 +68,18 @@ max_trace_wait = 10.0
 
 
 ; vim: set ft=dosini :
+
+;; optional options for COPY TO and COPY FROM
+;[copy]
+;maxattempts=10
+;numprocesses=4
+
+;; optional options for COPY FROM
+;[copy-from]
+;chunksize=5000
+;ingestrate=50000
+
+;; optional options for COPY TO
+;[copy-to]
+;pagesize=2000
+;pagetimeout=20
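
For illustration only (not part of the patch): a cqlsh session exercising a few of the new options might look like the sketch below. The keyspace "ks", the table "events" and the file names are hypothetical; the option names and their meanings are taken from the updated COPY help text above.

    -- hypothetical session: ks.events and the CSV paths are made-up names
    COPY ks.events TO 'events.csv'
         WITH PAGESIZE='2000' AND PAGETIMEOUT='20' AND MAXOUTPUTSIZE='500000'
          AND BEGINTOKEN='-9223372036854775808' AND ENDTOKEN='0';

    COPY ks.events FROM 'events*.csv'
         WITH HEADER='true' AND CHUNKSIZE='5000' AND INGESTRATE='50000'
          AND MAXPARSEERRORS='100' AND MAXINSERTERRORS='10' AND ERRFILE='events_rejects.err';

Values are quoted because the grammar above expects <option>='value'; per the help text, rows that fail to parse or insert are written to the ERRFILE, and the import only gives up once the MAXPARSEERRORS or MAXINSERTERRORS thresholds are exceeded.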
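The same options can also come from a configuration file via CONFIGFILE, in the ini format sampled in cqlshrc.sample above. A hypothetical invocation follows; the file name is made up, and per the help text options are read from [copy], then the direction-specific and per-table sections, with WITH options on the command line overriding them all.

    -- hypothetical: copy.conf holds [copy] / [copy-from] / [copy-from:ks.events] sections
    -- written in the same ini style as the cqlshrc sample above
    COPY ks.events FROM 'events*.csv'
         WITH CONFIGFILE='copy.conf' AND NUMPROCESSES='8';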