Updated Branches: refs/heads/cassandra-1.2 edc37bc84 -> 8d03008c2
cqlsh: Fix COPY FROM value quoting and null handling patch by Aleksey Yeschenko; reviewed by Brandon Williams for CASSANDRA-5305 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8d03008c Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8d03008c Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8d03008c Branch: refs/heads/cassandra-1.2 Commit: 8d03008c20cbc21e82953d2fd3bcb54888c6c930 Parents: edc37bc Author: Aleksey Yeschenko <alek...@apache.org> Authored: Mon Mar 11 23:08:53 2013 +0300 Committer: Aleksey Yeschenko <alek...@apache.org> Committed: Mon Mar 11 23:08:53 2013 +0300 ---------------------------------------------------------------------- CHANGES.txt | 1 + bin/cqlsh | 81 ++++++++++++++++++++++++++++++++++++++---------------- 2 files changed, 58 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/8d03008c/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 1279da7..93c30c0 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -16,6 +16,7 @@ * Re-enable unknown option in replication/compaction strategies option for backward compatibility (CASSANDRA-4795) * Add binary protocol support to stress (CASSANDRA-4993) + * cqlsh: Fix COPY FROM value quoting and null handling (CASSANDRA-5305) Merged from 1.1: * fix logging of "Found table data in data directories" when only system tables are present (CASSANDRA-5289) http://git-wip-us.apache.org/repos/asf/cassandra/blob/8d03008c/bin/cqlsh ---------------------------------------------------------------------- diff --git a/bin/cqlsh b/bin/cqlsh index 6db59a3..2456009 100755 --- a/bin/cqlsh +++ b/bin/cqlsh @@ -299,7 +299,7 @@ def complete_assume_col(ctxt, cqlsh): cfdef = cqlsh.get_columnfamily(cf, ksname=ks) cols = [cm.name for cm in cfdef.column_metadata] cols.append(cfdef.key_alias or 'KEY') - return map(cqlsh.cql_protect_name, cols) + return cqlsh.cql_protect_names(cols) def complete_source_quoted_filename(ctxt, cqlsh): partial = ctxt.get_binding('partial', '') @@ -351,7 +351,7 @@ def complete_copy_options(ctxt, cqlsh): direction = ctxt.get_binding('dir').upper() opts = set(COPY_OPTIONS) - set(optnames) if direction == 'FROM': - opts -= ('ENCODING', 'NULL') + opts -= ('ENCODING',) return opts @cqlsh_syntax_completer('copyOption', 'optvals') @@ -1180,6 +1180,9 @@ class Shell(cmd.Cmd): name = name.encode('utf8') return cqlruleset.maybe_escape_name(name) + def cql_protect_names(self, names): + return map(self.cql_protect_name, names) + def cql_protect_value(self, value): return cqlruleset.escape_value(value) @@ -1311,12 +1314,12 @@ class Shell(cmd.Cmd): if len(layout.primary_key_components) > 1: out.write(",\n PRIMARY KEY (") - partkeynames = map(self.cql_protect_name, layout.partition_key_components) + partkeynames = self.cql_protect_names(layout.partition_key_components) if len(partkeynames) > 1: partkey = "(%s)" % ', '.join(partkeynames) else: partkey = partkeynames[0] - pk_parts = [partkey] + map(self.cql_protect_name, layout.column_aliases) + pk_parts = [partkey] + self.cql_protect_names(layout.column_aliases) out.write(', '.join(pk_parts) + ')') out.write("\n)") @@ -1524,12 +1527,16 @@ class Shell(cmd.Cmd): QUOTE='"' - quoting character to be used to quote fields ESCAPE='\' - character to appear before the QUOTE char when quoted HEADER=false - whether to ignore the first line + NULL='' - string that represents a null value ENCODING='utf8' - encoding for CSV output (COPY TO only) - NULL='' - string that represents a null value (COPY TO only) When entering CSV data on STDIN, you can use the sequence "\." on a line by itself to end the data input. """ + if not self.cqlver_atleast(3): + self.printerr('COPY requires CQL version 3.0.0 or higher.') + return + ks = self.cql_unprotect_name(parsed.get_binding('ksname', None)) if ks is None: ks = self.current_keyspace @@ -1539,6 +1546,9 @@ class Shell(cmd.Cmd): columns = parsed.get_binding('colnames', None) if columns is not None: columns = map(self.cql_unprotect_name, columns) + else: + # default to all known columns + columns = self.get_column_names(ks, cf) fname = parsed.get_binding('fname', None) if fname is not None: fname = os.path.expanduser(self.cql_unprotect_value(fname)) @@ -1569,6 +1579,7 @@ class Shell(cmd.Cmd): dialect_options['escapechar'] = opts.pop('escape') if 'delimiter' in opts: dialect_options['delimiter'] = opts.pop('delimiter') + nullval = opts.pop('null', '') header = bool(opts.pop('header', '').lower() == 'true') if dialect_options['quotechar'] == dialect_options['escapechar']: dialect_options['doublequote'] = True @@ -1592,16 +1603,16 @@ class Shell(cmd.Cmd): try: if header: linesource.next() - numcol, prepq = self.prep_import_insert(ks, cf, columns) + layout = self.get_columnfamily_layout(ks, cf) rownum = -1 reader = csv.reader(linesource, **dialect_options) for rownum, row in enumerate(reader): - if len(row) != numcol: + if len(row) != len(columns): self.printerr("Record #%d (line %d) has the wrong number of fields " "(%d instead of %d)." - % (rownum, reader.line_num, len(row), numcol)) + % (rownum, reader.line_num, len(row), len(columns))) return rownum - if not self.do_import_insert(prepq, row): + if not self.do_import_row(columns, nullval, layout, row): self.printerr("Aborting import at record #%d (line %d). " "Previously-inserted values still present." % (rownum, reader.line_num)) @@ -1613,28 +1624,50 @@ class Shell(cmd.Cmd): print return rownum + 1 - def prep_import_insert(self, ks, cf, columns): - if columns is None: - # default to all known columns - columns = self.get_column_names(ks, cf) + def do_import_row(self, columns, nullval, layout, row): + rowmap = {} + for name, value in zip(columns, row): + if value != nullval: + type = layout.get_column(name).cqltype.cql_parameterized_type() + if type in ('ascii', 'text', 'timestamp', 'inet'): + rowmap[name] = self.cql_protect_value(value) + else: + rowmap[name] = value + if not self.do_import_insert(layout, rowmap): + return False + nulls = set(columns) - set(rowmap.keys()) + if nulls: + return self.do_import_delete(layout, rowmap, nulls) + return True + def do_import_insert(self, layout, rowmap): # would be nice to be able to use a prepared query here, but in order # to use that interface, we'd need to have all the input as native # values already, reading them from text just like the various # Cassandra cql types do. Better just to submit them all as intact # CQL string literals and let Cassandra do its thing. - return len(columns), 'INSERT INTO %s.%s (%s) VALUES (%%s)' % ( - self.cql_protect_name(ks), - self.cql_protect_name(cf), - ', '.join(map(self.cql_protect_name, columns)) + query = 'INSERT INTO %s.%s (%s) VALUES (%s)' % ( + self.cql_protect_name(layout.keyspace_name), + self.cql_protect_name(layout.columnfamily_name), + ', '.join(self.cql_protect_names(rowmap.keys())), + ', '.join(rowmap.values()) + ) + if self.debug: + print 'Import using CQL: %s' % query + return self.perform_statement_untraced(query) + + def do_import_delete(self, layout, rowmap, nulls): + pk_components = layout.primary_key_components + where = [(self.cql_protect_name(pk), rowmap[pk]) for pk in pk_components] + query = 'DELETE %s FROM %s.%s WHERE %s' % ( + ', '.join(self.cql_protect_names(nulls)), + self.cql_protect_name(layout.keyspace_name), + self.cql_protect_name(layout.columnfamily_name), + ' AND '.join([ '%s = %s' % kv for kv in where]) ) - - def do_import_insert(self, prepq, rowvalues): - valstring = ', '.join(map(self.cql_protect_value, rowvalues)) - cql = prepq % valstring if self.debug: - print "Import using CQL: %s" % cql - return self.perform_statement_untraced(cql) + print 'Import using CQL: %s' % query + return self.perform_statement_untraced(query) def perform_csv_export(self, ks, cf, columns, fname, opts): dialect_options = self.csv_dialect_defaults.copy() @@ -1690,7 +1723,7 @@ class Shell(cmd.Cmd): def prep_export_dump(self, ks, cf, columns): if columns is None: columns = self.get_column_names(ks, cf) - columnlist = ', '.join(map(self.cql_protect_name, columns)) + columnlist = ', '.join(self.cql_protect_names(columns)) # this limit is pretty awful. would be better to use row-key-paging, so # that the dump could be pretty easily aborted if necessary, but that # can be kind of tricky with cql3. Punt for now, until the real cursor