cqlsh: add a COPY TO command

Patch by paul cannon, reviewed by brandonwilliams for CASSANDRA-4434
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9a633947 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9a633947 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9a633947 Branch: refs/heads/trunk Commit: 9a63394765de28160d576c9285be68587e222a86 Parents: 41c9ba6 Author: Brandon Williams <brandonwilli...@apache.org> Authored: Tue Jul 24 13:57:19 2012 -0500 Committer: Brandon Williams <brandonwilli...@apache.org> Committed: Tue Jul 24 13:57:19 2012 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + bin/cqlsh | 128 ++++++++++++++++++++++++++++++++++++++++++++--------- 2 files changed, 107 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a633947/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 0885387..638574c 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -23,6 +23,7 @@ Merged from 1.0: * Fix LCS splitting sstable base on uncompressed size (CASSANDRA-4419) * Bootstraps that fail are detected upon restart and will retry safely without needing to delete existing data first (CASSANDRA-4427) + * (cqlsh) add a COPY TO command to copy a CF to a CSV file (CASSANDRA-4434) 1.1.2 http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a633947/bin/cqlsh ---------------------------------------------------------------------- diff --git a/bin/cqlsh b/bin/cqlsh index 574d49b..c67a818 100755 --- a/bin/cqlsh +++ b/bin/cqlsh @@ -224,7 +224,8 @@ cqlsh_extra_syntax_rules = r''' <copyCommand> ::= "COPY" cf=<columnFamilyName> ( "(" [colnames]=<colname> ( "," [colnames]=<colname> )* ")" )?
- "FROM" ( fname=<stringLiteral> | "STDIN" ) + ( dir="FROM" ( fname=<stringLiteral> | "STDIN" ) + | dir="TO" ( fname=<stringLiteral> | "STDOUT" ) ) ( "WITH" <copyOption> ( "AND" <copyOption> )* )? ; @@ -303,12 +304,16 @@ def complete_copy_column_names(ctxt, cqlsh): return [colnames[0]] return set(colnames[1:]) - set(existcols) -COPY_OPTIONS = ('DELIMITER', 'QUOTE', 'ESCAPE', 'HEADER') +COPY_OPTIONS = ('DELIMITER', 'QUOTE', 'ESCAPE', 'HEADER', 'ENCODING', 'NULL') @cqlsh_syntax_completer('copyOption', 'optnames') def complete_copy_options(ctxt, cqlsh): optnames = map(str.upper, ctxt.get_binding('optnames', ())) - return set(COPY_OPTIONS) - set(optnames) + direction = ctxt.get_binding('dir').upper() + opts = set(COPY_OPTIONS) - set(optnames) + if direction == 'FROM': + opts -= set(['ENCODING', 'NULL']) # set -= needs a set operand; a tuple raises TypeError + return opts @cqlsh_syntax_completer('copyOption', 'optvals') def complete_copy_opt_values(ctxt, cqlsh): @@ -448,13 +453,13 @@ def unix_time_from_uuid1(u): return (u.get_time() - 0x01B21DD213814000) / 10000000.0 def format_value(val, casstype, output_encoding, addcolor=False, time_format='', - float_precision=3, colormap=DEFAULT_VALUE_COLORS): + float_precision=3, colormap=DEFAULT_VALUE_COLORS, nullval='null'): color = colormap['default'] coloredval = None displaywidth = None if val is None: - bval = 'null' + bval = nullval color = colormap['error'] elif isinstance(val, DecodeError): casstype = 'BytesType' @@ -727,7 +732,7 @@ class Shell(cmd.Cmd): def get_column_names(self, ksname, cfname): if ksname is None: ksname = self.current_keyspace - if self.cqlver_atleast(3): + if ksname != 'system' and self.cqlver_atleast(3): return self.get_column_names_from_layout(ksname, cfname) else: return self.get_column_names_from_cfdef(ksname, cfname) @@ -1433,6 +1438,9 @@ class Shell(cmd.Cmd): COPY <table_name> [ ( column [, ...] ) ] FROM ( '<filename>' | STDIN ) [ WITH <option>='value' [AND ...] ]; + COPY <table_name> [ ( column [, ...]
) ] + TO ( '<filename>' | STDOUT ) + [ WITH <option>='value' [AND ...] ]; Available options and defaults: @@ -1440,6 +1448,8 @@ class Shell(cmd.Cmd): QUOTE='"' - quoting character to be used to quote fields ESCAPE='\' - character to appear before the QUOTE char when quoted HEADER=false - whether to ignore the first line + ENCODING='utf8' - encoding for CSV output (COPY TO only) + NULL='' - string that represents a null value (COPY TO only) When entering CSV data on STDIN, you can use the sequence "\." on a line by itself to end the data input. @@ -1448,12 +1458,11 @@ class Shell(cmd.Cmd): ks = self.cql_unprotect_name(parsed.get_binding('ksname', None)) if ks is None: ks = self.current_keyspace + if ks is None: + raise NoKeyspaceError("Not in any keyspace.") cf = self.cql_unprotect_name(parsed.get_binding('cfname')) columns = parsed.get_binding('colnames', None) - if columns is None: - # default to all known columns - columns = self.get_column_names(ks, cf) - else: + if columns is not None: columns = map(self.cql_unprotect_name, columns) fname = parsed.get_binding('fname', None) if fname is not None: @@ -1462,14 +1471,20 @@ class Shell(cmd.Cmd): copyoptvals = map(self.cql_unprotect_value, parsed.get_binding('optvals', ())) opts = dict(zip(copyoptnames, copyoptvals)) - # when/if COPY TO is supported, this would be a good place to branch - # on direction. - timestart = time.time() - rows = self.perform_csv_import(ks, cf, columns, fname, opts) - timeend = time.time() - print "%d rows imported in %s." % (rows, describe_interval(timeend - timestart)) + direction = parsed.get_binding('dir').upper() + if direction == 'FROM': + rows = self.perform_csv_import(ks, cf, columns, fname, opts) + verb = 'imported' + elif direction == 'TO': + rows = self.perform_csv_export(ks, cf, columns, fname, opts) + verb = 'exported' + else: + raise SyntaxError("Unknown direction %s" % direction) + + timeend = time.time() + print "%d rows %s in %s."
% (rows, verb, describe_interval(timeend - timestart)) def perform_csv_import(self, ks, cf, columns, fname, opts): dialect_options = self.csv_dialect_defaults.copy() @@ -1483,7 +1498,6 @@ class Shell(cmd.Cmd): if dialect_options['quotechar'] == dialect_options['escapechar']: dialect_options['doublequote'] = True del dialect_options['escapechar'] - if opts: self.printerr('Unrecognized COPY FROM options: %s' % ', '.join(opts.keys())) @@ -1496,15 +1510,17 @@ class Shell(cmd.Cmd): else: do_close = True try: - linesource = open(fname, 'r') + linesource = open(fname, 'rb') except IOError, e: self.printerr("Can't open %r for reading: %s" % (fname, e)) return 0 - if header: - linesource.next() - - prepq = self.prep_import_insert(ks, cf, columns) try: + if header: + linesource.next() + if columns is None: # do_copy no longer defaults this; the per-row len(columns) check below needs it + columns = self.get_column_names(ks, cf) + prepq = self.prep_import_insert(ks, cf, columns) + rownum = -1 reader = csv.reader(linesource, **dialect_options) for rownum, row in enumerate(reader): if len(row) != len(columns): @@ -1525,6 +1541,10 @@ class Shell(cmd.Cmd): return rownum + 1 def prep_import_insert(self, ks, cf, columns): + if columns is None: + # default to all known columns + columns = self.get_column_names(ks, cf) + # would be nice to be able to use a prepared query here, but in order # to use that interface, we'd need to have all the input as native # values already, reading them from text just like the various @@ -1543,6 +1563,70 @@ class Shell(cmd.Cmd): print "Import using CQL: %s" % cql return self.perform_statement(cql) + def perform_csv_export(self, ks, cf, columns, fname, opts): + dialect_options = self.csv_dialect_defaults.copy() + if 'quote' in opts: + dialect_options['quotechar'] = opts.pop('quote') + if 'escape' in opts: + dialect_options['escapechar'] = opts.pop('escape') + if 'delimiter' in opts: + dialect_options['delimiter'] = opts.pop('delimiter') + encoding = opts.pop('encoding', 'utf8') + nullval = opts.pop('null', '') + header = bool(opts.pop('header', '').lower() == 'true') + if
dialect_options['quotechar'] == dialect_options['escapechar']: + dialect_options['doublequote'] = True + del dialect_options['escapechar'] + + if opts: + self.printerr('Unrecognized COPY TO options: %s' + % ', '.join(opts.keys())) + return 0 + + if fname is None: + do_close = False + csvdest = sys.stdout + else: + do_close = True + try: + csvdest = open(fname, 'wb') + except IOError, e: + self.printerr("Can't open %r for writing: %s" % (fname, e)) + return 0 + try: + self.prep_export_dump(ks, cf, columns) + writer = csv.writer(csvdest, **dialect_options) + if header: + writer.writerow([d[0] for d in self.cursor.description]) + rows = 0 + while True: + row = self.cursor.fetchone() + if row is None: + break + fmt = lambda v, d: \ + format_value(v, d[1], output_encoding=encoding, nullval=nullval, + time_format=self.display_time_format, + float_precision=self.display_float_precision).strval + writer.writerow(map(fmt, row, self.cursor.description)) + rows += 1 + finally: + if do_close: + csvdest.close() + return rows + + def prep_export_dump(self, ks, cf, columns): + if columns is None: + columnlist = '*' + else: + columnlist = ', '.join(map(self.cql_protect_name, columns)) + # this limit is pretty awful. would be better to use row-key-paging, so + # that the dump could be pretty easily aborted if necessary, but that + # can be kind of tricky with cql3. Punt for now, until the real cursor + # API is added in CASSANDRA-4415. + query = 'SELECT %s FROM %s.%s LIMIT 99999999' \ + % (columnlist, self.cql_protect_name(ks), self.cql_protect_name(cf)) + self.cursor.execute(query) + def do_show(self, parsed): """ SHOW [cqlsh only]