This is an automated email from the ASF dual-hosted git repository.

ycai pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git
The following commit(s) were added to refs/heads/trunk by this push:
     new 014db08  CASSANDRA-18683: Add PartitionSizeTableProvider for reading the compressed and uncompressed sizes of all partitions in a table by utilizing the SSTable Index.db files
014db08 is described below

commit 014db08a79f00ef0d94e6855779e398c9dc689c1
Author: James Berragan <jberra...@apple.com>
AuthorDate: Wed Jul 19 12:23:07 2023 -0700

    CASSANDRA-18683: Add PartitionSizeTableProvider for reading the compressed and uncompressed sizes of all partitions in a table by utilizing the SSTable Index.db files

    Patch by James Berragan; Reviewed by Dinesh Joshi, Yifan Cai for CASSANDRA-18683
---
 CHANGES.txt                                        |   1 +
 cassandra-analytics-core/build.gradle              |  11 +
 .../cassandra/spark/data/FileSystemSSTable.java    |  15 +-
 .../spark/data/SidecarProvisionedSSTable.java      |  10 +
 .../spark/sparksql/SparkCellIterator.java          |  48 +--
 .../spark/sparksql/LocalPartitionSizeSource.java   |  28 +-
 .../spark/sparksql/PartitionSizeIterator.java      |  93 ++++++
 .../spark/sparksql/PartitionSizeTableProvider.java | 178 +++++++++++
 .../org/apache/cassandra/spark/TestDataLayer.java  |  11 +-
 .../java/org/apache/cassandra/spark/TestUtils.java |  48 +++
 .../apache/cassandra/spark/PartitionSizeTests.java |  95 ++++++
 .../apache/cassandra/bridge/CassandraBridge.java   |  56 ++--
 .../apache/cassandra/spark/data/BasicSupplier.java |   0
 .../org/apache/cassandra/spark/data/DataLayer.java |  64 +++-
 .../org/apache/cassandra/spark/data/SSTable.java   |   2 +
 .../cassandra/spark/reader/EmptyStreamScanner.java |   2 +-
 ...{EmptyStreamScanner.java => IndexConsumer.java} |  28 +-
 .../apache/cassandra/spark/reader/IndexEntry.java  |  71 +++++
 .../cassandra/spark/reader/StreamScanner.java      |  29 +-
 .../reader/common/AbstractCompressionMetadata.java |  15 +-
 .../IIndexReader.java}                             |  29 +-
 .../spark/reader/common/IndexIterator.java         | 184 ++++++++++++
 .../org/apache/cassandra/spark/stats/Stats.java    | 137 +++++++++
 .../cassandra/spark/utils/test/TestSSTable.java    |   6 +
 .../cassandra/spark/utils/test/TestSchema.java     |   5 +-
 cassandra-four-zero/build.gradle                   |   5 +
 .../bridge/CassandraBridgeImplementation.java      |  64 ++--
 .../spark/reader/AbstractStreamScanner.java        |   2 +-
 .../cassandra/spark/reader/CdcScannerBuilder.java  |   2 +-
 .../spark/reader/CompressionMetadata.java          |   4 +-
 .../apache/cassandra/spark/reader/IndexReader.java | 324 +++++++++++++++++++++
 .../apache/cassandra/spark/reader/ReaderUtils.java |   9 +-
 .../cassandra/spark/reader/IndexReaderTests.java   | 273 +++++++++++++++++
 33 files changed, 1677 insertions(+), 172 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 41e2b04..37b126d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 1.0.0
+ * Expose per partition on-disk usage through new DataFrame that utilizes the Index.db SSTable file components (CASSANDRA-18683)
  * Fix bulk writes with Buffered RowBufferMode (CASSANDRA-18692)
  * Minor Refactoring to Improve Code Reusability (CASSANDRA-18684)
  * Fix cassandra-analytics-core-example (CASSANDRA-18662)
diff --git a/cassandra-analytics-core/build.gradle b/cassandra-analytics-core/build.gradle
index 572af8d..e1bd028 100644
--- a/cassandra-analytics-core/build.gradle
+++ b/cassandra-analytics-core/build.gradle
@@ -170,6 +170,17 @@ project(':cassandra-analytics-core') {
         finalizedBy(tasks.jacocoTestReport)
     }
     /* End: JaCoCo check */
+
+    configurations {
+        testArtifacts
+    }
+    task testJar(type: Jar) {
+        baseName = "${project.name}-test"
+        from sourceSets.test.output
+    }
+    artifacts {
+        testArtifacts testJar
+    }
 }

 private void writeBuildVersion(version,
projectDir) { diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/FileSystemSSTable.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/FileSystemSSTable.java index 512f4c9..c8c4296 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/FileSystemSSTable.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/FileSystemSSTable.java @@ -31,8 +31,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.spark.stats.Stats; +import org.apache.cassandra.spark.utils.IOUtils; import org.apache.cassandra.spark.utils.ThrowableUtils; import org.apache.cassandra.spark.utils.streaming.SSTableInputStream; +import org.jetbrains.annotations.Nullable; class FileSystemSSTable extends SSTable { @@ -75,10 +77,21 @@ class FileSystemSSTable extends SSTable } } + public long length(FileType fileType) + { + return IOUtils.size(resolveComponentFile(fileType)); + } + @Override public boolean isMissing(FileType fileType) { - return FileType.resolveComponentFile(fileType, dataFilePath) == null; + return resolveComponentFile(fileType) == null; + } + + @Nullable + private Path resolveComponentFile(FileType fileType) + { + return FileType.resolveComponentFile(fileType, dataFilePath); } @Override diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTable.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTable.java index e985512..43be542 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTable.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTable.java @@ -127,6 +127,16 @@ public class SidecarProvisionedSSTable extends SSTable return openStream(snapshotFile.fileName, snapshotFile.size, fileType); } + public long length(FileType fileType) + { + ListSnapshotFilesResponse.FileInfo snapshotFile = components.get(fileType); + if (snapshotFile == null) + { + throw new IncompleteSSTableException(fileType); + } + return snapshotFile.size; + } + @Override public boolean isMissing(FileType fileType) { diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/sparksql/SparkCellIterator.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/sparksql/SparkCellIterator.java index f40a88f..f5519bc 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/sparksql/SparkCellIterator.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/sparksql/SparkCellIterator.java @@ -59,13 +59,12 @@ public class SparkCellIterator implements Iterator<Cell>, AutoCloseable private final Stats stats; private final CqlTable cqlTable; private final Object[] values; - private final int numPartitionKeys; private final boolean noValueColumns; @Nullable protected final PruneColumnFilter columnFilter; private final long startTimeNanos; @NotNull - private final StreamScanner scanner; + private final StreamScanner<Rid> scanner; @NotNull private final Rid rid; @@ -88,7 +87,6 @@ public class SparkCellIterator implements Iterator<Cell>, AutoCloseable this.dataLayer = dataLayer; stats = dataLayer.stats(); cqlTable = dataLayer.cqlTable(); - numPartitionKeys = cqlTable.numPartitionKeys(); columnFilter = buildColumnFilter(requiredSchema, cqlTable); if (columnFilter != null) { @@ -118,9 +116,9 @@ public class SparkCellIterator implements 
Iterator<Cell>, AutoCloseable stats.openedSparkCellIterator(); } - protected StreamScanner openScanner(int partitionId, - @NotNull List<PartitionKeyFilter> partitionKeyFilters, - @Nullable CdcOffsetFilter cdcOffsetFilter) + protected StreamScanner<Rid> openScanner(int partitionId, + @NotNull List<PartitionKeyFilter> partitionKeyFilters, + @Nullable CdcOffsetFilter cdcOffsetFilter) { return dataLayer.openCompactionScanner(partitionId, partitionKeyFilters, columnFilter); } @@ -129,11 +127,11 @@ public class SparkCellIterator implements Iterator<Cell>, AutoCloseable static PruneColumnFilter buildColumnFilter(@Nullable StructType requiredSchema, @NotNull CqlTable cqlTable) { return requiredSchema != null - ? new PruneColumnFilter(Arrays.stream(requiredSchema.fields()) - .map(StructField::name) - .filter(cqlTable::has) - .collect(Collectors.toSet())) - : null; + ? new PruneColumnFilter(Arrays.stream(requiredSchema.fields()) + .map(StructField::name) + .filter(cqlTable::has) + .collect(Collectors.toSet())) + : null; } public boolean noValueColumns() @@ -339,21 +337,28 @@ public class SparkCellIterator implements Iterator<Cell>, AutoCloseable } // Or new partition, so deserialize partition keys and update 'values' array - ByteBuffer partitionKey = rid.getPartitionKey(); - if (numPartitionKeys == 1) + readPartitionKey(rid.getPartitionKey(), cqlTable, this.values, stats); + } + + public static void readPartitionKey(ByteBuffer partitionKey, + CqlTable table, + Object[] values, + Stats stats) + { + if (table.numPartitionKeys() == 1) { // Not a composite partition key - CqlField field = cqlTable.partitionKeys().get(0); - values[field.position()] = deserialize(field, partitionKey); + CqlField field = table.partitionKeys().get(0); + values[field.position()] = deserialize(field, partitionKey, stats); } else { // Split composite partition keys - ByteBuffer[] partitionKeyBufs = ColumnTypes.split(partitionKey, numPartitionKeys); + ByteBuffer[] partitionKeyBufs = ColumnTypes.split(partitionKey, table.numPartitionKeys()); int index = 0; - for (CqlField field : cqlTable.partitionKeys()) + for (CqlField field : table.partitionKeys()) { - values[field.position()] = deserialize(field, partitionKeyBufs[index++]); + values[field.position()] = deserialize(field, partitionKeyBufs[index++], stats); } } } @@ -403,9 +408,14 @@ public class SparkCellIterator implements Iterator<Cell>, AutoCloseable } private Object deserialize(CqlField field, ByteBuffer buffer) + { + return deserialize(field, buffer, stats); + } + + private static Object deserialize(CqlField field, ByteBuffer buffer, Stats stats) { long now = System.nanoTime(); - Object value = buffer != null ? field.deserialize(buffer) : null; + Object value = buffer == null ? 
null : field.deserialize(buffer); stats.fieldDeserialization(field, System.nanoTime() - now); return value; } diff --git a/cassandra-bridge/src/main/java/org/apache/cassandra/spark/reader/EmptyStreamScanner.java b/cassandra-analytics-core/src/main/spark3/org/apache/cassandra/spark/sparksql/LocalPartitionSizeSource.java similarity index 64% copy from cassandra-bridge/src/main/java/org/apache/cassandra/spark/reader/EmptyStreamScanner.java copy to cassandra-analytics-core/src/main/spark3/org/apache/cassandra/spark/sparksql/LocalPartitionSizeSource.java index b4c8e0e..cf24f1c 100644 --- a/cassandra-bridge/src/main/java/org/apache/cassandra/spark/reader/EmptyStreamScanner.java +++ b/cassandra-analytics-core/src/main/spark3/org/apache/cassandra/spark/sparksql/LocalPartitionSizeSource.java @@ -17,31 +17,23 @@ * under the License. */ -package org.apache.cassandra.spark.reader; +package org.apache.cassandra.spark.sparksql; -public class EmptyStreamScanner implements StreamScanner -{ - public static final EmptyStreamScanner INSTANCE = new EmptyStreamScanner(); - - @Override - public Rid rid() - { - return null; - } - - @Override - public boolean hasNext() - { - return false; - } +import org.apache.cassandra.spark.data.DataLayer; +import org.apache.cassandra.spark.data.LocalDataLayer; +import org.apache.spark.sql.util.CaseInsensitiveStringMap; +public class LocalPartitionSizeSource extends PartitionSizeTableProvider +{ @Override - public void advanceToNextColumn() + public String shortName() { + return "localpartitionsizesource"; } @Override - public void close() + public DataLayer getDataLayer(CaseInsensitiveStringMap options) { + return LocalDataLayer.from(options); } } diff --git a/cassandra-analytics-core/src/main/spark3/org/apache/cassandra/spark/sparksql/PartitionSizeIterator.java b/cassandra-analytics-core/src/main/spark3/org/apache/cassandra/spark/sparksql/PartitionSizeIterator.java new file mode 100644 index 0000000..d1efa94 --- /dev/null +++ b/cassandra-analytics-core/src/main/spark3/org/apache/cassandra/spark/sparksql/PartitionSizeIterator.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.cassandra.spark.sparksql; + +import java.io.IOException; + +import org.apache.cassandra.spark.data.CqlTable; +import org.apache.cassandra.spark.data.DataLayer; +import org.apache.cassandra.spark.reader.IndexEntry; +import org.apache.cassandra.spark.reader.StreamScanner; +import org.apache.cassandra.spark.stats.Stats; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; +import org.apache.spark.sql.connector.read.PartitionReader; +import org.jetbrains.annotations.NotNull; + +/** + * Wrapper iterator around IndexIterator to read all Index.db files and return SparkSQL + * rows containing all partition keys and the associated on-disk uncompressed and compressed sizes. + */ +public class PartitionSizeIterator implements PartitionReader<InternalRow> +{ + private final StreamScanner<IndexEntry> it; + private final CqlTable cqlTable; + private final int numPartitionKeys; + private final Stats stats; + private final long startTimeNanos; + private GenericInternalRow curr = null; + + public PartitionSizeIterator(int partitionId, @NotNull DataLayer dataLayer) + { + this.cqlTable = dataLayer.cqlTable(); + this.numPartitionKeys = cqlTable.numPartitionKeys(); + this.stats = dataLayer.stats(); + this.startTimeNanos = System.nanoTime(); + this.it = dataLayer.openPartitionSizeIterator(partitionId); + stats.openedPartitionSizeIterator(System.nanoTime() - startTimeNanos); + } + + /** + * The expected schema is defined in {@link DataLayer#partitionSizeStructType()}. + * It consists of the Cassandra partition keys, appended with the columns "uncompressed" and "compressed". + */ + public boolean next() throws IOException + { + if (it.hasNext()) + { + it.advanceToNextColumn(); + + IndexEntry entry = it.rid(); + Object[] values = new Object[numPartitionKeys + 2]; + + SparkCellIterator.readPartitionKey(entry.getPartitionKey(), cqlTable, values, stats); + values[numPartitionKeys] = entry.getUncompressed(); + values[numPartitionKeys + 1] = entry.getCompressed(); + + this.curr = new GenericInternalRow(values); + stats.emitIndexEntry(entry); + + return true; + } + + return false; + } + + public InternalRow get() + { + return curr; + } + + public void close() throws IOException + { + this.it.close(); + stats.closedPartitionSizeIterator(System.nanoTime() - startTimeNanos); + } +} diff --git a/cassandra-analytics-core/src/main/spark3/org/apache/cassandra/spark/sparksql/PartitionSizeTableProvider.java b/cassandra-analytics-core/src/main/spark3/org/apache/cassandra/spark/sparksql/PartitionSizeTableProvider.java new file mode 100644 index 0000000..1f2eeb0 --- /dev/null +++ b/cassandra-analytics-core/src/main/spark3/org/apache/cassandra/spark/sparksql/PartitionSizeTableProvider.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cassandra.spark.sparksql; + +import java.util.Map; +import java.util.Set; +import java.util.stream.IntStream; + +import com.google.common.collect.ImmutableSet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.spark.data.DataLayer; +import org.apache.spark.TaskContext; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.connector.catalog.SupportsRead; +import org.apache.spark.sql.connector.catalog.Table; +import org.apache.spark.sql.connector.catalog.TableCapability; +import org.apache.spark.sql.connector.expressions.Transform; +import org.apache.spark.sql.connector.read.Batch; +import org.apache.spark.sql.connector.read.InputPartition; +import org.apache.spark.sql.connector.read.PartitionReader; +import org.apache.spark.sql.connector.read.PartitionReaderFactory; +import org.apache.spark.sql.connector.read.Scan; +import org.apache.spark.sql.connector.read.ScanBuilder; +import org.apache.spark.sql.connector.read.streaming.MicroBatchStream; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.sql.util.CaseInsensitiveStringMap; + +public abstract class PartitionSizeTableProvider extends CassandraTableProvider +{ + public abstract DataLayer getDataLayer(CaseInsensitiveStringMap options); + + @Override + public StructType inferSchema(CaseInsensitiveStringMap options) + { + return getDataLayerInternal(options).partitionSizeStructType(); + } + + @Override + public Table getTable(StructType schema, Transform[] partitioning, Map<String, String> properties) + { + return new PartitionSizeTable(getDataLayerInternal(new CaseInsensitiveStringMap(properties)), schema); + } +} + +class PartitionSizeTable implements Table, SupportsRead +{ + private final DataLayer dataLayer; + private final StructType schema; + + PartitionSizeTable(DataLayer dataLayer, StructType schema) + { + this.dataLayer = dataLayer; + this.schema = schema; + } + + @Override + public String name() + { + return dataLayer.cqlTable().keyspace() + "." 
+ dataLayer.cqlTable().table(); + } + + @Override + public StructType schema() + { + return schema; + } + + @Override + public Set<TableCapability> capabilities() + { + return ImmutableSet.of(TableCapability.BATCH_READ); + } + + @Override + public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) + { + return new PartitionSizeScanBuilder(dataLayer, schema, options); + } +} + +class PartitionSizeScanBuilder implements ScanBuilder, Scan, Batch +{ + final DataLayer dataLayer; + final StructType schema; + final CaseInsensitiveStringMap options; + + PartitionSizeScanBuilder(DataLayer dataLayer, StructType schema, CaseInsensitiveStringMap options) + { + this.dataLayer = dataLayer; + this.schema = schema; + this.options = options; + } + + @Override + public Scan build() + { + return this; + } + + @Override + public StructType readSchema() + { + return schema; + } + + @Override + public Batch toBatch() + { + return this; + } + + @Override + public InputPartition[] planInputPartitions() + { + return IntStream.range(0, this.dataLayer.partitionCount()) + .mapToObj(CassandraInputPartition::new) + .toArray(InputPartition[]::new); + } + + @Override + public MicroBatchStream toMicroBatchStream(String checkpointLocation) + { + return new CassandraMicroBatchStream(dataLayer, null, options); + } + + @Override + public PartitionReaderFactory createReaderFactory() + { + return new PartitionSizeReaderFactory(dataLayer); + } +} + +class PartitionSizeReaderFactory implements PartitionReaderFactory +{ + private static final Logger LOGGER = LoggerFactory.getLogger(CassandraPartitionReaderFactory.class); + final DataLayer dataLayer; + + PartitionSizeReaderFactory(DataLayer dataLayer) + { + this.dataLayer = dataLayer; + } + + @Override + public PartitionReader<InternalRow> createReader(InputPartition partition) + { + int partitionId; + if (partition instanceof CassandraInputPartition) + { + partitionId = ((CassandraInputPartition) partition).getPartitionId(); + } + else + { + partitionId = TaskContext.getPartitionId(); + LOGGER.warn("InputPartition is not of CassandraInputPartition type. 
Using TaskContext to determine the partitionId type={}, partitionId={}", + partition.getClass().getName(), partitionId); + } + return new PartitionSizeIterator(partitionId, dataLayer); + } +} diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/TestDataLayer.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/TestDataLayer.java index 96267e6..6583c98 100644 --- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/TestDataLayer.java +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/TestDataLayer.java @@ -31,6 +31,7 @@ import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.stream.Collectors; +import java.util.stream.Stream; import com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -42,6 +43,7 @@ import org.apache.cassandra.spark.cdc.TableIdLookup; import org.apache.cassandra.spark.data.BasicSupplier; import org.apache.cassandra.spark.data.CqlTable; import org.apache.cassandra.spark.data.DataLayer; +import org.apache.cassandra.spark.data.SSTable; import org.apache.cassandra.spark.data.SSTablesSupplier; import org.apache.cassandra.spark.data.partitioner.Partitioner; import org.apache.cassandra.spark.sparksql.filters.PartitionKeyFilter; @@ -123,9 +125,12 @@ public class TestDataLayer extends DataLayer @Nullable SparkRangeFilter sparkRangeFilter, @NotNull List<PartitionKeyFilter> partitionKeyFilters) { - return new BasicSupplier(dataDbFiles.stream() - .map(TestSSTable::at) - .collect(Collectors.toSet())); + return new BasicSupplier(listSSTables().collect(Collectors.toSet())); + } + + public Stream<SSTable> listSSTables() + { + return dataDbFiles.stream().map(TestSSTable::at); } @Override diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/TestUtils.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/TestUtils.java index 4b1f726..0144648 100644 --- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/TestUtils.java +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/TestUtils.java @@ -21,6 +21,7 @@ package org.apache.cassandra.spark; import java.io.IOException; import java.math.BigInteger; +import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; @@ -38,6 +39,7 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import org.apache.commons.io.FileUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.cassandra.bridge.CassandraBridge; import org.apache.cassandra.bridge.CassandraBridgeFactory; @@ -50,6 +52,7 @@ import org.apache.cassandra.spark.data.partitioner.CassandraInstance; import org.apache.cassandra.spark.data.partitioner.CassandraRing; import org.apache.cassandra.spark.data.partitioner.Partitioner; import org.apache.cassandra.spark.utils.FilterUtils; +import org.apache.cassandra.spark.utils.RandomUtils; import org.apache.spark.sql.DataFrameReader; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; @@ -190,6 +193,29 @@ public final class TestUtils } } + static Dataset<Row> openLocalPartitionSizeSource(Partitioner partitioner, + Path dir, + String keyspace, + String createStmt, + CassandraVersion version, + Set<CqlField.CqlUdt> udts, + @Nullable String statsClass) + { + DataFrameReader frameReader = 
SPARK.read().format("org.apache.cassandra.spark.sparksql.LocalPartitionSizeSource") + .option("keyspace", keyspace) + .option("createStmt", createStmt) + .option("dirs", dir.toAbsolutePath().toString()) + .option("version", version.toString()) + .option("useSSTableInputStream", true) // use in the test system to test the SSTableInputStream + .option("partitioner", partitioner.name()) + .option("udts", udts.stream().map(f -> f.createStatement(keyspace)).collect(Collectors.joining("\n"))); + if (statsClass != null) + { + frameReader = frameReader.option("statsClass", statsClass); + } + return frameReader.load(); + } + public static Dataset<Row> read(Path path, StructType schema) { return SPARK.read() @@ -352,4 +378,26 @@ public final class TestUtils }); return filterKeys; } + + public static String randomLowEntropyString() + { + return new String(randomLowEntropyData(), StandardCharsets.UTF_8); + } + + public static byte[] randomLowEntropyData() + { + return randomLowEntropyData(RandomUtils.randomPositiveInt(16384 - 512) + 512); + } + + public static byte[] randomLowEntropyData(int size) + { + return randomLowEntropyData("Hello world!", size); + } + + public static byte[] randomLowEntropyData(String str, int size) + { + return StringUtils.repeat(str, size / str.length() + 1) + .substring(0, size) + .getBytes(StandardCharsets.UTF_8); + } } diff --git a/cassandra-analytics-core/src/test/spark3/org/apache/cassandra/spark/PartitionSizeTests.java b/cassandra-analytics-core/src/test/spark3/org/apache/cassandra/spark/PartitionSizeTests.java new file mode 100644 index 0000000..c08b2b9 --- /dev/null +++ b/cassandra-analytics-core/src/test/spark3/org/apache/cassandra/spark/PartitionSizeTests.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.cassandra.spark; + +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; + +import org.apache.cassandra.bridge.CassandraVersion; +import org.apache.cassandra.spark.data.VersionRunner; +import org.apache.cassandra.spark.utils.test.TestSchema; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class PartitionSizeTests extends VersionRunner +{ + @ParameterizedTest + @MethodSource("org.apache.cassandra.spark.data.VersionRunner#versions") + public void testReadingPartitionSize(CassandraVersion version) + { + TestUtils.runTest(version, (partitioner, dir, bridge) -> { + int numRows = Tester.DEFAULT_NUM_ROWS; + int numCols = 25; + TestSchema schema = TestSchema.builder() + .withPartitionKey("a", bridge.text()) + .withClusteringKey("b", bridge.aInt()) + .withColumn("c", bridge.aInt()) + .withColumn("d", bridge.text()).build(); + + Map<String, Integer> sizes = new HashMap<>(numRows); + schema.writeSSTable(dir, bridge, partitioner, (writer) -> { + for (int i = 0; i < numRows; i++) + { + String key = UUID.randomUUID().toString(); + int size = 0; + for (int j = 0; j < numCols; j++) + { + String str = TestUtils.randomLowEntropyString(); + writer.write(key, j, i + j, str); + size += 4 + 4 + str.getBytes(StandardCharsets.UTF_8).length; + } + sizes.put(key, size); + } + }); + + Dataset<Row> ds = TestUtils.openLocalPartitionSizeSource(partitioner, + dir, + schema.keyspace, + schema.createStatement, + version, + Collections.emptySet(), + null); + List<Row> rows = ds.collectAsList(); + assertEquals(numRows, rows.size()); + for (Row row : rows) + { + String key = row.getString(0); + long uncompressed = row.getLong(1); + long compressed = row.getLong(2); + assertTrue(sizes.containsKey(key)); + long len = sizes.get(key); + assertTrue(len < uncompressed); + assertTrue(Math.abs(uncompressed - len) < 500); // uncompressed size should be ~len size but with a fixed overhead + assertTrue(compressed < uncompressed); + assertTrue(compressed / (float) uncompressed < 0.1); + } + }); + } +} diff --git a/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridge.java b/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridge.java index 907c656..0239bdf 100644 --- a/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridge.java +++ b/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridge.java @@ -63,6 +63,8 @@ import org.apache.cassandra.spark.data.CqlTable; import org.apache.cassandra.spark.data.ReplicationFactor; import org.apache.cassandra.spark.data.SSTablesSupplier; import org.apache.cassandra.spark.data.partitioner.Partitioner; +import org.apache.cassandra.spark.reader.IndexEntry; +import org.apache.cassandra.spark.reader.Rid; import org.apache.cassandra.spark.reader.StreamScanner; import org.apache.cassandra.spark.sparksql.filters.CdcOffsetFilter; import org.apache.cassandra.spark.sparksql.filters.PartitionKeyFilter; @@ -94,32 +96,40 @@ public abstract class CassandraBridge // CDC Stream Scanner // CHECKSTYLE IGNORE: Method with many parameters - public abstract StreamScanner getCdcScanner(int partitionId, - @NotNull CqlTable table, - @NotNull 
Partitioner partitioner, - @NotNull CommitLogProvider commitLogProvider, - @NotNull TableIdLookup tableIdLookup, - @NotNull Stats stats, - @Nullable SparkRangeFilter sparkRangeFilter, - @Nullable CdcOffsetFilter offset, - int minimumReplicasPerMutation, - @NotNull Watermarker watermarker, - @NotNull String jobId, - @NotNull ExecutorService executorService, - @NotNull TimeProvider timeProvider); + public abstract StreamScanner<Rid> getCdcScanner(int partitionId, + @NotNull CqlTable table, + @NotNull Partitioner partitioner, + @NotNull CommitLogProvider commitLogProvider, + @NotNull TableIdLookup tableIdLookup, + @NotNull Stats stats, + @Nullable SparkRangeFilter sparkRangeFilter, + @Nullable CdcOffsetFilter offset, + int minimumReplicasPerMutation, + @NotNull Watermarker watermarker, + @NotNull String jobId, + @NotNull ExecutorService executorService, + @NotNull TimeProvider timeProvider); // Compaction Stream Scanner // CHECKSTYLE IGNORE: Method with many parameters - public abstract StreamScanner getCompactionScanner(@NotNull CqlTable table, - @NotNull Partitioner partitionerType, - @NotNull SSTablesSupplier ssTables, - @Nullable SparkRangeFilter sparkRangeFilter, - @NotNull Collection<PartitionKeyFilter> partitionKeyFilters, - @Nullable PruneColumnFilter columnFilter, - @NotNull TimeProvider timeProvider, - boolean readIndexOffset, - boolean useIncrementalRepair, - @NotNull Stats stats); + public abstract StreamScanner<Rid> getCompactionScanner(@NotNull CqlTable table, + @NotNull Partitioner partitionerType, + @NotNull SSTablesSupplier ssTables, + @Nullable SparkRangeFilter sparkRangeFilter, + @NotNull Collection<PartitionKeyFilter> partitionKeyFilters, + @Nullable PruneColumnFilter columnFilter, + @NotNull TimeProvider timeProvider, + boolean readIndexOffset, + boolean useIncrementalRepair, + @NotNull Stats stats); + + public abstract StreamScanner<IndexEntry> getPartitionSizeIterator(@NotNull CqlTable table, + @NotNull Partitioner partitioner, + @NotNull SSTablesSupplier ssTables, + @Nullable SparkRangeFilter rangeFilter, + @NotNull TimeProvider timeProvider, + @NotNull Stats stats, + @NotNull ExecutorService executor); public abstract CassandraVersion getVersion(); diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/BasicSupplier.java b/cassandra-bridge/src/main/java/org/apache/cassandra/spark/data/BasicSupplier.java similarity index 100% rename from cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/BasicSupplier.java rename to cassandra-bridge/src/main/java/org/apache/cassandra/spark/data/BasicSupplier.java diff --git a/cassandra-bridge/src/main/java/org/apache/cassandra/spark/data/DataLayer.java b/cassandra-bridge/src/main/java/org/apache/cassandra/spark/data/DataLayer.java index b2cd8c0..b7ba896 100644 --- a/cassandra-bridge/src/main/java/org/apache/cassandra/spark/data/DataLayer.java +++ b/cassandra-bridge/src/main/java/org/apache/cassandra/spark/data/DataLayer.java @@ -42,6 +42,8 @@ import org.apache.cassandra.spark.cdc.watermarker.Watermarker; import org.apache.cassandra.spark.config.SchemaFeature; import org.apache.cassandra.spark.data.partitioner.Partitioner; import org.apache.cassandra.spark.reader.EmptyStreamScanner; +import org.apache.cassandra.spark.reader.IndexEntry; +import org.apache.cassandra.spark.reader.Rid; import org.apache.cassandra.spark.reader.StreamScanner; import org.apache.cassandra.spark.sparksql.NoMatchFoundException; import org.apache.cassandra.spark.sparksql.filters.CdcOffsetFilter; @@ -53,6 +55,7 @@ 
import org.apache.cassandra.spark.utils.TimeProvider; import org.apache.spark.sql.sources.EqualTo; import org.apache.spark.sql.sources.Filter; import org.apache.spark.sql.sources.In; +import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.MetadataBuilder; import org.apache.spark.sql.types.StructType; import org.jetbrains.annotations.NotNull; @@ -67,6 +70,27 @@ public abstract class DataLayer implements Serializable { } + /** + * @return SparkSQL table schema expected for reading Partition sizes with PartitionSizeTableProvider. + */ + public StructType partitionSizeStructType() + { + StructType structType = new StructType(); + for (CqlField field : cqlTable().partitionKeys()) + { + MetadataBuilder metadata = fieldMetaData(field); + structType = structType.add(field.name(), + field.type().sparkSqlType(bigNumberConfig(field)), + true, + metadata.build()); + } + + structType = structType.add("uncompressed", DataTypes.LongType); + structType = structType.add("compressed", DataTypes.LongType); + + return structType; + } + /** * Map Cassandra CQL table schema to SparkSQL StructType * @@ -78,15 +102,7 @@ public abstract class DataLayer implements Serializable for (CqlField field : cqlTable().fields()) { // Pass Cassandra field metadata in StructField metadata - MetadataBuilder metadata = new MetadataBuilder(); - metadata.putLong("position", field.position()); - metadata.putString("cqlType", field.cqlTypeName()); - metadata.putBoolean("isPartitionKey", field.isPartitionKey()); - metadata.putBoolean("isPrimaryKey", field.isPrimaryKey()); - metadata.putBoolean("isClusteringKey", field.isClusteringColumn()); - metadata.putBoolean("isStaticColumn", field.isStaticColumn()); - metadata.putBoolean("isValueColumn", field.isValueColumn()); - + MetadataBuilder metadata = fieldMetaData(field); structType = structType.add(field.name(), field.type().sparkSqlType(bigNumberConfig(field)), true, @@ -103,6 +119,19 @@ public abstract class DataLayer implements Serializable return structType; } + private MetadataBuilder fieldMetaData(CqlField field) + { + MetadataBuilder metadata = new MetadataBuilder(); + metadata.putLong("position", field.position()); + metadata.putString("cqlType", field.cqlTypeName()); + metadata.putBoolean("isPartitionKey", field.isPartitionKey()); + metadata.putBoolean("isPrimaryKey", field.isPrimaryKey()); + metadata.putBoolean("isClusteringKey", field.isClusteringColumn()); + metadata.putBoolean("isStaticColumn", field.isStaticColumn()); + metadata.putBoolean("isValueColumn", field.isValueColumn()); + return metadata; + } + public List<SchemaFeature> requestedFeatures() { return Collections.emptyList(); @@ -271,9 +300,9 @@ public abstract class DataLayer implements Serializable /** * @return CompactionScanner for iterating over one or more SSTables, compacting data and purging tombstones */ - public StreamScanner openCompactionScanner(int partitionId, - List<PartitionKeyFilter> partitionKeyFilters, - @Nullable PruneColumnFilter columnFilter) + public StreamScanner<Rid> openCompactionScanner(int partitionId, + List<PartitionKeyFilter> partitionKeyFilters, + @Nullable PruneColumnFilter columnFilter) { List<PartitionKeyFilter> filtersInRange; try @@ -297,6 +326,17 @@ public abstract class DataLayer implements Serializable stats()); } + /** + * @param partitionId Spark partition id + * @return a PartitionSizeIterator that iterates over Index.db files to calculate partition size. 
+ */ + public StreamScanner<IndexEntry> openPartitionSizeIterator(int partitionId) + { + SparkRangeFilter rangeFilter = sparkRangeFilter(partitionId); + return bridge().getPartitionSizeIterator(cqlTable(), partitioner(), sstables(partitionId, rangeFilter, Collections.emptyList()), + rangeFilter, timeProvider(), stats(), executorService()); + } + /** * @return a TimeProvider that returns the time now in seconds. User can override with their own provider */ diff --git a/cassandra-bridge/src/main/java/org/apache/cassandra/spark/data/SSTable.java b/cassandra-bridge/src/main/java/org/apache/cassandra/spark/data/SSTable.java index 4ae1198..39cf693 100644 --- a/cassandra-bridge/src/main/java/org/apache/cassandra/spark/data/SSTable.java +++ b/cassandra-bridge/src/main/java/org/apache/cassandra/spark/data/SSTable.java @@ -77,6 +77,8 @@ public abstract class SSTable implements Serializable return Objects.requireNonNull(openInputStream(FileType.DATA), "Data.db SSTable file component must exist"); } + public abstract long length(FileType fileType); + public abstract boolean isMissing(FileType fileType); public void verify() throws IncompleteSSTableException diff --git a/cassandra-bridge/src/main/java/org/apache/cassandra/spark/reader/EmptyStreamScanner.java b/cassandra-bridge/src/main/java/org/apache/cassandra/spark/reader/EmptyStreamScanner.java index b4c8e0e..822a8cf 100644 --- a/cassandra-bridge/src/main/java/org/apache/cassandra/spark/reader/EmptyStreamScanner.java +++ b/cassandra-bridge/src/main/java/org/apache/cassandra/spark/reader/EmptyStreamScanner.java @@ -19,7 +19,7 @@ package org.apache.cassandra.spark.reader; -public class EmptyStreamScanner implements StreamScanner +public class EmptyStreamScanner implements StreamScanner<Rid> { public static final EmptyStreamScanner INSTANCE = new EmptyStreamScanner(); diff --git a/cassandra-bridge/src/main/java/org/apache/cassandra/spark/reader/EmptyStreamScanner.java b/cassandra-bridge/src/main/java/org/apache/cassandra/spark/reader/IndexConsumer.java similarity index 68% copy from cassandra-bridge/src/main/java/org/apache/cassandra/spark/reader/EmptyStreamScanner.java copy to cassandra-bridge/src/main/java/org/apache/cassandra/spark/reader/IndexConsumer.java index b4c8e0e..c372bda 100644 --- a/cassandra-bridge/src/main/java/org/apache/cassandra/spark/reader/EmptyStreamScanner.java +++ b/cassandra-bridge/src/main/java/org/apache/cassandra/spark/reader/IndexConsumer.java @@ -19,29 +19,11 @@ package org.apache.cassandra.spark.reader; -public class EmptyStreamScanner implements StreamScanner -{ - public static final EmptyStreamScanner INSTANCE = new EmptyStreamScanner(); - - @Override - public Rid rid() - { - return null; - } +import java.util.function.Consumer; - @Override - public boolean hasNext() - { - return false; - } - - @Override - public void advanceToNextColumn() - { - } +public interface IndexConsumer extends Consumer<IndexEntry> +{ + void onFailure(Throwable t); - @Override - public void close() - { - } + void onFinished(long runtimeNanos); } diff --git a/cassandra-bridge/src/main/java/org/apache/cassandra/spark/reader/IndexEntry.java b/cassandra-bridge/src/main/java/org/apache/cassandra/spark/reader/IndexEntry.java new file mode 100644 index 0000000..b86a2ef --- /dev/null +++ b/cassandra-bridge/src/main/java/org/apache/cassandra/spark/reader/IndexEntry.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cassandra.spark.reader; + +import java.math.BigInteger; +import java.nio.ByteBuffer; + +import org.apache.cassandra.spark.utils.ByteBufferUtils; + +public class IndexEntry +{ + public final ByteBuffer partitionKey; + public final BigInteger token; + public final long uncompressed; + public final long compressed; + + public IndexEntry(ByteBuffer partitionKey, + BigInteger token, + long uncompressed, + long compressed) + { + this.partitionKey = partitionKey; + this.token = token; + this.uncompressed = uncompressed; + this.compressed = compressed; + } + + public ByteBuffer getPartitionKey() + { + return this.partitionKey; + } + + public BigInteger getToken() + { + return this.token; + } + + public long getCompressed() + { + return compressed; + } + + public long getUncompressed() + { + return uncompressed; + } + + @Override + public String toString() + { + return ByteBufferUtils.toHexString(this.partitionKey) + + ": " + uncompressed + " uncompressed bytes, " + compressed + " compressed bytes"; + } +} diff --git a/cassandra-bridge/src/main/java/org/apache/cassandra/spark/reader/StreamScanner.java b/cassandra-bridge/src/main/java/org/apache/cassandra/spark/reader/StreamScanner.java index 2f58f19..2a4e975 100644 --- a/cassandra-bridge/src/main/java/org/apache/cassandra/spark/reader/StreamScanner.java +++ b/cassandra-bridge/src/main/java/org/apache/cassandra/spark/reader/StreamScanner.java @@ -28,19 +28,19 @@ import java.io.IOException; * they belong to: * <p> * Cassandra: - * r1 | c1, c2, c3 - * r2 | c4 - * r3 | c5, c6, c7, c8 + * r1 | c1, c2, c3 + * r2 | c4 + * r3 | c5, c6, c7, c8 * <p> * Pivoted: - * r1 | c1 - * r1 | c2 - * r1 | c3 - * r2 | c4 - * r3 | c5 - * r3 | c6 - * r3 | c7 - * r3 | c8 + * r1 | c1 + * r1 | c2 + * r1 | c3 + * r2 | c4 + * r3 | c5 + * r3 | c6 + * r3 | c7 + * r3 | c8 * <p> * During a loading operation we will extract up to a few trillion items out of SSTables, so it is of * high importance to reuse objects - the caller to the scanner creates a rid using the @@ -49,9 +49,10 @@ import java.io.IOException; * <p> * Upon return from the next() call the current values of the scanner can be obtained by calling * the methods in Rid, getPartitionKey(), getColumnName(), getValue(). + * @param <Type> type of object returned by rid() method. */ @SuppressWarnings("unused") -public interface StreamScanner extends Closeable +public interface StreamScanner<Type> extends Closeable { /** * Expose the data/rid to be consumed. 
@@ -59,13 +60,13 @@ public interface StreamScanner extends Closeable * * @return rid */ - Rid rid(); + Type rid(); /** * Indicate if there are more data/rid avaiable * * @return true when the rid is available to be consumed; - * otherwise, return false to indicate the scanner has exhausted + * otherwise, return false to indicate the scanner has exhausted * @throws IOException */ boolean hasNext() throws IOException; diff --git a/cassandra-bridge/src/main/java/org/apache/cassandra/spark/reader/common/AbstractCompressionMetadata.java b/cassandra-bridge/src/main/java/org/apache/cassandra/spark/reader/common/AbstractCompressionMetadata.java index 985b827..4fa69d4 100644 --- a/cassandra-bridge/src/main/java/org/apache/cassandra/spark/reader/common/AbstractCompressionMetadata.java +++ b/cassandra-bridge/src/main/java/org/apache/cassandra/spark/reader/common/AbstractCompressionMetadata.java @@ -45,7 +45,7 @@ public abstract class AbstractCompressionMetadata return dataLength; } - private Chunk chunkAtIndex(int index) + public Chunk chunkAtIndex(int index) { long chunkOffset = chunkOffsets.get(index); long nextChunkOffset = (index + 1 == chunkOffsets.size) ? -1 : chunkOffsets.get(index + 1); @@ -54,6 +54,15 @@ public abstract class AbstractCompressionMetadata return new Chunk(chunkOffset, (nextChunkOffset == -1) ? -1 : (int) (nextChunkOffset - chunkOffset - 4)); } + /** + * @param position uncompressed position + * @return the compressed chunk index for an uncompressed position. + */ + public int chunkIdx(long position) + { + return (int) (position / chunkLength()); + } + /** * Get a chunk of compressed data (offset, length) corresponding to given position * @@ -64,7 +73,7 @@ public abstract class AbstractCompressionMetadata public Chunk chunkAtPosition(long position) throws IOException { // Position of the chunk - int index = (int) (position / chunkLength()); + int index = chunkIdx(position); if (index >= chunkOffsets.size) { @@ -82,7 +91,7 @@ public abstract class AbstractCompressionMetadata public final long offset; public int length; // CHECKSTYLE IGNORE: Public mutable field - Chunk(long offset, int length) + public Chunk(long offset, int length) { this.offset = offset; this.length = length; diff --git a/cassandra-bridge/src/main/java/org/apache/cassandra/spark/reader/EmptyStreamScanner.java b/cassandra-bridge/src/main/java/org/apache/cassandra/spark/reader/common/IIndexReader.java similarity index 64% copy from cassandra-bridge/src/main/java/org/apache/cassandra/spark/reader/EmptyStreamScanner.java copy to cassandra-bridge/src/main/java/org/apache/cassandra/spark/reader/common/IIndexReader.java index b4c8e0e..6930655 100644 --- a/cassandra-bridge/src/main/java/org/apache/cassandra/spark/reader/EmptyStreamScanner.java +++ b/cassandra-bridge/src/main/java/org/apache/cassandra/spark/reader/common/IIndexReader.java @@ -17,31 +17,10 @@ * under the License. 
*/ -package org.apache.cassandra.spark.reader; +package org.apache.cassandra.spark.reader.common; -public class EmptyStreamScanner implements StreamScanner -{ - public static final EmptyStreamScanner INSTANCE = new EmptyStreamScanner(); - - @Override - public Rid rid() - { - return null; - } - - @Override - public boolean hasNext() - { - return false; - } +import org.apache.cassandra.spark.reader.SparkSSTableReader; - @Override - public void advanceToNextColumn() - { - } - - @Override - public void close() - { - } +public interface IIndexReader extends SparkSSTableReader +{ } diff --git a/cassandra-bridge/src/main/java/org/apache/cassandra/spark/reader/common/IndexIterator.java b/cassandra-bridge/src/main/java/org/apache/cassandra/spark/reader/common/IndexIterator.java new file mode 100644 index 0000000..1103153 --- /dev/null +++ b/cassandra-bridge/src/main/java/org/apache/cassandra/spark/reader/common/IndexIterator.java @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cassandra.spark.reader.common; + +import java.io.IOException; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.spark.data.SSTable; +import org.apache.cassandra.spark.data.SSTablesSupplier; +import org.apache.cassandra.spark.reader.IndexConsumer; +import org.apache.cassandra.spark.reader.IndexEntry; +import org.apache.cassandra.spark.reader.StreamScanner; +import org.apache.cassandra.spark.stats.Stats; +import org.jetbrains.annotations.NotNull; + +/** + * Iterator for reading through IndexEntries for multiple Index.db files. 
+ * + * @param <ReaderType> + */ +public class IndexIterator<ReaderType extends IIndexReader> implements StreamScanner<IndexEntry>, IndexConsumer +{ + private static final Logger LOGGER = LoggerFactory.getLogger(IndexIterator.class); + + private final AtomicInteger finished = new AtomicInteger(0); + private final AtomicReference<Throwable> failure = new AtomicReference<>(); + + private final Set<ReaderType> readers; + private final LinkedBlockingQueue<IndexEntry> queue = new LinkedBlockingQueue<>(); + private final long startTimeNanos; + private final Stats stats; + + private final AtomicBoolean closed = new AtomicBoolean(false); + private IndexEntry curr = null; + + public IndexIterator(@NotNull SSTablesSupplier ssTables, + @NotNull Stats stats, + @NotNull IndexReaderOpener<ReaderType> supplier) + { + this.startTimeNanos = System.nanoTime(); + this.stats = stats; + this.readers = ssTables.openAll((ssTable, isRepairPrimary) -> supplier.openReader(ssTable, isRepairPrimary, this)); + stats.openedIndexFiles(System.nanoTime() - startTimeNanos); + } + + public interface IndexReaderOpener<ReaderType extends IIndexReader> + { + ReaderType openReader(SSTable ssTable, boolean isRepairPrimary, IndexConsumer consumer) throws IOException; + } + + public void accept(IndexEntry wrapper) + { + if (closed.get()) + { + return; + } + + try + { + queue.put(wrapper); + stats.indexEntryConsumed(); + } + catch (InterruptedException e) + { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + } + + public IndexEntry rid() + { + return Objects.requireNonNull(curr, "advanceToNextColumn() must be called before data()"); + } + + public boolean hasNext() throws IOException + { + return isRunning() && noFailures() && (hasPendingItems() || notFinished()); + } + + protected boolean isRunning() + { + return !closed.get(); + } + + protected boolean hasPendingItems() + { + return !queue.isEmpty(); + } + + protected boolean noFailures() + { + return !maybeFail(); + } + + protected boolean maybeFail() + { + Throwable t = failure.get(); + if (t != null) + { + throw new RuntimeException(t); + } + return false; + } + + private boolean notFinished() + { + return !isFinished(); + } + + private boolean isFinished() + { + return finished.get() == readers.size(); + } + + public void advanceToNextColumn() + { + if (closed.get()) + { + throw new IllegalStateException("Iterator closed"); + } + + try + { + long startTimeNanos = System.nanoTime(); + this.curr = queue.take(); + stats.indexIteratorTimeBlocked(System.nanoTime() - startTimeNanos); + } + catch (InterruptedException e) + { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + } + + public void close() + { + if (!this.closed.get() && this.closed.compareAndSet(false, true)) + { + queue.clear(); + stats.closedIndexIterator(System.nanoTime() - startTimeNanos); + } + } + + public void onFailure(Throwable t) + { + LOGGER.warn("IndexReader failed with exception", t); + stats.indexReaderFailure(t); + if (failure.get() == null) + { + failure.compareAndSet(null, t); + } + } + + public void onFinished(long runtimeNanos) + { + stats.indexReaderFinished(runtimeNanos); + finished.incrementAndGet(); + } +} diff --git a/cassandra-bridge/src/main/java/org/apache/cassandra/spark/stats/Stats.java b/cassandra-bridge/src/main/java/org/apache/cassandra/spark/stats/Stats.java index 45e5400..7593bf8 100644 --- a/cassandra-bridge/src/main/java/org/apache/cassandra/spark/stats/Stats.java +++ 
b/cassandra-bridge/src/main/java/org/apache/cassandra/spark/stats/Stats.java @@ -28,6 +28,7 @@ import org.apache.cassandra.spark.cdc.IPartitionUpdateWrapper; import org.apache.cassandra.spark.data.CqlField; import org.apache.cassandra.spark.data.SSTable; import org.apache.cassandra.spark.data.SSTablesSupplier; +import org.apache.cassandra.spark.reader.IndexEntry; import org.apache.cassandra.spark.sparksql.filters.PartitionKeyFilter; import org.apache.cassandra.spark.sparksql.filters.SparkRangeFilter; import org.apache.cassandra.spark.utils.streaming.SSTableSource; @@ -664,4 +665,140 @@ public abstract class Stats public void droppedOldMutation(long maxTimestampMicros) { } + + // PartitionSizeIterator stats + + /** + * @param timeToOpenNanos time taken to open PartitionSizeIterator in nanos + */ + public void openedPartitionSizeIterator(long timeToOpenNanos) + { + + } + + /** + * @param entry emitted single IndexEntry. + */ + public void emitIndexEntry(IndexEntry entry) + { + + } + + /** + * @param timeNanos the time in nanos spent blocking waiting for next IndexEntry. + */ + public void indexIteratorTimeBlocked(long timeNanos) + { + + } + + /** + * @param timeNanos time taken to for PartitionSizeIterator to run in nanos. + */ + public void closedPartitionSizeIterator(long timeNanos) + { + + } + + /** + * @param timeToOpenNanos time taken to open Index.db files in nanos + */ + public void openedIndexFiles(long timeToOpenNanos) + { + + } + + /** + * @param timeToOpenNanos time in nanos the IndexIterator was open for. + */ + public void closedIndexIterator(long timeToOpenNanos) + { + + } + + /** + * An index reader closed with a failure. + * + * @param t throwable + */ + public void indexReaderFailure(Throwable t) + { + + } + + /** + * An index reader closed successfully. + * + * @param runtimeNanos time in nanos the IndexReader was open for. + */ + public void indexReaderFinished(long runtimeNanos) + { + + } + + /** + * IndexReader skipped out-of-range partition keys. + * + * @param skipAhead number of bytes skipped. + */ + public void indexBytesSkipped(long skipAhead) + { + + } + + /** + * IndexReader read bytes. + * + * @param bytesRead number of bytes read. + */ + public void indexBytesRead(long bytesRead) + { + + } + + /** + * When a single index entry is consumer. + */ + public void indexEntryConsumed() + { + + } + + /** + * The Summary.db file was read to check start-end token range of associated Index.db file. + * + * @param timeNanos time taken in nanos. + */ + public void indexSummaryFileRead(long timeNanos) + { + + } + + /** + * CompressionInfo.db file was read. + * + * @param timeNanos time taken in nanos. + */ + public void indexCompressionFileRead(long timeNanos) + { + + } + + /** + * Index.db was fully read + * + * @param timeNanos time taken in nanos. + */ + public void indexFileRead(long timeNanos) + { + + } + + /** + * When an Index.db file can be fully skipped because it does not overlap with token range. 
+ */ + public void indexFileSkipped() + { + + } } diff --git a/cassandra-bridge/src/main/java/org/apache/cassandra/spark/utils/test/TestSSTable.java b/cassandra-bridge/src/main/java/org/apache/cassandra/spark/utils/test/TestSSTable.java index 423ea5a..d7e42c4 100644 --- a/cassandra-bridge/src/main/java/org/apache/cassandra/spark/utils/test/TestSSTable.java +++ b/cassandra-bridge/src/main/java/org/apache/cassandra/spark/utils/test/TestSSTable.java @@ -35,6 +35,7 @@ import com.google.common.annotations.VisibleForTesting; import org.apache.cassandra.spark.data.FileType; import org.apache.cassandra.spark.data.SSTable; +import org.apache.cassandra.spark.utils.IOUtils; import org.jetbrains.annotations.Nullable; public final class TestSSTable extends SSTable @@ -108,6 +109,11 @@ public final class TestSSTable extends SSTable } } + public long length(FileType fileType) + { + return IOUtils.size(FileType.resolveComponentFile(fileType, dataFile)); + } + public boolean isMissing(FileType fileType) { return FileType.resolveComponentFile(fileType, dataFile) == null; diff --git a/cassandra-bridge/src/main/java/org/apache/cassandra/spark/utils/test/TestSchema.java b/cassandra-bridge/src/main/java/org/apache/cassandra/spark/utils/test/TestSchema.java index f5022f4..3f738e0 100644 --- a/cassandra-bridge/src/main/java/org/apache/cassandra/spark/utils/test/TestSchema.java +++ b/cassandra-bridge/src/main/java/org/apache/cassandra/spark/utils/test/TestSchema.java @@ -192,6 +192,8 @@ public final class TestSchema public final String keyspace; public final String table; public final String createStatement; + public final ReplicationFactor rf = new ReplicationFactor(ReplicationFactor.ReplicationStrategy.NetworkTopologyStrategy, + ImmutableMap.of("DC1", 3)); public final String insertStatement; public final String updateStatement; public final String deleteStatement; @@ -428,8 +430,7 @@ public final class TestSchema return new CqlTable(keyspace, table, createStatement, - new ReplicationFactor(ReplicationFactor.ReplicationStrategy.NetworkTopologyStrategy, - ImmutableMap.of("DC1", 3)), + rf, allFields, udts, 0); diff --git a/cassandra-four-zero/build.gradle b/cassandra-four-zero/build.gradle index 01bbe7c..cee96b7 100644 --- a/cassandra-four-zero/build.gradle +++ b/cassandra-four-zero/build.gradle @@ -40,6 +40,7 @@ project(':cassandra-four-zero') { testImplementation(project(':cassandra-bridge')) + testImplementation project(path: ':cassandra-analytics-core', configuration: 'testArtifacts') testImplementation("org.junit.jupiter:junit-jupiter-api:${project.junitVersion}") testImplementation("org.junit.jupiter:junit-jupiter-params:${project.junitVersion}") testImplementation("org.junit.jupiter:junit-jupiter-engine:${project.junitVersion}") @@ -57,6 +58,10 @@ project(':cassandra-four-zero') { enabled = false } + test { + useJUnitPlatform() + } + shadowJar { archiveFileName = 'four-zero.jar' // Must match label in CassandraVersion (without extension) zip64 = true diff --git a/cassandra-four-zero/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java b/cassandra-four-zero/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java index 9b50486..6dabed8 100644 --- a/cassandra-four-zero/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java +++ b/cassandra-four-zero/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java @@ -133,8 +133,12 @@ import org.apache.cassandra.spark.data.types.VarChar; import 
org.apache.cassandra.spark.data.types.VarInt; import org.apache.cassandra.spark.reader.CdcScannerBuilder; import org.apache.cassandra.spark.reader.CompactionStreamScanner; +import org.apache.cassandra.spark.reader.IndexEntry; +import org.apache.cassandra.spark.reader.IndexReader; +import org.apache.cassandra.spark.reader.Rid; import org.apache.cassandra.spark.reader.SchemaBuilder; import org.apache.cassandra.spark.reader.StreamScanner; +import org.apache.cassandra.spark.reader.common.IndexIterator; import org.apache.cassandra.spark.sparksql.filters.CdcOffsetFilter; import org.apache.cassandra.spark.sparksql.filters.PartitionKeyFilter; import org.apache.cassandra.spark.sparksql.filters.PruneColumnFilter; @@ -257,19 +261,19 @@ public class CassandraBridgeImplementation extends CassandraBridge } @Override - public StreamScanner getCdcScanner(int partitionId, - @NotNull CqlTable table, - @NotNull Partitioner partitioner, - @NotNull CommitLogProvider commitLogProvider, - @NotNull TableIdLookup tableIdLookup, - @NotNull Stats stats, - @Nullable SparkRangeFilter sparkRangeFilter, - @Nullable CdcOffsetFilter offset, - int minimumReplicasPerMutation, - @NotNull Watermarker watermarker, - @NotNull String jobId, - @NotNull ExecutorService executorService, - @NotNull TimeProvider timeProvider) + public StreamScanner<Rid> getCdcScanner(int partitionId, + @NotNull CqlTable table, + @NotNull Partitioner partitioner, + @NotNull CommitLogProvider commitLogProvider, + @NotNull TableIdLookup tableIdLookup, + @NotNull Stats stats, + @Nullable SparkRangeFilter sparkRangeFilter, + @Nullable CdcOffsetFilter offset, + int minimumReplicasPerMutation, + @NotNull Watermarker watermarker, + @NotNull String jobId, + @NotNull ExecutorService executorService, + @NotNull TimeProvider timeProvider) { // NOTE: Need to use SchemaBuilder to init keyspace if not already set in Cassandra schema instance UUID tableId = tableIdLookup.lookup(table.keyspace(), table.table()); @@ -300,16 +304,16 @@ public class CassandraBridgeImplementation extends CassandraBridge } @Override - public StreamScanner getCompactionScanner(@NotNull CqlTable table, - @NotNull Partitioner partitioner, - @NotNull SSTablesSupplier ssTables, - @Nullable SparkRangeFilter sparkRangeFilter, - @NotNull Collection<PartitionKeyFilter> partitionKeyFilters, - @Nullable PruneColumnFilter columnFilter, - @NotNull TimeProvider timeProvider, - boolean readIndexOffset, - boolean useIncrementalRepair, - @NotNull Stats stats) + public StreamScanner<Rid> getCompactionScanner(@NotNull CqlTable table, + @NotNull Partitioner partitioner, + @NotNull SSTablesSupplier ssTables, + @Nullable SparkRangeFilter sparkRangeFilter, + @NotNull Collection<PartitionKeyFilter> partitionKeyFilters, + @Nullable PruneColumnFilter columnFilter, + @NotNull TimeProvider timeProvider, + boolean readIndexOffset, + boolean useIncrementalRepair, + @NotNull Stats stats) { // NOTE: Need to use SchemaBuilder to init keyspace if not already set in Cassandra Schema instance SchemaBuilder schemaBuilder = new SchemaBuilder(table, partitioner); @@ -326,6 +330,20 @@ public class CassandraBridgeImplementation extends CassandraBridge .build())); } + public StreamScanner<IndexEntry> getPartitionSizeIterator(@NotNull CqlTable table, + @NotNull Partitioner partitioner, + @NotNull SSTablesSupplier ssTables, + @Nullable SparkRangeFilter rangeFilter, + @NotNull TimeProvider timeProvider, + @NotNull Stats stats, + @NotNull ExecutorService executor) + { + //NOTE: need to use SchemaBuilder to init keyspace if not 
already set in C* Schema instance + SchemaBuilder schemaBuilder = new SchemaBuilder(table, partitioner); + TableMetadata metadata = schemaBuilder.tableMetaData(); + return new IndexIterator<>(ssTables, stats, ((ssTable, isRepairPrimary, consumer) -> new IndexReader(ssTable, metadata, rangeFilter, stats, consumer))); + } + @Override public CassandraVersion getVersion() { diff --git a/cassandra-four-zero/src/main/java/org/apache/cassandra/spark/reader/AbstractStreamScanner.java b/cassandra-four-zero/src/main/java/org/apache/cassandra/spark/reader/AbstractStreamScanner.java index 9cae19f..3e9b72d 100644 --- a/cassandra-four-zero/src/main/java/org/apache/cassandra/spark/reader/AbstractStreamScanner.java +++ b/cassandra-four-zero/src/main/java/org/apache/cassandra/spark/reader/AbstractStreamScanner.java @@ -54,7 +54,7 @@ import org.apache.cassandra.spark.utils.TimeProvider; import org.apache.cassandra.utils.ByteBufferUtil; import org.jetbrains.annotations.NotNull; -public abstract class AbstractStreamScanner implements StreamScanner, Closeable +public abstract class AbstractStreamScanner implements StreamScanner<Rid>, Closeable { // All partitions in the SSTable private UnfilteredPartitionIterator allPartitions; diff --git a/cassandra-four-zero/src/main/java/org/apache/cassandra/spark/reader/CdcScannerBuilder.java b/cassandra-four-zero/src/main/java/org/apache/cassandra/spark/reader/CdcScannerBuilder.java index bd7d17f..a1d9112 100644 --- a/cassandra-four-zero/src/main/java/org/apache/cassandra/spark/reader/CdcScannerBuilder.java +++ b/cassandra-four-zero/src/main/java/org/apache/cassandra/spark/reader/CdcScannerBuilder.java @@ -239,7 +239,7 @@ public class CdcScannerBuilder return null; } - public StreamScanner build() + public StreamScanner<Rid> build() { // Block on futures to read all CommitLog mutations and pass over to SortedStreamScanner List<PartitionUpdateWrapper> updates = futures.values().stream() diff --git a/cassandra-four-zero/src/main/java/org/apache/cassandra/spark/reader/CompressionMetadata.java b/cassandra-four-zero/src/main/java/org/apache/cassandra/spark/reader/CompressionMetadata.java index e820180..1c89853 100644 --- a/cassandra-four-zero/src/main/java/org/apache/cassandra/spark/reader/CompressionMetadata.java +++ b/cassandra-four-zero/src/main/java/org/apache/cassandra/spark/reader/CompressionMetadata.java @@ -34,7 +34,9 @@ import org.apache.cassandra.spark.reader.common.BigLongArray; /** * Holds metadata about compressed file */ -final class CompressionMetadata extends AbstractCompressionMetadata +// CompressionMetadata is mocked in IndexReaderTests and mockito does not support mocking final classes +// CHECKSTYLE IGNORE: FinalClass +class CompressionMetadata extends AbstractCompressionMetadata { private final CompressionParams parameters; diff --git a/cassandra-four-zero/src/main/java/org/apache/cassandra/spark/reader/IndexReader.java b/cassandra-four-zero/src/main/java/org/apache/cassandra/spark/reader/IndexReader.java new file mode 100644 index 0000000..1c57d3a --- /dev/null +++ b/cassandra-four-zero/src/main/java/org/apache/cassandra/spark/reader/IndexReader.java @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cassandra.spark.reader; + +import java.io.DataInputStream; +import java.io.EOFException; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.math.BigInteger; +import java.nio.ByteBuffer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.bridge.TokenRange; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.format.Version; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.spark.data.FileType; +import org.apache.cassandra.spark.data.IncompleteSSTableException; +import org.apache.cassandra.spark.data.SSTable; +import org.apache.cassandra.spark.reader.common.AbstractCompressionMetadata; +import org.apache.cassandra.spark.reader.common.IIndexReader; +import org.apache.cassandra.spark.sparksql.filters.SparkRangeFilter; +import org.apache.cassandra.spark.stats.Stats; +import org.apache.cassandra.spark.utils.ByteBufferUtils; +import org.apache.cassandra.utils.vint.VIntCoding; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +public class IndexReader implements IIndexReader +{ + private static final Logger LOGGER = LoggerFactory.getLogger(IndexReader.class); + + private TokenRange ssTableRange = null; + + public IndexReader(@NotNull SSTable ssTable, + @NotNull TableMetadata metadata, + @Nullable SparkRangeFilter rangeFilter, + @NotNull Stats stats, + @NotNull IndexConsumer consumer) + { + long now = System.nanoTime(); + long startTimeNanos = now; + try + { + File file = SSTableReader.constructFilename(metadata.keyspace, metadata.name, ssTable.getDataFileName()); + Descriptor descriptor = Descriptor.fromFilename(file); + Version version = descriptor.version; + + // if there is a range filter, we can use the Summary.db file to seek to the approximate start of the token range in the Index.db file + long skipAhead = -1; + now = System.nanoTime(); + if (rangeFilter != null) + { + SummaryDbUtils.Summary summary = SSTableCache.INSTANCE.keysFromSummary(metadata, ssTable); + if (summary != null) + { + this.ssTableRange = TokenRange.closed(ReaderUtils.tokenToBigInteger(summary.first().getToken()), + ReaderUtils.tokenToBigInteger(summary.last().getToken())); + if (!rangeFilter.overlaps(this.ssTableRange)) + { + LOGGER.info("Skipping non-overlapping Index.db file rangeFilter='[{},{}]' sstableRange='[{},{}]'", + rangeFilter.tokenRange().lowerEndpoint(), rangeFilter.tokenRange().upperEndpoint(), + this.ssTableRange.lowerEndpoint(), this.ssTableRange.upperEndpoint()); + stats.indexFileSkipped(); + return; + } + + skipAhead = summary.summary().getPosition( + SummaryDbUtils.binarySearchSummary(summary.summary(), metadata.partitioner, rangeFilter.tokenRange().lowerEndpoint()) + ); + stats.indexSummaryFileRead(System.nanoTime() - now); + now = System.nanoTime(); + } + } + + // read CompressionMetadata if it exists + CompressionMetadata compressionMetadata = getCompressionMetadata(ssTable,
version.hasMaxCompressedLength()); + if (compressionMetadata != null) + { + stats.indexCompressionFileRead(System.nanoTime() - now); + now = System.nanoTime(); + } + + // read through Index.db and consume partition keys + try (InputStream is = ssTable.openPrimaryIndexStream()) + { + if (is == null) + { + consumer.onFailure(new IncompleteSSTableException(FileType.INDEX)); + return; + } + + consumePrimaryIndex(metadata.partitioner, + is, + ssTable, + compressionMetadata, + rangeFilter, + stats, + skipAhead, + consumer); + stats.indexFileRead(System.nanoTime() - now); + } + } + catch (Throwable t) + { + consumer.onFailure(t); + } + finally + { + consumer.onFinished(System.nanoTime() - startTimeNanos); + } + } + + @Nullable + public static CompressionMetadata getCompressionMetadata(SSTable ssTable, + boolean hasMaxCompressedLength) throws IOException + { + try (InputStream cis = ssTable.openCompressionStream()) + { + if (cis != null) + { + return CompressionMetadata.fromInputStream(cis, hasMaxCompressedLength); + } + } + return null; + } + + @SuppressWarnings("InfiniteLoopStatement") + static void consumePrimaryIndex(@NotNull IPartitioner partitioner, + @NotNull InputStream primaryIndex, + @NotNull SSTable ssTable, + @Nullable CompressionMetadata compressionMetadata, + @Nullable SparkRangeFilter range, + @NotNull Stats stats, + long skipBytes, + @NotNull IndexConsumer consumer) throws IOException + { + long primaryIndexLength = ssTable.length(FileType.INDEX); + long dataDbFileLength = ssTable.length(FileType.DATA); + try (DataInputStream dis = new DataInputStream(primaryIndex)) + { + if (skipBytes > 0) + { + ByteBufferUtils.skipFully(dis, skipBytes); + stats.indexBytesSkipped(skipBytes); + } + + ByteBuffer prevKey = null; + long prevPos = 0; + BigInteger prevToken = null; + boolean started = false; + + long totalBytesRead = 0; + try + { + while (true) + { + // read partition key length + int len = dis.readUnsignedShort(); + + // read partition key & decorate + byte[] buf = new byte[len]; + dis.readFully(buf); + ByteBuffer key = ByteBuffer.wrap(buf); + DecoratedKey decoratedKey = partitioner.decorateKey(key); + BigInteger token = ReaderUtils.tokenToBigInteger(decoratedKey.getToken()); + + // read position & skip promoted index + long pos = ReaderUtils.readPosition(dis); + int promotedIndex = ReaderUtils.skipPromotedIndex(dis); + totalBytesRead += 2 + len + VIntCoding.computeUnsignedVIntSize(pos) + promotedIndex; + + if (prevKey != null && (range == null || range.overlaps(prevToken))) + { + // previous key overlaps with range filter, so consume + started = true; + long uncompressed = pos - prevPos; + long compressed = compressionMetadata == null + ? uncompressed + : calculateCompressedSize(compressionMetadata, dataDbFileLength, prevPos, pos - 1); + consumer.accept(new IndexEntry(prevKey, prevToken, uncompressed, compressed)); + } + else if (started) + { + // we have gone past the range we care about so exit early + stats.indexBytesSkipped(primaryIndexLength - totalBytesRead - skipBytes); + return; + } + + prevPos = pos; + prevKey = key; + prevToken = token; + } + } + catch (EOFException ignored) + { + // finished + } + finally + { + stats.indexBytesRead(totalBytesRead); + } + + if (prevKey != null && (range == null || range.overlaps(prevToken))) + { + // we reached the end of the file, so consume the last key if it overlaps + long end = (compressionMetadata == null ?
dataDbFileLength : compressionMetadata.getDataLength()); + long uncompressed = end - prevPos; + long compressed = compressionMetadata == null + ? uncompressed + : calculateCompressedSize(compressionMetadata, dataDbFileLength, prevPos, end - 1); + consumer.accept(new IndexEntry(prevKey, prevToken, uncompressed, compressed)); + } + } + } + + /** + * @param compressionMetadata SSTable compression metadata + * @param compressedDataLength full compressed length of the Data.db file + * @param start uncompressed start position. + * @param end uncompressed end position. + * @return the compressed size of a partition, calculated from the uncompressed start and end offsets in the Data.db file. + */ + public static long calculateCompressedSize(@NotNull CompressionMetadata compressionMetadata, + long compressedDataLength, + long start, + long end) + { + int startIdx = compressionMetadata.chunkIdx(start); + int endIdx = compressionMetadata.chunkIdx(end); + AbstractCompressionMetadata.Chunk startChunk = compressionMetadata.chunkAtIndex(startIdx); + long startLen = chunkCompressedLength(startChunk, compressedDataLength); + // compressed chunk sizes vary, but uncompressed chunk length is the same for all chunks + long uncompressedChunkLen = compressionMetadata.chunkLength(); + + if (startIdx == endIdx) + { + // within the same chunk, so take % of uncompressed length and apply to compressed length + float perc = (end - start) / (float) uncompressedChunkLen; + return Math.round(perc * startLen); + } + + long size = partialCompressedSizeWithinChunk(start, uncompressedChunkLen, startLen, true); + AbstractCompressionMetadata.Chunk endChunk = compressionMetadata.chunkAtIndex(endIdx); + long endLen = chunkCompressedLength(endChunk, compressedDataLength); + + size += partialCompressedSizeWithinChunk(end, uncompressedChunkLen, endLen, false); + + for (int idx = startIdx + 1; idx < endIdx; idx++) + { + // add compressed size of whole intermediate chunks + size += chunkCompressedLength(compressionMetadata.chunkAtIndex(idx), compressedDataLength); + } + + return size; + } + + private static long chunkCompressedLength(AbstractCompressionMetadata.Chunk chunk, long compressedDataLength) + { + // chunk.length < 0 means it is the last chunk, so use compressedDataLength to calculate the compressed size + return chunk.length >= 0 ? chunk.length : compressedDataLength - chunk.offset; + } + + /** + * Returns the partial compressed size of a partition whose start or end overlaps with a compressed chunk. + * This is an estimate because of the variable compressibility of partitions within the chunk. + * + * @param uncompressedPos uncompressed position in the Data.db file + * @param uncompressedChunkLen fixed uncompressed chunk length + * @param compressedChunkLen compressed chunk size of this chunk + * @param start true if uncompressedPos is the start position of the partition, false if it is the end position + * @return the estimated compressed size of the partition start or end that overlaps with this chunk. + */ + public static int partialCompressedSizeWithinChunk(long uncompressedPos, + long uncompressedChunkLen, + long compressedChunkLen, + boolean start) + { + long mod = uncompressedPos % uncompressedChunkLen; + // a start position occupies the remaining bytes to the end of the chunk; an end position occupies the bytes from the start of the chunk + long usedBytes = start ?
(uncompressedChunkLen - mod) : mod; + // percentage of uncompressed bytes that it occupies in the chunk + float perc = usedBytes / (float) uncompressedChunkLen; + // apply percentage to compressed chunk length to give compressed bytes occupied + return Math.round(perc * compressedChunkLen); + } + + public BigInteger firstToken() + { + return ssTableRange != null ? ssTableRange.lowerEndpoint() : null; + } + + public BigInteger lastToken() + { + return ssTableRange != null ? ssTableRange.upperEndpoint() : null; + } + + public boolean ignore() + { + return false; + } +} diff --git a/cassandra-four-zero/src/main/java/org/apache/cassandra/spark/reader/ReaderUtils.java b/cassandra-four-zero/src/main/java/org/apache/cassandra/spark/reader/ReaderUtils.java index 35c29b5..e7bbe81 100644 --- a/cassandra-four-zero/src/main/java/org/apache/cassandra/spark/reader/ReaderUtils.java +++ b/cassandra-four-zero/src/main/java/org/apache/cassandra/spark/reader/ReaderUtils.java @@ -498,13 +498,18 @@ public final class ReaderUtils return VIntCoding.readUnsignedVInt(dis); } - static void skipPromotedIndex(DataInputStream dis) throws IOException + /** + * @return the total bytes skipped + */ + public static int skipPromotedIndex(DataInputStream dis) throws IOException { - int size = (int) VIntCoding.readUnsignedVInt(dis); + final long val = VIntCoding.readUnsignedVInt(dis); + final int size = (int) val; if (size > 0) { ByteBufferUtils.skipBytesFully(dis, size); } + return Math.max(size, 0) + VIntCoding.computeUnsignedVIntSize(val); } static List<PartitionKeyFilter> filterKeyInBloomFilter( diff --git a/cassandra-four-zero/src/test/java/org/apache/cassandra/spark/reader/IndexReaderTests.java b/cassandra-four-zero/src/test/java/org/apache/cassandra/spark/reader/IndexReaderTests.java new file mode 100644 index 0000000..b17b13b --- /dev/null +++ b/cassandra-four-zero/src/test/java/org/apache/cassandra/spark/reader/IndexReaderTests.java @@ -0,0 +1,273 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.cassandra.spark.reader; + +import java.io.IOException; +import java.math.BigInteger; +import java.nio.ByteBuffer; +import java.nio.file.Path; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.bridge.CassandraBridgeImplementation; +import org.apache.cassandra.bridge.TokenRange; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.serializers.Int32Serializer; +import org.apache.cassandra.spark.TestDataLayer; +import org.apache.cassandra.spark.TestUtils; +import org.apache.cassandra.spark.data.CqlTable; +import org.apache.cassandra.spark.data.FileType; +import org.apache.cassandra.spark.data.SSTable; +import org.apache.cassandra.spark.data.partitioner.Partitioner; +import org.apache.cassandra.spark.reader.common.AbstractCompressionMetadata; +import org.apache.cassandra.spark.sparksql.filters.SparkRangeFilter; +import org.apache.cassandra.spark.stats.Stats; +import org.apache.cassandra.spark.utils.TemporaryDirectory; +import org.apache.cassandra.spark.utils.test.TestSchema; + +import static org.apache.cassandra.spark.TestUtils.getFileType; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.quicktheories.QuickTheory.qt; +import static org.quicktheories.generators.SourceDSL.arbitrary; + +@SuppressWarnings("SameParameterValue") +public class IndexReaderTests +{ + static final ExecutorService EXECUTOR = + Executors.newFixedThreadPool(4, new ThreadFactoryBuilder().setNameFormat("index-reader-tests-%d") + .setDaemon(true) + .build()); + private static final Logger LOGGER = LoggerFactory.getLogger(IndexReaderTests.class); + private static final CassandraBridgeImplementation BRIDGE = new CassandraBridgeImplementation(); + + @Test + public void testPartialCompressedSizeWithinChunk() + { + assertEquals(64, IndexReader.partialCompressedSizeWithinChunk(0, 1024, 64, true)); + assertEquals(32, IndexReader.partialCompressedSizeWithinChunk(512, 1024, 64, true)); + assertEquals(16, IndexReader.partialCompressedSizeWithinChunk(768, 1024, 64, true)); + assertEquals(2, IndexReader.partialCompressedSizeWithinChunk(992, 1024, 64, true)); + assertEquals(2, IndexReader.partialCompressedSizeWithinChunk(995, 1024, 64, true)); + assertEquals(1, IndexReader.partialCompressedSizeWithinChunk(1008, 1024, 64, true)); + assertEquals(0, IndexReader.partialCompressedSizeWithinChunk(1023, 1024, 64, true)); + assertEquals(64, IndexReader.partialCompressedSizeWithinChunk(1024, 1024, 64, true)); + assertEquals(64, IndexReader.partialCompressedSizeWithinChunk(2048, 1024, 64, true)); + assertEquals(32, IndexReader.partialCompressedSizeWithinChunk(2560, 1024, 64, true)); + + assertEquals(0, 
IndexReader.partialCompressedSizeWithinChunk(0, 1024, 64, false)); + assertEquals(1, IndexReader.partialCompressedSizeWithinChunk(16, 1024, 64, false)); + assertEquals(32, IndexReader.partialCompressedSizeWithinChunk(512, 1024, 64, false)); + assertEquals(64, IndexReader.partialCompressedSizeWithinChunk(1023, 1024, 64, false)); + assertEquals(32, IndexReader.partialCompressedSizeWithinChunk(2560, 1024, 64, false)); + } + + @Test + public void testCompressedSizeWithinSameChunk() + { + // within the same chunk + assertEquals(64, calculateCompressedSize(128, 256, 0, 512)); + assertEquals(128, calculateCompressedSize(0, 1024, 5, 128)); + assertEquals(25, calculateCompressedSize(32, 64, 5, 800)); + } + + @Test + public void testCompressedSizeMultipleChunks() + { + // partition straddles more than one chunk + assertEquals(448 + 128, calculateCompressedSize(128, 0, 512, 1536, 1, 256)); + assertEquals(112 + (256 * 10) + 32, calculateCompressedSize(128, 0, 128, 11392, 11, 256)); + } + + private static long calculateCompressedSize(long start, long end, int startIdx, int startCompressedChunkSize) + { + return IndexReader.calculateCompressedSize(mockMetaData(start, startIdx, startCompressedChunkSize, end), 160000000, start, end); + } + + private static long calculateCompressedSize(long start, int startIdx, int startCompressedChunkSize, + long end, int endIdx, int endCompressedChunkSize) + { + return IndexReader.calculateCompressedSize( + mockMetaData(start, startIdx, startCompressedChunkSize, end, endIdx, endCompressedChunkSize), 160000000, start, end + ); + } + + private static CompressionMetadata mockMetaData(long start, int startIdx, int startCompressedChunkSize, long end) + { + return mockMetaData(start, startIdx, startCompressedChunkSize, end, startIdx, startCompressedChunkSize); + } + + private static CompressionMetadata mockMetaData(long start, int startIdx, int startCompressedChunkSize, + long end, int endIdx, int endCompressedChunkSize) + { + return mockMetaData(start, startIdx, startCompressedChunkSize, end, endIdx, endCompressedChunkSize, 1024); + } + + private static CompressionMetadata mockMetaData(long start, int startIdx, int startCompressedChunkSize, + long end, int endIdx, int endCompressedChunkSize, + int uncompressedChunkLength) + { + CompressionMetadata metadata = mock(CompressionMetadata.class); + when(metadata.chunkIdx(eq(start))).thenReturn(startIdx); + when(metadata.chunkIdx(eq(end))).thenReturn(endIdx); + when(metadata.chunkLength()).thenReturn(uncompressedChunkLength); + when(metadata.chunkAtIndex(eq(startIdx))).thenReturn(new AbstractCompressionMetadata.Chunk(0, startCompressedChunkSize)); + when(metadata.chunkAtIndex(eq(endIdx))).thenReturn(new AbstractCompressionMetadata.Chunk(0, endCompressedChunkSize)); + for (int idx = startIdx + 1; idx < endIdx; idx++) + { + // let intermediate chunks have same compressed size as end + when(metadata.chunkAtIndex(eq(idx))).thenReturn(new AbstractCompressionMetadata.Chunk(0, endCompressedChunkSize)); + } + return metadata; + } + + @Test + public void testIndexReaderWithCompression() + { + testIndexReader(true); + } + + @Test + public void testIndexReaderWithoutCompression() + { + testIndexReader(false); + } + + private static void testIndexReader(boolean withCompression) + { + qt().forAll(arbitrary().enumValues(Partitioner.class)) + .checkAssert(partitioner -> { + try (TemporaryDirectory directory = new TemporaryDirectory()) + { + Path dir = directory.path(); + int numPartitions = 50000; + BigInteger eighth = 
partitioner.maxToken().divide(BigInteger.valueOf(8)); + SparkRangeFilter rangeFilter = SparkRangeFilter.create( + TokenRange.closed(partitioner.minToken().add(eighth), + partitioner.maxToken().subtract(eighth)) + ); + TestSchema schema = TestSchema.builder() + .withPartitionKey("a", BRIDGE.aInt()) + .withColumn("b", BRIDGE.blob()) + .withCompression(withCompression) + .build(); + CqlTable table = schema.buildTable(); + TableMetadata metaData = new SchemaBuilder(schema.createStatement, schema.keyspace, schema.rf, partitioner).tableMetaData(); + + // write an SSTable + Map<Integer, Integer> expected = new HashMap<>(); + schema.writeSSTable(dir, BRIDGE, partitioner, (writer) -> { + for (int i = 0; i < numPartitions; i++) + { + BigInteger token = ReaderUtils.tokenToBigInteger( + metaData.partitioner.decorateKey(Int32Serializer.instance.serialize(i)).getToken() + ); + byte[] lowEntropyData = TestUtils.randomLowEntropyData(); + if (rangeFilter.overlaps(token)) + { + expected.put(i, lowEntropyData.length); + } + writer.write(i, ByteBuffer.wrap(lowEntropyData)); + } + }); + assertFalse(expected.isEmpty()); + assertTrue(expected.size() < numPartitions); + + TestDataLayer dataLayer = new TestDataLayer(BRIDGE, getFileType(dir, FileType.DATA).collect(Collectors.toList()), table); + List<SSTable> ssTables = dataLayer.listSSTables().collect(Collectors.toList()); + assertFalse(ssTables.isEmpty()); + AtomicReference<Throwable> error = new AtomicReference<>(); + CountDownLatch latch = new CountDownLatch(ssTables.size()); + AtomicInteger rowCount = new AtomicInteger(0); + + IndexConsumer consumer = new IndexConsumer() + { + public void onFailure(Throwable t) + { + LOGGER.warn("Error reading index file", t); + if (error.get() == null) + { + error.compareAndSet(null, t); + } + } + + public void onFinished(long runtimeNanos) + { + latch.countDown(); + } + + public void accept(IndexEntry indexEntry) + { + // we should only read in-range partition keys + rowCount.getAndIncrement(); + int pk = indexEntry.partitionKey.getInt(); + assertTrue(expected.containsKey(pk)); + int blobSize = expected.get(pk); + assertTrue(indexEntry.compressed > 0); + assertTrue(withCompression + ? indexEntry.compressed < indexEntry.uncompressed * 0.1 + : indexEntry.compressed == indexEntry.uncompressed); + assertTrue((int) indexEntry.uncompressed > blobSize); + // uncompressed size should be proportional to the blob size, with some serialization overhead + assertTrue(((int) indexEntry.uncompressed - blobSize) < 40); + } + }; + + ssTables + .forEach(ssTable -> CompletableFuture.runAsync( + () -> new IndexReader(ssTable, metaData, rangeFilter, Stats.DoNothingStats.INSTANCE, consumer), EXECUTOR) + ); + + try + { + latch.await(); + } + catch (InterruptedException e) + { + throw new RuntimeException(e); + } + assertNull(error.get()); + assertEquals(expected.size(), rowCount.get()); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + }); + } +}
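
The compressed-size estimate in the patch is easiest to follow with concrete numbers. partialCompressedSizeWithinChunk assumes bytes compress uniformly within a chunk: it takes the fraction of the uncompressed chunk the partition occupies and applies that fraction to the chunk's compressed length. The standalone sketch below reproduces that arithmetic (the class name is illustrative, not part of the patch); with a 1024-byte chunk compressed down to 64 bytes, a partition boundary at uncompressed offset 512 is charged 32 compressed bytes on either side, matching the expectations in testPartialCompressedSizeWithinChunk:

public final class PartialChunkMath
{
    // Mirrors IndexReader.partialCompressedSizeWithinChunk: 'start' selects whether
    // pos marks the start of a partition (occupying the rest of the chunk) or its
    // end (occupying the chunk up to pos).
    static int partialCompressedSize(long pos, long uncompressedChunkLen, long compressedChunkLen, boolean start)
    {
        long mod = pos % uncompressedChunkLen;
        long usedBytes = start ? (uncompressedChunkLen - mod) : mod;
        float perc = usedBytes / (float) uncompressedChunkLen;
        return Math.round(perc * compressedChunkLen);
    }

    public static void main(String[] args)
    {
        System.out.println(partialCompressedSize(512, 1024, 64, true));  // 32: second half of the chunk
        System.out.println(partialCompressedSize(512, 1024, 64, false)); // 32: first half of the chunk
        System.out.println(partialCompressedSize(1024, 1024, 64, true)); // 64: start of a fresh chunk
    }
}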
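
The new Stats hooks are all no-ops by default, so existing jobs are unaffected; a job that wants visibility into the partition-size reads overrides only the hooks it cares about. A minimal sketch, assuming Stats can be subclassed from user code the way DoNothingStats is (PartitionSizeStats and its counters are illustrative names):

import java.util.concurrent.atomic.AtomicLong;

import org.apache.cassandra.spark.reader.IndexEntry;
import org.apache.cassandra.spark.stats.Stats;

public class PartitionSizeStats extends Stats
{
    // running totals across all IndexEntry objects emitted by the readers
    public final AtomicLong uncompressedBytes = new AtomicLong();
    public final AtomicLong compressedBytes = new AtomicLong();
    public final AtomicLong skippedIndexFiles = new AtomicLong();

    @Override
    public void emitIndexEntry(IndexEntry entry)
    {
        uncompressedBytes.addAndGet(entry.uncompressed);
        compressedBytes.addAndGet(entry.compressed);
    }

    @Override
    public void indexFileSkipped()
    {
        // the Index.db file did not overlap the token range and was never opened
        skippedIndexFiles.incrementAndGet();
    }
}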
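
Outside of Spark, the new IndexReader can also be driven directly with an IndexConsumer, much as IndexReaderTests does above. Note that the IndexReader constructor does all of the work and invokes the consumer callbacks before returning, which is why the test wraps each reader in a CompletableFuture. A minimal sketch, assuming the patch's classes are on the classpath; PartitionSizeDump is an illustrative name, and the ssTable, metadata, and rangeFilter arguments are obtained elsewhere (for example via a DataLayer):

import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.spark.data.SSTable;
import org.apache.cassandra.spark.reader.IndexConsumer;
import org.apache.cassandra.spark.reader.IndexEntry;
import org.apache.cassandra.spark.reader.IndexReader;
import org.apache.cassandra.spark.sparksql.filters.SparkRangeFilter;
import org.apache.cassandra.spark.stats.Stats;

public final class PartitionSizeDump
{
    // Reads one SSTable's Index.db component and prints per-partition sizes.
    // Blocks until the Index.db file has been fully consumed or skipped.
    public static void dump(SSTable ssTable, TableMetadata metadata, SparkRangeFilter rangeFilter)
    {
        IndexConsumer consumer = new IndexConsumer()
        {
            public void onFailure(Throwable t)
            {
                t.printStackTrace(); // surface read errors
            }

            public void onFinished(long runtimeNanos)
            {
                // reader closed; runtimeNanos is how long it was open
            }

            public void accept(IndexEntry entry)
            {
                System.out.printf("uncompressed=%d compressed=%d%n",
                                  entry.uncompressed, entry.compressed);
            }
        };
        new IndexReader(ssTable, metadata, rangeFilter, Stats.DoNothingStats.INSTANCE, consumer);
    }
}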