Author: jbellis Date: Fri Apr 1 17:15:06 2011 New Revision: 1087815 URL: http://svn.apache.org/viewvc?rev=1087815&view=rev Log: really revert SSTableExport move
Added: cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableImport.java Added: cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java?rev=1087815&view=auto ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java (added) +++ cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java Fri Apr 1 17:15:06 2011 @@ -0,0 +1,420 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.tools; + +import java.io.File; +import java.io.IOException; +import java.io.PrintStream; +import java.nio.ByteBuffer; +import java.util.*; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.io.util.BufferedRandomAccessFile; +import org.apache.cassandra.service.StorageService; + +import org.apache.commons.cli.*; + +import org.apache.cassandra.config.ConfigurationException; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.io.sstable.*; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.Pair; + +import static org.apache.cassandra.utils.ByteBufferUtil.bytesToHex; +import static org.apache.cassandra.utils.ByteBufferUtil.hexToBytes; + +/** + * Export SSTables to JSON format. + */ +public class SSTableExport +{ + // size of the columns page + private static final int PAGE_SIZE = 1000; + + private static final String KEY_OPTION = "k"; + private static final String EXCLUDEKEY_OPTION = "x"; + private static final String ENUMERATEKEYS_OPTION = "e"; + private static Options options; + private static CommandLine cmd; + + static + { + options = new Options(); + + Option optKey = new Option(KEY_OPTION, true, "Row key"); + // Number of times -k <key> can be passed on the command line. + optKey.setArgs(500); + options.addOption(optKey); + + Option excludeKey = new Option(EXCLUDEKEY_OPTION, true, "Excluded row key"); + // Number of times -x <key> can be passed on the command line. + excludeKey.setArgs(500); + options.addOption(excludeKey); + + Option optEnumerate = new Option(ENUMERATEKEYS_OPTION, false, "enumerate keys only"); + options.addOption(optEnumerate); + } + + /** + * Wraps given string into quotes + * @param val string to quote + * @return quoted string + */ + private static String quote(String val) + { + return String.format("\"%s\"", val); + } + + /** + * JSON Hash Key serializer + * @param val value to set as a key + * @return JSON Hash key + */ + private static String asKey(String val) + { + return String.format("%s: ", quote(val)); + } + + /** + * Serialize columns using given column iterator + * @param columns column iterator + * @param out output stream + * @param comparator columns comparator + * @param cfMetaData Column Family metadata (to get validator) + * @return pair of (number of columns serialized, last column serialized) + */ + private static void serializeColumns(Iterator<IColumn> columns, PrintStream out, AbstractType comparator, CFMetaData cfMetaData) + { + while (columns.hasNext()) + { + IColumn column = columns.next(); + serializeColumn(column, out, comparator, cfMetaData); + + if (columns.hasNext()) + out.print(", "); + } + } + + /** + * Serialize a given column to the JSON format + * @param column column presentation + * @param out output stream + * @param comparator columns comparator + * @param cfMetaData Column Family metadata (to get validator) + */ + private static void serializeColumn(IColumn column, PrintStream out, AbstractType comparator, CFMetaData cfMetaData) + { + ByteBuffer name = ByteBufferUtil.clone(column.name()); + ByteBuffer value = ByteBufferUtil.clone(column.value()); + AbstractType validator = cfMetaData.getValueValidator(name); + + out.print("["); + out.print(quote(comparator.getString(name))); + out.print(", "); + out.print(quote(validator.getString(value))); + out.print(", "); + out.print(column.timestamp()); + + if (column instanceof DeletedColumn) + { + out.print(", "); + out.print("\"d\""); + } + else if (column instanceof ExpiringColumn) + { + out.print(", "); + out.print("\"e\""); + out.print(", "); + out.print(((ExpiringColumn) column).getTimeToLive()); + out.print(", "); + out.print(column.getLocalDeletionTime()); + } + else if (column instanceof CounterColumn) + { + out.print(", "); + out.print("\"c\""); + out.print(", "); + out.print(((CounterColumn) column).timestampOfLastDelete()); + } + + out.print("]"); + } + + /** + * Get portion of the columns and serialize in loop while not more columns left in the row + * @param row SSTableIdentityIterator row representation with Column Family + * @param key Decorated Key for the required row + * @param out output stream + */ + private static void serializeRow(SSTableIdentityIterator row, DecoratedKey key, PrintStream out) + { + ColumnFamily columnFamily = row.getColumnFamily(); + boolean isSuperCF = columnFamily.isSuper(); + CFMetaData cfMetaData = columnFamily.metadata(); + AbstractType comparator = columnFamily.getComparator(); + + out.print(asKey(bytesToHex(key.key))); + out.print(isSuperCF ? "{" : "["); + + if (isSuperCF) + { + while (row.hasNext()) + { + IColumn column = row.next(); + + out.print(asKey(comparator.getString(column.name()))); + out.print("{"); + out.print(asKey("deletedAt")); + out.print(column.getMarkedForDeleteAt()); + out.print(", "); + out.print(asKey("subColumns")); + out.print("["); + serializeColumns(column.getSubColumns().iterator(), out, columnFamily.getSubComparator(), cfMetaData); + out.print("]"); + out.print("}"); + + if (row.hasNext()) + out.print(", "); + } + } + else + { + serializeColumns(row, out, comparator, cfMetaData); + } + + out.print(isSuperCF ? "}" : "]"); + } + + /** + * Enumerate row keys from an SSTableReader and write the result to a PrintStream. + * + * @param ssTableFile the file to export the rows from + * @param outs PrintStream to write the output to + * @throws IOException on failure to read/write input/output + */ + public static void enumeratekeys(String ssTableFile, PrintStream outs) + throws IOException + { + Descriptor desc = Descriptor.fromFilename(ssTableFile); + KeyIterator iter = new KeyIterator(desc); + DecoratedKey lastKey = null; + while (iter.hasNext()) + { + DecoratedKey key = iter.next(); + + // validate order of the keys in the sstable + if (lastKey != null && lastKey.compareTo(key) > 0 ) + throw new IOException("Key out of order! " + lastKey + " > " + key); + lastKey = key; + + outs.println(bytesToHex(key.key)); + } + iter.close(); + outs.flush(); + } + + /** + * Export specific rows from an SSTable and write the resulting JSON to a PrintStream. + * + * @param ssTableFile the SSTableScanner to export the rows from + * @param outs PrintStream to write the output to + * @param toExport the keys corresponding to the rows to export + * @param excludes keys to exclude from export + * @throws IOException on failure to read/write input/output + */ + public static void export(String ssTableFile, PrintStream outs, Collection<String> toExport, String[] excludes) throws IOException + { + SSTableReader reader = SSTableReader.open(Descriptor.fromFilename(ssTableFile)); + SSTableScanner scanner = reader.getDirectScanner(BufferedRandomAccessFile.DEFAULT_BUFFER_SIZE); + + IPartitioner<?> partitioner = StorageService.getPartitioner(); + + if (excludes != null) + toExport.removeAll(Arrays.asList(excludes)); + + outs.println("{"); + + int i = 0; + + // last key to compare order + DecoratedKey lastKey = null; + + for (String key : toExport) + { + DecoratedKey decoratedKey = partitioner.decorateKey(hexToBytes(key)); + + if (lastKey != null && lastKey.compareTo(decoratedKey) > 0) + throw new IOException("Key out of order! " + lastKey + " > " + decoratedKey); + + lastKey = decoratedKey; + + scanner.seekTo(decoratedKey); + + if (!scanner.hasNext()) + continue; + + SSTableIdentityIterator row = (SSTableIdentityIterator) scanner.next(); + if (!row.getKey().equals(decoratedKey)) + continue; + + serializeRow(row, decoratedKey, outs); + + if (i != 0) + outs.println(","); + + i++; + } + + outs.println("\n}"); + outs.flush(); + + scanner.close(); + } + + // This is necessary to accommodate the test suite since you cannot open a Reader more + // than once from within the same process. + static void export(SSTableReader reader, PrintStream outs, String[] excludes) throws IOException + { + Set<String> excludeSet = new HashSet<String>(); + + if (excludes != null) + excludeSet = new HashSet<String>(Arrays.asList(excludes)); + + + SSTableIdentityIterator row; + SSTableScanner scanner = reader.getDirectScanner(BufferedRandomAccessFile.DEFAULT_BUFFER_SIZE); + + outs.println("{"); + + int i = 0; + + // collecting keys to export + while (scanner.hasNext()) + { + row = (SSTableIdentityIterator) scanner.next(); + + String currentKey = bytesToHex(row.getKey().key); + + if (excludeSet.contains(currentKey)) + continue; + else if (i != 0) + outs.println(","); + + serializeRow(row, row.getKey(), outs); + + i++; + } + + outs.println("\n}"); + outs.flush(); + + scanner.close(); + } + + /** + * Export an SSTable and write the resulting JSON to a PrintStream. + * + * @param ssTableFile the SSTable to export + * @param outs PrintStream to write the output to + * @param excludes keys to exclude from export + * + * @throws IOException on failure to read/write input/output + */ + public static void export(String ssTableFile, PrintStream outs, String[] excludes) throws IOException + { + export(SSTableReader.open(Descriptor.fromFilename(ssTableFile)), outs, excludes); + } + + /** + * Export an SSTable and write the resulting JSON to standard out. + * + * @param ssTableFile SSTable to export + * @param excludes keys to exclude from export + * + * @throws IOException on failure to read/write SSTable/standard out + */ + public static void export(String ssTableFile, String[] excludes) throws IOException + { + export(ssTableFile, System.out, excludes); + } + + /** + * Given arguments specifying an SSTable, and optionally an output file, + * export the contents of the SSTable to JSON. + * + * @param args command lines arguments + * + * @throws IOException on failure to open/read/write files or output streams + * @throws ConfigurationException on configuration failure (wrong params given) + */ + public static void main(String[] args) throws IOException, ConfigurationException + { + String usage = String.format("Usage: %s <sstable> [-k key [-k key [...]] -x key [-x key [...]]]%n", SSTableExport.class.getName()); + + CommandLineParser parser = new PosixParser(); + try + { + cmd = parser.parse(options, args); + } + catch (ParseException e1) + { + System.err.println(e1.getMessage()); + System.err.println(usage); + System.exit(1); + } + + + if (cmd.getArgs().length != 1) + { + System.err.println("You must supply exactly one sstable"); + System.err.println(usage); + System.exit(1); + } + + + String[] keys = cmd.getOptionValues(KEY_OPTION); + String[] excludes = cmd.getOptionValues(EXCLUDEKEY_OPTION); + String ssTableFileName = new File(cmd.getArgs()[0]).getAbsolutePath(); + + DatabaseDescriptor.loadSchemas(); + if (DatabaseDescriptor.getNonSystemTables().size() < 1) + { + String msg = "no non-system tables are defined"; + System.err.println(msg); + throw new ConfigurationException(msg); + } + + if (cmd.hasOption(ENUMERATEKEYS_OPTION)) + { + enumeratekeys(ssTableFileName, System.out); + } + else + { + if ((keys != null) && (keys.length > 0)) + export(ssTableFileName, System.out, Arrays.asList(keys), excludes); + else + export(ssTableFileName, excludes); + } + + System.exit(0); + } +} Added: cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableImport.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableImport.java?rev=1087815&view=auto ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableImport.java (added) +++ cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableImport.java Fri Apr 1 17:15:06 2011 @@ -0,0 +1,528 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.tools; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.*; + +import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.db.marshal.BytesType; +import org.apache.cassandra.db.marshal.MarshalException; +import org.apache.commons.cli.*; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.ConfigurationException; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.filter.QueryPath; +import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.io.sstable.SSTableWriter; +import org.codehaus.jackson.type.TypeReference; + +import org.codehaus.jackson.JsonFactory; +import org.codehaus.jackson.map.MappingJsonFactory; + +import org.codehaus.jackson.JsonParser; + +import static org.apache.cassandra.utils.ByteBufferUtil.hexToBytes; + +/** + * Create SSTables from JSON input + */ +public class SSTableImport +{ + private static final String KEYSPACE_OPTION = "K"; + private static final String COLUMN_FAMILY_OPTION = "c"; + private static final String KEY_COUNT_OPTION = "n"; + private static final String IS_SORTED_OPTION = "s"; + + private static Options options; + private static CommandLine cmd; + + private static Integer keyCountToImport = null; + private static boolean isSorted = false; + + private static JsonFactory factory = new MappingJsonFactory(); + + static + { + options = new Options(); + + Option optKeyspace = new Option(KEYSPACE_OPTION, true, "Keyspace name."); + optKeyspace.setRequired(true); + options.addOption(optKeyspace); + + Option optColfamily = new Option(COLUMN_FAMILY_OPTION, true, "Column Family name."); + optColfamily.setRequired(true); + options.addOption(optColfamily); + + options.addOption(new Option(KEY_COUNT_OPTION, true, "Number of keys to import (Optional).")); + options.addOption(new Option(IS_SORTED_OPTION, false, "Assume JSON file as already sorted (e.g. created by sstable2json tool) (Optional).")); + } + + private static class JsonColumn<T> + { + private ByteBuffer name; + private ByteBuffer value; + private long timestamp; + + private String kind; + // Expiring columns + private int ttl; + private int localExpirationTime; + + // Counter columns + private long timestampOfLastDelete; + + public JsonColumn(T json, CFMetaData meta, boolean isSubColumn) + { + AbstractType comparator = (isSubColumn) ? meta.subcolumnComparator : meta.comparator; + + if (json instanceof List) + { + List fields = (List<?>) json; + + assert fields.size() >= 3 : "Column definition should have at least 3"; + + name = stringAsType((String) fields.get(0), comparator); + value = stringAsType((String) fields.get(1), meta.getValueValidator(name.duplicate())); + timestamp = (Long) fields.get(2); + kind = ""; + + if (fields.size() > 3) + { + if (fields.get(3) instanceof Boolean) + { + // old format, reading this for backward compatibility sake + if (fields.size() == 6) + { + kind = "e"; + ttl = (Integer) fields.get(4); + localExpirationTime = (int) (long) ((Long) fields.get(5)); + } + else + { + kind = ((Boolean) fields.get(3)) ? "d" : ""; + } + } + else + { + kind = (String) fields.get(3); + if (isExpiring()) + { + ttl = (Integer) fields.get(4); + localExpirationTime = (int) (long) ((Long) fields.get(5)); + } + else if (isCounter()) + { + timestampOfLastDelete = (long) ((Integer) fields.get(4)); + } + } + } + } + } + + public boolean isDeleted() + { + return kind.equals("d"); + } + + public boolean isExpiring() + { + return kind.equals("e"); + } + + public boolean isCounter() + { + return kind.equals("c"); + } + + public ByteBuffer getName() + { + return name.duplicate(); + } + + public ByteBuffer getValue() + { + return value.duplicate(); + } + } + + private static void addToStandardCF(List<?> row, ColumnFamily cfamily) + { + addColumnsToCF(row, null, cfamily); + } + + /** + * Add columns to a column family. + * + * @param row the columns associated with a row + * @param superName name of the super column if any + * @param cfamily the column family to add columns to + */ + private static void addColumnsToCF(List<?> row, ByteBuffer superName, ColumnFamily cfamily) + { + CFMetaData cfm = cfamily.metadata(); + assert cfm != null; + + for (Object c : row) + { + JsonColumn col = new JsonColumn<List>((List) c, cfm, (superName != null)); + QueryPath path = new QueryPath(cfm.cfName, superName, col.getName()); + + if (col.isExpiring()) + { + cfamily.addColumn(null, new ExpiringColumn(col.getName(), col.getValue(), col.timestamp, col.ttl, col.localExpirationTime)); + } + else if (col.isCounter()) + { + cfamily.addColumn(null, new CounterColumn(col.getName(), col.getValue(), col.timestamp, col.timestampOfLastDelete)); + } + else if (col.isDeleted()) + { + cfamily.addTombstone(path, col.getValue(), col.timestamp); + } + else + { + cfamily.addColumn(path, col.getValue(), col.timestamp); + } + } + } + + /** + * Add super columns to a column family. + * + * @param row the super columns associated with a row + * @param cfamily the column family to add columns to + */ + private static void addToSuperCF(Map<?, ?> row, ColumnFamily cfamily) + { + CFMetaData metaData = cfamily.metadata(); + assert metaData != null; + + AbstractType comparator = metaData.comparator; + + // Super columns + for (Map.Entry<?, ?> entry : row.entrySet()) + { + Map<?, ?> data = (Map<?, ?>) entry.getValue(); + + addColumnsToCF((List<?>) data.get("subColumns"), stringAsType((String) entry.getKey(), comparator), cfamily); + + // *WARNING* markForDeleteAt has been DEPRECATED at Cassandra side + //BigInteger deletedAt = (BigInteger) data.get("deletedAt"); + //SuperColumn superColumn = (SuperColumn) cfamily.getColumn(superName); + //superColumn.markForDeleteAt((int) (System.currentTimeMillis()/1000), deletedAt); + } + } + + /** + * Convert a JSON formatted file to an SSTable. + * + * @param jsonFile the file containing JSON formatted data + * @param keyspace keyspace the data belongs to + * @param cf column family the data belongs to + * @param ssTablePath file to write the SSTable to + * + * @throws IOException for errors reading/writing input/output + */ + public static void importJson(String jsonFile, String keyspace, String cf, String ssTablePath) throws IOException + { + ColumnFamily columnFamily = ColumnFamily.create(keyspace, cf); + IPartitioner<?> partitioner = DatabaseDescriptor.getPartitioner(); + + int importedKeys = (isSorted) ? importSorted(jsonFile, columnFamily, ssTablePath, partitioner) + : importUnsorted(getParser(jsonFile), columnFamily, ssTablePath, partitioner); + + if (importedKeys != -1) + System.out.printf("%d keys imported successfully.%n", importedKeys); + } + + private static int importUnsorted(JsonParser parser, ColumnFamily columnFamily, String ssTablePath, IPartitioner<?> partitioner) throws IOException + { + int importedKeys = 0; + long start = System.currentTimeMillis(); + Map<?, ?> data = parser.readValueAs(new TypeReference<Map<?, ?>>() {}); + + keyCountToImport = (keyCountToImport == null) ? data.size() : keyCountToImport; + SSTableWriter writer = new SSTableWriter(ssTablePath, keyCountToImport); + + System.out.printf("Importing %s keys...%n", keyCountToImport); + + // sort by dk representation, but hold onto the hex version + SortedMap<DecoratedKey,String> decoratedKeys = new TreeMap<DecoratedKey,String>(); + + for (Object keyObject : data.keySet()) + { + String key = (String) keyObject; + decoratedKeys.put(partitioner.decorateKey(hexToBytes(key)), key); + } + + for (Map.Entry<DecoratedKey, String> rowKey : decoratedKeys.entrySet()) + { + if (columnFamily.getColumnFamilyType() == ColumnFamilyType.Super) + { + addToSuperCF((Map<?, ?>) data.get(rowKey.getValue()), columnFamily); + } + else + { + addToStandardCF((List<?>) data.get(rowKey.getValue()), columnFamily); + } + + writer.append(rowKey.getKey(), columnFamily); + columnFamily.clear(); + + importedKeys++; + + long current = System.currentTimeMillis(); + + if (current - start >= 5000) // 5 secs. + { + System.out.printf("Currently imported %d keys.%n", importedKeys); + start = current; + } + + if (keyCountToImport == importedKeys) + break; + } + + writer.closeAndOpenReader(); + + return importedKeys; + } + + public static int importSorted(String jsonFile, ColumnFamily columnFamily, String ssTablePath, IPartitioner<?> partitioner) throws IOException + { + int importedKeys = 0; // already imported keys count + long start = System.currentTimeMillis(); + + JsonParser parser = getParser(jsonFile); + + if (keyCountToImport == null) + { + keyCountToImport = 0; + System.out.println("Counting keys to import, please wait... (NOTE: to skip this use -n <num_keys>)"); + + parser.nextToken(); // START_OBJECT + while (parser.nextToken() != null) + { + parser.nextToken(); + parser.skipChildren(); + if (parser.getCurrentName() == null) continue; + + keyCountToImport++; + } + } + + System.out.printf("Importing %s keys...%n", keyCountToImport); + + parser = getParser(jsonFile); // renewing parser + SSTableWriter writer = new SSTableWriter(ssTablePath, keyCountToImport); + + int lineNumber = 1; + DecoratedKey prevStoredKey = null; + + while (parser.nextToken() != null) + { + String key = parser.getCurrentName(); + + if (key != null) + { + String tokenName = parser.nextToken().name(); + + if (tokenName.equals("START_ARRAY")) + { + if (columnFamily.getColumnFamilyType() == ColumnFamilyType.Super) + { + throw new RuntimeException("Can't write Standard columns to the Super Column Family."); + } + + List<?> columns = parser.readValueAs(new TypeReference<List<?>>() {}); + addToStandardCF(columns, columnFamily); + } + else if (tokenName.equals("START_OBJECT")) + { + if (columnFamily.getColumnFamilyType() == ColumnFamilyType.Standard) + { + throw new RuntimeException("Can't write Super columns to the Standard Column Family."); + } + + Map<?, ?> columns = parser.readValueAs(new TypeReference<Map<?, ?>>() {}); + addToSuperCF(columns, columnFamily); + } + else + { + throw new UnsupportedOperationException("Only Array or Hash allowed as row content."); + } + + DecoratedKey currentKey = partitioner.decorateKey(hexToBytes(key)); + + if (prevStoredKey != null && prevStoredKey.compareTo(currentKey) != -1) + { + System.err.printf("Line %d: Key %s is greater than previous, collection is not sorted properly. Aborting import. You might need to delete SSTables manually.%n", lineNumber, key); + return -1; + } + + // saving decorated key + writer.append(currentKey, columnFamily); + columnFamily.clear(); + + prevStoredKey = currentKey; + importedKeys++; + lineNumber++; + + long current = System.currentTimeMillis(); + + if (current - start >= 5000) // 5 secs. + { + System.out.printf("Currently imported %d keys.%n", importedKeys); + start = current; + } + + if (keyCountToImport == importedKeys) + break; + } + } + + writer.closeAndOpenReader(); + + return importedKeys; + } + + /** + * Get JsonParser object for file + * @param fileName name of the file + * @return json parser instance for given file + * @throws IOException if any I/O error. + */ + private static JsonParser getParser(String fileName) throws IOException + { + return factory.createJsonParser(new File(fileName)).configure(JsonParser.Feature.INTERN_FIELD_NAMES, false); + } + + /** + * Converts JSON to an SSTable file. JSON input can either be a file specified + * using an optional command line argument, or supplied on standard in. + * + * @param args command line arguments + * @throws IOException on failure to open/read/write files or output streams + * @throws ParseException on failure to parse JSON input + * @throws ConfigurationException on configuration error. + */ + public static void main(String[] args) throws IOException, ParseException, ConfigurationException + { + CommandLineParser parser = new PosixParser(); + + try + { + cmd = parser.parse(options, args); + } + catch (org.apache.commons.cli.ParseException e) + { + System.err.println(e.getMessage()); + printProgramUsage(); + System.exit(1); + } + + if (cmd.getArgs().length != 2) + { + printProgramUsage(); + System.exit(1); + } + + String json = cmd.getArgs()[0]; + String ssTable = cmd.getArgs()[1]; + String keyspace = cmd.getOptionValue(KEYSPACE_OPTION); + String cfamily = cmd.getOptionValue(COLUMN_FAMILY_OPTION); + + if (cmd.hasOption(KEY_COUNT_OPTION)) + { + keyCountToImport = Integer.valueOf(cmd.getOptionValue(KEY_COUNT_OPTION)); + } + + if (cmd.hasOption(IS_SORTED_OPTION)) + { + isSorted = true; + } + + DatabaseDescriptor.loadSchemas(); + if (DatabaseDescriptor.getNonSystemTables().size() < 1) + { + String msg = "no non-system tables are defined"; + System.err.println(msg); + throw new ConfigurationException(msg); + } + + try + { + importJson(json, keyspace, cfamily, ssTable); + } + catch (Exception e) + { + e.printStackTrace(); + System.err.println("ERROR: " + e.getMessage()); + System.exit(-1); + } + + System.exit(0); + } + + private static void printProgramUsage() + { + System.out.printf("Usage: %s -s -K <keyspace> -c <column_family> -n <num_keys> <json> <sstable>%n%n", + SSTableImport.class.getName()); + + System.out.println("Options:"); + for (Object o : options.getOptions()) + { + Option opt = (Option) o; + System.out.println(" -" +opt.getOpt() + " - " + opt.getDescription()); + } + } + + /** + * Used by test framework to set key count + * @param keyCount numbers of keys to import + */ + public static void setKeyCountToImport(Integer keyCount) + { + keyCountToImport = keyCount; + } + + /** + * Convert a string to bytes (ByteBuffer) according to type + * @param content string to convert + * @param type type to use for conversion + * @return byte buffer representation of the given string + */ + private static ByteBuffer stringAsType(String content, AbstractType type) + { + try + { + return (type == BytesType.instance) ? hexToBytes(content) : type.fromString(content); + } + catch (MarshalException e) + { + throw new RuntimeException(e.getMessage()); + } + } + +}