Add sstabledump tool. Patch by Chris Lohfink; reviewed by yukim for CASSANDRA-7464.
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/71b1c4a6 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/71b1c4a6 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/71b1c4a6 Branch: refs/heads/trunk Commit: 71b1c4a63187f746c0caecc41bc07b42dedd3488 Parents: a623977 Author: Chris Lohfink <chris.lohf...@datastax.com> Authored: Tue Feb 23 11:22:26 2016 -0600 Committer: Yuki Morishita <yu...@apache.org> Committed: Wed Feb 24 11:48:53 2016 -0600 ---------------------------------------------------------------------- CHANGES.txt | 1 + NEWS.txt | 14 + .../org/apache/cassandra/config/CFMetaData.java | 8 +- .../cassandra/db/SerializationHeader.java | 25 + .../db/rows/AbstractRangeTombstoneMarker.java | 4 + .../apache/cassandra/db/rows/AbstractRow.java | 12 +- .../apache/cassandra/db/rows/Unfiltered.java | 1 + .../io/sstable/format/SSTableReader.java | 8 + .../io/sstable/format/big/BigTableReader.java | 13 +- .../io/sstable/format/big/BigTableScanner.java | 5 + .../apache/cassandra/tools/JsonTransformer.java | 501 +++++++++++++++++++ .../apache/cassandra/tools/SSTableExport.java | 242 +++++++++ .../io/sstable/SSTableScannerTest.java | 8 +- tools/bin/sstabledump | 52 ++ tools/bin/sstabledump.bat | 48 ++ 15 files changed, 933 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/71b1c4a6/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 4549ded..aefc02e 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.0.4 + * Add sstabledump tool (CASSANDRA-7464) * Introduce backpressure for hints (CASSANDRA-10972) * Fix ClusteringPrefix not being able to read tombstone range boundaries (CASSANDRA-11158) * Prevent logging in sandboxed state (CASSANDRA-11033) 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/71b1c4a6/NEWS.txt ---------------------------------------------------------------------- diff --git a/NEWS.txt b/NEWS.txt index 89fc4a7..5fca578 100644 --- a/NEWS.txt +++ b/NEWS.txt @@ -13,6 +13,20 @@ restore snapshots created with the previous major version using the 'sstableloader' tool. You can upgrade the file format of your snapshots using the provided 'sstableupgrade' tool. +3.0.4 +===== + +New features +------------ + - sstabledump tool is added to be 3.0 version of former sstable2json. The tool only + supports v3.0+ SSTables. See tool's help for more detail. + +Upgrading +--------- + - Nothing specific to this release, but please see previous versions upgrading section, + especially if you are upgrading from 2.2. + + 3.0.3 ===== http://git-wip-us.apache.org/repos/asf/cassandra/blob/71b1c4a6/src/java/org/apache/cassandra/config/CFMetaData.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java index cb6d3b8..79cd779 100644 --- a/src/java/org/apache/cassandra/config/CFMetaData.java +++ b/src/java/org/apache/cassandra/config/CFMetaData.java @@ -1132,7 +1132,7 @@ public final class CFMetaData private final boolean isSuper; private final boolean isCounter; private final boolean isView; - private IPartitioner partitioner; + private Optional<IPartitioner> partitioner; private UUID tableId; @@ -1150,7 +1150,7 @@ public final class CFMetaData this.isSuper = isSuper; this.isCounter = isCounter; this.isView = isView; - this.partitioner = DatabaseDescriptor.getPartitioner(); + this.partitioner = Optional.empty(); } public static Builder create(String keyspace, String table) @@ -1185,7 +1185,7 @@ public final class CFMetaData public Builder withPartitioner(IPartitioner partitioner) { - this.partitioner = partitioner; + this.partitioner = Optional.ofNullable(partitioner); return 
this; } @@ -1296,7 +1296,7 @@ public final class CFMetaData partitions, clusterings, builder.build(), - partitioner); + partitioner.orElseGet(DatabaseDescriptor::getPartitioner)); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/71b1c4a6/src/java/org/apache/cassandra/db/SerializationHeader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SerializationHeader.java b/src/java/org/apache/cassandra/db/SerializationHeader.java index 0706d06..6aee0b6 100644 --- a/src/java/org/apache/cassandra/db/SerializationHeader.java +++ b/src/java/org/apache/cassandra/db/SerializationHeader.java @@ -361,6 +361,31 @@ public class SerializationHeader return String.format("SerializationHeader.Component[key=%s, cks=%s, statics=%s, regulars=%s, stats=%s]", keyType, clusteringTypes, staticColumns, regularColumns, stats); } + + public AbstractType<?> getKetType() + { + return keyType; + } + + public List<AbstractType<?>> getClusteringTypes() + { + return clusteringTypes; + } + + public Map<ByteBuffer, AbstractType<?>> getStaticColumns() + { + return staticColumns; + } + + public Map<ByteBuffer, AbstractType<?>> getRegularColumns() + { + return regularColumns; + } + + public EncodingStats getEncodingStats() + { + return stats; + } } public static class Serializer implements IMetadataComponentSerializer<Component> http://git-wip-us.apache.org/repos/asf/cassandra/blob/71b1c4a6/src/java/org/apache/cassandra/db/rows/AbstractRangeTombstoneMarker.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/AbstractRangeTombstoneMarker.java b/src/java/org/apache/cassandra/db/rows/AbstractRangeTombstoneMarker.java index e90e52b..b1ee7ec 100644 --- a/src/java/org/apache/cassandra/db/rows/AbstractRangeTombstoneMarker.java +++ b/src/java/org/apache/cassandra/db/rows/AbstractRangeTombstoneMarker.java @@ -71,4 +71,8 @@ public abstract class 
AbstractRangeTombstoneMarker implements RangeTombstoneMark { return toString(metadata); } + public String toString(CFMetaData metadata, boolean includeClusteringKeys, boolean fullDetails) + { + return toString(metadata); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/71b1c4a6/src/java/org/apache/cassandra/db/rows/AbstractRow.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/AbstractRow.java b/src/java/org/apache/cassandra/db/rows/AbstractRow.java index 484f981..0295e2e 100644 --- a/src/java/org/apache/cassandra/db/rows/AbstractRow.java +++ b/src/java/org/apache/cassandra/db/rows/AbstractRow.java @@ -92,6 +92,11 @@ public abstract class AbstractRow extends AbstractCollection<ColumnData> impleme public String toString(CFMetaData metadata, boolean fullDetails) { + return toString(metadata, true, fullDetails); + } + + public String toString(CFMetaData metadata, boolean includeClusterKeys, boolean fullDetails) + { StringBuilder sb = new StringBuilder(); sb.append("Row"); if (fullDetails) @@ -101,7 +106,12 @@ public abstract class AbstractRow extends AbstractCollection<ColumnData> impleme sb.append(" del=").append(deletion()); sb.append(" ]"); } - sb.append(": ").append(clustering().toString(metadata)).append(" | "); + sb.append(": "); + if(includeClusterKeys) + sb.append(clustering().toString(metadata)); + else + sb.append(clustering().toCQLString(metadata)); + sb.append(" | "); boolean isFirst = true; for (ColumnData cd : this) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/71b1c4a6/src/java/org/apache/cassandra/db/rows/Unfiltered.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/Unfiltered.java b/src/java/org/apache/cassandra/db/rows/Unfiltered.java index ba03741..9d96137 100644 --- a/src/java/org/apache/cassandra/db/rows/Unfiltered.java +++ 
b/src/java/org/apache/cassandra/db/rows/Unfiltered.java @@ -57,6 +57,7 @@ public interface Unfiltered extends Clusterable public String toString(CFMetaData metadata); public String toString(CFMetaData metadata, boolean fullDetails); + public String toString(CFMetaData metadata, boolean includeClusterKeys, boolean fullDetails); default boolean isRow() { http://git-wip-us.apache.org/repos/asf/cassandra/blob/71b1c4a6/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java index 691bf45..8a778b7 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java +++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java @@ -1761,6 +1761,14 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS public abstract ISSTableScanner getScanner(Collection<Range<Token>> ranges, RateLimiter limiter); /** + * Direct I/O SSTableScanner over an iterator of bounds. + * + * @param bounds the keys to cover + * @return A Scanner for seeking over the rows of the SSTable. + */ + public abstract ISSTableScanner getScanner(Iterator<AbstractBounds<PartitionPosition>> rangeIterator); + + /** * @param columns the columns to return. * @param dataRange filter to use when reading the columns * @return A Scanner for seeking over the rows of the SSTable. 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/71b1c4a6/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java index c16018a..dbab0f4 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java +++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java @@ -25,7 +25,7 @@ import org.apache.cassandra.db.rows.SliceableUnfilteredRowIterator; import org.apache.cassandra.db.filter.ColumnFilter; import org.apache.cassandra.db.columniterator.SSTableIterator; import org.apache.cassandra.db.columniterator.SSTableReversedIterator; -import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.dht.AbstractBounds; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.io.sstable.Component; @@ -82,6 +82,17 @@ public class BigTableReader extends SSTableReader } /** + * Direct I/O SSTableScanner over an iterator of bounds. + * + * @param boundsIterator the keys to cover + * @return A Scanner for seeking over the rows of the SSTable. + */ + public ISSTableScanner getScanner(Iterator<AbstractBounds<PartitionPosition>> boundsIterator) + { + return BigTableScanner.getScanner(this, boundsIterator); + } + + /** * Direct I/O SSTableScanner over the full sstable. * * @return A Scanner for reading the full SSTable. 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/71b1c4a6/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java index fd413fd..717cfdc 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java +++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java @@ -86,6 +86,11 @@ public class BigTableScanner implements ISSTableScanner return new BigTableScanner(sstable, ColumnFilter.all(sstable.metadata), null, limiter, false, makeBounds(sstable, tokenRanges).iterator()); } + public static ISSTableScanner getScanner(SSTableReader sstable, Iterator<AbstractBounds<PartitionPosition>> rangeIterator) + { + return new BigTableScanner(sstable, ColumnFilter.all(sstable.metadata), null, null, false, rangeIterator); + } + private BigTableScanner(SSTableReader sstable, ColumnFilter columns, DataRange dataRange, RateLimiter limiter, boolean isForThrift, Iterator<AbstractBounds<PartitionPosition>> rangeIterator) { assert sstable != null; http://git-wip-us.apache.org/repos/asf/cassandra/blob/71b1c4a6/src/java/org/apache/cassandra/tools/JsonTransformer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/JsonTransformer.java b/src/java/org/apache/cassandra/tools/JsonTransformer.java new file mode 100644 index 0000000..7b0ec5d --- /dev/null +++ b/src/java/org/apache/cassandra/tools/JsonTransformer.java @@ -0,0 +1,501 @@ +package org.apache.cassandra.tools; + +import java.io.IOException; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.stream.Stream; + +import org.apache.cassandra.config.CFMetaData; +import 
org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.db.ClusteringPrefix; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.DeletionTime; +import org.apache.cassandra.db.LivenessInfo; +import org.apache.cassandra.db.RangeTombstone; +import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.db.marshal.CollectionType; +import org.apache.cassandra.db.marshal.CompositeType; +import org.apache.cassandra.db.rows.Cell; +import org.apache.cassandra.db.rows.RangeTombstoneBoundMarker; +import org.apache.cassandra.db.rows.RangeTombstoneBoundaryMarker; +import org.apache.cassandra.db.rows.RangeTombstoneMarker; +import org.apache.cassandra.db.rows.Row; +import org.apache.cassandra.db.rows.Unfiltered; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; +import org.apache.cassandra.io.sstable.ISSTableScanner; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.codehaus.jackson.JsonFactory; +import org.codehaus.jackson.JsonGenerator; +import org.codehaus.jackson.impl.Indenter; +import org.codehaus.jackson.util.DefaultPrettyPrinter; +import org.codehaus.jackson.util.DefaultPrettyPrinter.NopIndenter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public final class JsonTransformer +{ + + private static final Logger logger = LoggerFactory.getLogger(JsonTransformer.class); + + private static final JsonFactory jsonFactory = new JsonFactory(); + + private final JsonGenerator json; + + private final CompactIndenter objectIndenter = new CompactIndenter(); + + private final CompactIndenter arrayIndenter = new CompactIndenter(); + + private final CFMetaData metadata; + + private final ISSTableScanner currentScanner; + + private long currentPosition = 0; + + private JsonTransformer(JsonGenerator json, ISSTableScanner currentScanner, CFMetaData metadata) + { + this.json = json; + this.metadata = metadata; + this.currentScanner = currentScanner; + + DefaultPrettyPrinter prettyPrinter = 
new DefaultPrettyPrinter(); + prettyPrinter.indentObjectsWith(objectIndenter); + prettyPrinter.indentArraysWith(arrayIndenter); + json.setPrettyPrinter(prettyPrinter); + } + + public static void toJson(ISSTableScanner currentScanner, Stream<UnfilteredRowIterator> partitions, CFMetaData metadata, OutputStream out) + throws IOException + { + try (JsonGenerator json = jsonFactory.createJsonGenerator(new OutputStreamWriter(out, "UTF-8"))) + { + JsonTransformer transformer = new JsonTransformer(json, currentScanner, metadata); + json.writeStartArray(); + partitions.forEach(transformer::serializePartition); + json.writeEndArray(); + } + } + + public static void keysToJson(ISSTableScanner currentScanner, Stream<DecoratedKey> keys, CFMetaData metadata, OutputStream out) throws IOException + { + try (JsonGenerator json = jsonFactory.createJsonGenerator(new OutputStreamWriter(out, "UTF-8"))) + { + JsonTransformer transformer = new JsonTransformer(json, currentScanner, metadata); + json.writeStartArray(); + keys.forEach(transformer::serializePartitionKey); + json.writeEndArray(); + } + } + + private void updatePosition() + { + this.currentPosition = currentScanner.getCurrentPosition(); + } + + private void serializePartitionKey(DecoratedKey key) + { + AbstractType<?> keyValidator = metadata.getKeyValidator(); + objectIndenter.setCompact(true); + try + { + arrayIndenter.setCompact(true); + json.writeStartArray(); + if (keyValidator instanceof CompositeType) + { + // if a composite type, the partition has multiple keys. + CompositeType compositeType = (CompositeType) keyValidator; + ByteBuffer keyBytes = key.getKey().duplicate(); + // Skip static data if it exists. 
+ if (keyBytes.remaining() >= 2) + { + int header = ByteBufferUtil.getShortLength(keyBytes, keyBytes.position()); + if ((header & 0xFFFF) == 0xFFFF) + { + ByteBufferUtil.readShortLength(keyBytes); + } + } + + int i = 0; + while (keyBytes.remaining() > 0 && i < compositeType.getComponents().size()) + { + AbstractType<?> colType = compositeType.getComponents().get(i); + + ByteBuffer value = ByteBufferUtil.readBytesWithShortLength(keyBytes); + String colValue = colType.getString(value); + + json.writeString(colValue); + + byte b = keyBytes.get(); + if (b != 0) + { + break; + } + ++i; + } + } + else + { + // if not a composite type, assume a single column partition key. + assert metadata.partitionKeyColumns().size() == 1; + json.writeString(keyValidator.getString(key.getKey())); + } + json.writeEndArray(); + objectIndenter.setCompact(false); + arrayIndenter.setCompact(false); + } + catch (IOException e) + { + logger.error("Failure serializing partition key.", e); + } + } + + private void serializePartition(UnfilteredRowIterator partition) + { + String key = metadata.getKeyValidator().getString(partition.partitionKey().getKey()); + try + { + json.writeStartObject(); + + json.writeFieldName("partition"); + json.writeStartObject(); + json.writeFieldName("key"); + serializePartitionKey(partition.partitionKey()); + json.writeNumberField("position", this.currentScanner.getCurrentPosition()); + + if (!partition.partitionLevelDeletion().isLive()) + { + json.writeFieldName("deletion_info"); + objectIndenter.setCompact(true); + json.writeStartObject(); + json.writeFieldName("deletion_time"); + json.writeNumber(partition.partitionLevelDeletion().markedForDeleteAt()); + json.writeFieldName("tstamp"); + json.writeNumber(partition.partitionLevelDeletion().localDeletionTime()); + json.writeEndObject(); + objectIndenter.setCompact(false); + json.writeEndObject(); + } + else + { + json.writeEndObject(); + json.writeFieldName("rows"); + json.writeStartArray(); + updatePosition(); + if 
(!partition.staticRow().isEmpty()) + { + serializeRow(partition.staticRow()); + } + Unfiltered unfiltered; + updatePosition(); + while (partition.hasNext()) + { + unfiltered = partition.next(); + if (unfiltered instanceof Row) + { + serializeRow((Row) unfiltered); + } + else if (unfiltered instanceof RangeTombstoneMarker) + { + serializeTombstone((RangeTombstoneMarker) unfiltered); + } + updatePosition(); + } + json.writeEndArray(); + } + + json.writeEndObject(); + } + catch (IOException e) + { + logger.error("Fatal error parsing partition: {}", key, e); + } + } + + private void serializeRow(Row row) + { + try + { + json.writeStartObject(); + String rowType = row.isStatic() ? "static_block" : "row"; + json.writeFieldName("type"); + json.writeString(rowType); + json.writeNumberField("position", this.currentPosition); + + // Only print clustering information for non-static rows. + if (!row.isStatic()) + { + serializeClustering(row.clustering()); + } + + LivenessInfo liveInfo = row.primaryKeyLivenessInfo(); + if (!liveInfo.isEmpty()) + { + objectIndenter.setCompact(false); + json.writeFieldName("liveness_info"); + objectIndenter.setCompact(true); + json.writeStartObject(); + json.writeFieldName("tstamp"); + json.writeNumber(liveInfo.timestamp()); + if (liveInfo.isExpiring()) + { + json.writeFieldName("ttl"); + json.writeNumber(liveInfo.ttl()); + json.writeFieldName("expires_at"); + json.writeNumber(liveInfo.localExpirationTime()); + json.writeFieldName("expired"); + json.writeBoolean(liveInfo.localExpirationTime() < (System.currentTimeMillis() / 1000)); + } + json.writeEndObject(); + objectIndenter.setCompact(false); + } + + // If this is a deletion, indicate that, otherwise write cells. 
+ if (!row.deletion().isLive()) + { + json.writeFieldName("deletion_info"); + objectIndenter.setCompact(true); + json.writeStartObject(); + json.writeFieldName("deletion_time"); + json.writeNumber(row.deletion().time().markedForDeleteAt()); + json.writeFieldName("tstamp"); + json.writeNumber(row.deletion().time().localDeletionTime()); + json.writeEndObject(); + objectIndenter.setCompact(false); + } + else + { + json.writeFieldName("cells"); + json.writeStartArray(); + row.cells().forEach(c -> serializeCell(c, liveInfo)); + json.writeEndArray(); + } + json.writeEndObject(); + } + catch (IOException e) + { + logger.error("Fatal error parsing row.", e); + } + } + + private void serializeTombstone(RangeTombstoneMarker tombstone) + { + try + { + json.writeStartObject(); + json.writeFieldName("type"); + + if (tombstone instanceof RangeTombstoneBoundMarker) + { + json.writeString("range_tombstone_bound"); + RangeTombstoneBoundMarker bm = (RangeTombstoneBoundMarker) tombstone; + serializeBound(bm.clustering(), bm.deletionTime()); + } + else + { + assert tombstone instanceof RangeTombstoneBoundaryMarker; + json.writeString("range_tombstone_boundary"); + RangeTombstoneBoundaryMarker bm = (RangeTombstoneBoundaryMarker) tombstone; + serializeBound(bm.openBound(false), bm.openDeletionTime(false)); + serializeBound(bm.closeBound(false), bm.closeDeletionTime(false)); + } + json.writeEndObject(); + objectIndenter.setCompact(false); + } + catch (IOException e) + { + logger.error("Failure parsing tombstone.", e); + } + } + + private void serializeBound(RangeTombstone.Bound bound, DeletionTime deletionTime) throws IOException + { + json.writeFieldName(bound.isStart() ? "start" : "end"); + json.writeStartObject(); + json.writeFieldName("type"); + json.writeString(bound.isInclusive() ? 
"inclusive" : "exclusive"); + serializeClustering(bound.clustering()); + serializeDeletion(deletionTime); + json.writeEndObject(); + } + + private void serializeClustering(ClusteringPrefix clustering) throws IOException + { + if (clustering.size() > 0) + { + json.writeFieldName("clustering"); + objectIndenter.setCompact(true); + json.writeStartArray(); + arrayIndenter.setCompact(true); + List<ColumnDefinition> clusteringColumns = metadata.clusteringColumns(); + for (int i = 0; i < clusteringColumns.size(); i++) + { + ColumnDefinition column = clusteringColumns.get(i); + if (i >= clustering.size()) + { + json.writeString("*"); + } + else + { + json.writeString(column.cellValueType().getString(clustering.get(i))); + } + } + json.writeEndArray(); + objectIndenter.setCompact(false); + arrayIndenter.setCompact(false); + } + } + + private void serializeDeletion(DeletionTime deletion) throws IOException + { + json.writeFieldName("deletion_info"); + objectIndenter.setCompact(true); + json.writeStartObject(); + json.writeFieldName("deletion_time"); + json.writeNumber(deletion.markedForDeleteAt()); + json.writeFieldName("tstamp"); + json.writeNumber(deletion.localDeletionTime()); + json.writeEndObject(); + objectIndenter.setCompact(false); + } + + private void serializeCell(Cell cell, LivenessInfo liveInfo) + { + try + { + json.writeStartObject(); + objectIndenter.setCompact(true); + json.writeFieldName("name"); + AbstractType<?> type = cell.column().type; + json.writeString(cell.column().name.toCQLString()); + + if (cell.path() != null && cell.path().size() > 0) + { + CollectionType ct = (CollectionType) type; + json.writeFieldName("path"); + arrayIndenter.setCompact(true); + json.writeStartArray(); + for (int i = 0; i < cell.path().size(); i++) + { + json.writeString(ct.nameComparator().getString(cell.path().get(i))); + } + json.writeEndArray(); + arrayIndenter.setCompact(false); + } + if (cell.isTombstone()) + { + json.writeFieldName("deletion_time"); + 
json.writeNumber(cell.localDeletionTime()); + } + else + { + json.writeFieldName("value"); + json.writeString(cell.column().cellValueType().getString(cell.value())); + } + if (liveInfo.isEmpty() || cell.timestamp() != liveInfo.timestamp()) + { + json.writeFieldName("tstamp"); + json.writeNumber(cell.timestamp()); + } + if (cell.isExpiring() && (liveInfo.isEmpty() || cell.ttl() != liveInfo.ttl())) + { + json.writeFieldName("ttl"); + json.writeNumber(cell.ttl()); + json.writeFieldName("expires_at"); + json.writeNumber(cell.localDeletionTime()); + json.writeFieldName("expired"); + json.writeBoolean(!cell.isLive((int) (System.currentTimeMillis() / 1000))); + } + json.writeEndObject(); + objectIndenter.setCompact(false); + } + catch (IOException e) + { + logger.error("Failure parsing cell.", e); + } + } + + /** + * A specialized {@link Indenter} that enables a 'compact' mode which puts all subsequent json values on the same + * line. This is manipulated via {@link CompactIndenter#setCompact(boolean)} + */ + private static final class CompactIndenter extends NopIndenter + { + + private static final int INDENT_LEVELS = 16; + private final char[] indents; + private final int charsPerLevel; + private final String eol; + private static final String space = " "; + + private boolean compact = false; + + CompactIndenter() + { + this(" ", System.lineSeparator()); + } + + CompactIndenter(String indent, String eol) + { + this.eol = eol; + + charsPerLevel = indent.length(); + + indents = new char[indent.length() * INDENT_LEVELS]; + int offset = 0; + for (int i = 0; i < INDENT_LEVELS; i++) + { + indent.getChars(0, indent.length(), indents, offset); + offset += indent.length(); + } + } + + @Override + public boolean isInline() + { + return false; + } + + /** + * Configures whether or not subsequent json values should be on the same line delimited by string or not. + * + * @param compact + * Whether or not to compact. 
+ */ + public void setCompact(boolean compact) + { + this.compact = compact; + } + + @Override + public void writeIndentation(JsonGenerator jg, int level) + { + try + { + if (!compact) + { + jg.writeRaw(eol); + if (level > 0) + { // should we err on negative values (as there's some flaw?) + level *= charsPerLevel; + while (level > indents.length) + { // unlike to happen but just in case + jg.writeRaw(indents, 0, indents.length); + level -= indents.length; + } + jg.writeRaw(indents, 0, level); + } + } + else + { + jg.writeRaw(space); + } + } + catch (IOException e) + { + e.printStackTrace(); + System.exit(1); + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/cassandra/blob/71b1c4a6/src/java/org/apache/cassandra/tools/SSTableExport.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/SSTableExport.java b/src/java/org/apache/cassandra/tools/SSTableExport.java new file mode 100644 index 0000000..ebe36c5 --- /dev/null +++ b/src/java/org/apache/cassandra/tools/SSTableExport.java @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.tools; + +import java.io.File; +import java.io.IOException; +import java.util.*; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +import org.apache.commons.cli.*; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.Config; +import org.apache.cassandra.cql3.ColumnIdentifier; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.PartitionPosition; +import org.apache.cassandra.db.SerializationHeader; +import org.apache.cassandra.db.marshal.UTF8Type; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; +import org.apache.cassandra.dht.AbstractBounds; +import org.apache.cassandra.dht.Bounds; +import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.ISSTableScanner; +import org.apache.cassandra.io.sstable.KeyIterator; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.sstable.metadata.MetadataComponent; +import org.apache.cassandra.io.sstable.metadata.MetadataType; +import org.apache.cassandra.io.sstable.metadata.ValidationMetadata; +import org.apache.cassandra.utils.FBUtilities; + +/** + * Export SSTables to JSON format. + */ +public class SSTableExport +{ + + private static final String KEY_OPTION = "k"; + private static final String DEBUG_OUTPUT_OPTION = "d"; + private static final String EXCLUDE_KEY_OPTION = "x"; + private static final String ENUMERATE_KEYS_OPTION = "e"; + + private static final Options options = new Options(); + private static CommandLine cmd; + + static + { + Config.setClientMode(true); + + Option optKey = new Option(KEY_OPTION, true, "Row key"); + // Number of times -k <key> can be passed on the command line. 
+ optKey.setArgs(500); + options.addOption(optKey); + + Option excludeKey = new Option(EXCLUDE_KEY_OPTION, true, "Excluded row key"); + // Number of times -x <key> can be passed on the command line. + excludeKey.setArgs(500); + options.addOption(excludeKey); + + Option optEnumerate = new Option(ENUMERATE_KEYS_OPTION, false, "enumerate keys only"); + options.addOption(optEnumerate); + + Option debugOutput = new Option(DEBUG_OUTPUT_OPTION, false, "CQL row per line internal representation"); + options.addOption(debugOutput); + } + + /** + * Construct table schema from info stored in SSTable's Stats.db + * + * @param desc SSTable's descriptor + * @return Restored CFMetaData + * @throws IOException when Stats.db cannot be read + */ + public static CFMetaData metadataFromSSTable(Descriptor desc) throws IOException + { + if (!desc.version.storeRows()) + throw new IOException("pre-3.0 SSTable is not supported."); + + EnumSet<MetadataType> types = EnumSet.of(MetadataType.VALIDATION, MetadataType.STATS, MetadataType.HEADER); + Map<MetadataType, MetadataComponent> sstableMetadata = desc.getMetadataSerializer().deserialize(desc, types); + ValidationMetadata validationMetadata = (ValidationMetadata) sstableMetadata.get(MetadataType.VALIDATION); + SerializationHeader.Component header = (SerializationHeader.Component) sstableMetadata.get(MetadataType.HEADER); + + IPartitioner partitioner = FBUtilities.newPartitioner(validationMetadata.partitioner); + CFMetaData.Builder builder = CFMetaData.Builder.create("keyspace", "table").withPartitioner(partitioner); + header.getStaticColumns().entrySet().stream() + .forEach(entry -> { + ColumnIdentifier ident = ColumnIdentifier.getInterned(UTF8Type.instance.getString(entry.getKey()), true); + builder.addStaticColumn(ident, entry.getValue()); + }); + header.getRegularColumns().entrySet().stream() + .forEach(entry -> { + ColumnIdentifier ident = ColumnIdentifier.getInterned(UTF8Type.instance.getString(entry.getKey()), true); + 
builder.addRegularColumn(ident, entry.getValue()); + }); + builder.addPartitionKey("PartitionKey", header.getKetType()); + for (int i = 0; i < header.getClusteringTypes().size(); i++) + { + builder.addClusteringColumn("clustering" + (i > 0 ? i : ""), header.getClusteringTypes().get(i)); + } + return builder.build(); + } + + private static <T> Stream<T> iterToStream(Iterator<T> iter) + { + Spliterator<T> splititer = Spliterators.spliteratorUnknownSize(iter, Spliterator.IMMUTABLE); + return StreamSupport.stream(splititer, false); + } + + /** + * Given arguments specifying an SSTable, and optionally an output file, export the contents of the SSTable to JSON. + * + * @param args + * command lines arguments + * @throws ConfigurationException + * on configuration failure (wrong params given) + */ + public static void main(String[] args) throws ConfigurationException + { + CommandLineParser parser = new PosixParser(); + try + { + cmd = parser.parse(options, args); + } + catch (ParseException e1) + { + System.err.println(e1.getMessage()); + printUsage(); + System.exit(1); + } + + if (cmd.getArgs().length != 1) + { + System.err.println("You must supply exactly one sstable"); + printUsage(); + System.exit(1); + } + + String[] keys = cmd.getOptionValues(KEY_OPTION); + HashSet<String> excludes = new HashSet<>(Arrays.asList( + cmd.getOptionValues(EXCLUDE_KEY_OPTION) == null + ? 
new String[0] + : cmd.getOptionValues(EXCLUDE_KEY_OPTION))); + String ssTableFileName = new File(cmd.getArgs()[0]).getAbsolutePath(); + + if (Descriptor.isLegacyFile(new File(ssTableFileName))) + { + System.err.println("Unsupported legacy sstable"); + System.exit(1); + } + if (!new File(ssTableFileName).exists()) + { + System.err.println("Cannot find file " + ssTableFileName); + System.exit(1); + } + Descriptor desc = Descriptor.fromFilename(ssTableFileName); + try + { + CFMetaData metadata = metadataFromSSTable(desc); + if (cmd.hasOption(ENUMERATE_KEYS_OPTION)) + { + JsonTransformer.keysToJson(null, iterToStream(new KeyIterator(desc, metadata)), metadata, System.out); + } + else + { + SSTableReader sstable = SSTableReader.openNoValidation(desc, metadata); + IPartitioner partitioner = sstable.getPartitioner(); + final ISSTableScanner currentScanner; + if ((keys != null) && (keys.length > 0)) + { + List<AbstractBounds<PartitionPosition>> bounds = Arrays.stream(keys) + .filter(key -> !excludes.contains(key)) + .map(metadata.getKeyValidator()::fromString) + .map(partitioner::decorateKey) + .sorted() + .map(DecoratedKey::getToken) + .map(token -> new Bounds<>(token.minKeyBound(), token.maxKeyBound())).collect(Collectors.toList()); + currentScanner = sstable.getScanner(bounds.iterator()); + } + else + { + currentScanner = sstable.getScanner(); + } + Stream<UnfilteredRowIterator> partitions = iterToStream(currentScanner).filter(i -> + excludes.isEmpty() || !excludes.contains(metadata.getKeyValidator().getString(i.partitionKey().getKey())) + ); + if (cmd.hasOption(DEBUG_OUTPUT_OPTION)) + { + AtomicLong position = new AtomicLong(); + partitions.forEach(partition -> + { + position.set(currentScanner.getCurrentPosition()); + partition.forEachRemaining(row -> + { + System.out.println( + "[" + metadata.getKeyValidator().getString(partition.partitionKey().getKey()) + "]@" + + position.get() + " " + row.toString(metadata, false, true)); + 
position.set(currentScanner.getCurrentPosition()); + }); + }); + } + else + { + JsonTransformer.toJson(currentScanner, partitions, metadata, System.out); + } + } + } + catch (IOException e) + { + // letting an exception (e.g. from a broken pipe) propagate out of main causes Windows cmd to hang + e.printStackTrace(System.err); + } + + System.exit(0); + } + + private static void printUsage() + { + String usage = String.format("sstabledump <options> <sstable file path>%n"); + String header = "Dump contents of given SSTable to standard output in JSON format."; + new HelpFormatter().printHelp(usage, header, options, ""); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/cassandra/blob/71b1c4a6/test/unit/org/apache/cassandra/io/sstable/SSTableScannerTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableScannerTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableScannerTest.java index 272b62f..d73c278 100644 --- a/test/unit/org/apache/cassandra/io/sstable/SSTableScannerTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/SSTableScannerTest.java @@ -24,6 +24,8 @@ import java.util.Collection; import java.util.List; import com.google.common.collect.Iterables; +import com.google.common.util.concurrent.RateLimiter; + import org.junit.BeforeClass; import org.junit.Test; @@ -217,7 +219,7 @@ public class SSTableScannerTest SSTableReader sstable = store.getLiveSSTables().iterator().next(); // full range scan - ISSTableScanner scanner = sstable.getScanner(null); + ISSTableScanner scanner = sstable.getScanner(RateLimiter.create(Double.MAX_VALUE)); for (int i = 2; i < 10; i++) assertEquals(toKey(i), new String(scanner.next().partitionKey().getKey().array())); @@ -323,7 +325,7 @@ public class SSTableScannerTest SSTableReader sstable = store.getLiveSSTables().iterator().next(); // full range scan - ISSTableScanner fullScanner = sstable.getScanner(null); + ISSTableScanner
fullScanner = sstable.getScanner(RateLimiter.create(Double.MAX_VALUE)); assertScanContainsRanges(fullScanner, 2, 9, 102, 109, @@ -453,7 +455,7 @@ public class SSTableScannerTest SSTableReader sstable = store.getLiveSSTables().iterator().next(); // full range scan - ISSTableScanner fullScanner = sstable.getScanner(null); + ISSTableScanner fullScanner = sstable.getScanner(RateLimiter.create(Double.MAX_VALUE)); assertScanContainsRanges(fullScanner, 205, 205); // scan three ranges separately http://git-wip-us.apache.org/repos/asf/cassandra/blob/71b1c4a6/tools/bin/sstabledump ---------------------------------------------------------------------- diff --git a/tools/bin/sstabledump b/tools/bin/sstabledump new file mode 100755 index 0000000..7eeb708 --- /dev/null +++ b/tools/bin/sstabledump @@ -0,0 +1,52 @@ +#!/bin/sh + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +if [ "x$CASSANDRA_INCLUDE" = "x" ]; then + for include in "`dirname "$0"`/cassandra.in.sh" \ + "$HOME/.cassandra.in.sh" \ + /usr/share/cassandra/cassandra.in.sh \ + /usr/local/share/cassandra/cassandra.in.sh \ + /opt/cassandra/cassandra.in.sh; do + if [ -r "$include" ]; then + . "$include" + break + fi + done +elif [ -r "$CASSANDRA_INCLUDE" ]; then + . 
"$CASSANDRA_INCLUDE" +fi + + +# Use JAVA_HOME if set, otherwise look for java in PATH +if [ -x "$JAVA_HOME/bin/java" ]; then + JAVA="$JAVA_HOME/bin/java" +else + JAVA="`which java`" +fi + +if [ -z "$CLASSPATH" ]; then + echo "You must set the CLASSPATH var" >&2 + exit 1 +fi + +"$JAVA" $JAVA_AGENT -cp "$CLASSPATH" $JVM_OPTS -Dstorage-config="$CASSANDRA_CONF" \ + -Dcassandra.storagedir="$cassandra_storagedir" \ + -Dlogback.configurationFile=logback-tools.xml \ + org.apache.cassandra.tools.SSTableExport "$@" + +# vi:ai sw=4 ts=4 tw=0 et http://git-wip-us.apache.org/repos/asf/cassandra/blob/71b1c4a6/tools/bin/sstabledump.bat ---------------------------------------------------------------------- diff --git a/tools/bin/sstabledump.bat b/tools/bin/sstabledump.bat new file mode 100644 index 0000000..0a3a380 --- /dev/null +++ b/tools/bin/sstabledump.bat @@ -0,0 +1,48 @@ +@REM +@REM Licensed to the Apache Software Foundation (ASF) under one or more +@REM contributor license agreements. See the NOTICE file distributed with +@REM this work for additional information regarding copyright ownership. +@REM The ASF licenses this file to You under the Apache License, Version 2.0 +@REM (the "License"); you may not use this file except in compliance with +@REM the License. You may obtain a copy of the License at +@REM +@REM http://www.apache.org/licenses/LICENSE-2.0 +@REM +@REM Unless required by applicable law or agreed to in writing, software +@REM distributed under the License is distributed on an "AS IS" BASIS, +@REM WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +@REM See the License for the specific language governing permissions and +@REM limitations under the License. 
+ +@echo off +if "%OS%" == "Windows_NT" setlocal + +pushd "%~dp0" +call cassandra.in.bat + +if NOT DEFINED CASSANDRA_MAIN set CASSANDRA_MAIN=org.apache.cassandra.tools.SSTableExport +if NOT DEFINED JAVA_HOME goto :err + +REM ***** JAVA options ***** +set JAVA_OPTS=^ + -Dlogback.configurationFile=logback-tools.xml + +set TOOLS_PARAMS= +FOR %%A IN (%*) DO call :appendToolsParams %%A +goto runTool + +:appendToolsParams +set TOOLS_PARAMS=%TOOLS_PARAMS% %1 +goto :eof + +:runTool +"%JAVA_HOME%\bin\java" %JAVA_OPTS% %CASSANDRA_PARAMS% -cp %CASSANDRA_CLASSPATH% "%CASSANDRA_MAIN%" %TOOLS_PARAMS% +goto finally + +:err +echo JAVA_HOME environment variable must be set! +pause + +:finally + +ENDLOCAL