This is an automated email from the ASF dual-hosted git repository.

ycai pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git
The following commit(s) were added to refs/heads/trunk by this push: new a135322 CASSANDRA-19452 Use constant reference time during bulk read process (#44) a135322 is described below commit a13532272051d4e4608f92d53bdd997103e8ea19 Author: Yifan Cai <52585731+yifa...@users.noreply.github.com> AuthorDate: Tue Mar 5 11:06:36 2024 -0800 CASSANDRA-19452 Use constant reference time during bulk read process (#44) patch by Yifan Cai; reviewed by Francisco Guerrero, James Berragan for CASSANDRA-19452 --- CHANGES.txt | 1 + .../cassandra/spark/data/CassandraDataLayer.java | 28 ++++- .../cassandra/spark/data/LocalDataLayer.java | 7 ++ .../org/apache/cassandra/spark/TestDataLayer.java | 7 ++ .../data/partitioner/JDKSerializationTests.java | 7 ++ .../apache/cassandra/bridge/CassandraBridge.java | 2 - .../org/apache/cassandra/spark/data/DataLayer.java | 13 +-- .../{TimeProvider.java => ReaderTimeProvider.java} | 28 +++-- .../apache/cassandra/spark/utils/TimeProvider.java | 41 ++++++- .../cassandra/spark/utils/test/TestSchema.java | 29 ++++- .../bridge/CassandraBridgeImplementation.java | 7 -- .../spark/reader/AbstractStreamScanner.java | 50 ++++++--- .../spark/reader/CompactionStreamScanner.java | 32 ++++-- .../cassandra/spark/reader/SSTableReaderTests.java | 120 +++++++++++++++++++++ 14 files changed, 311 insertions(+), 61 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 8215822..92620a9 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 1.0.0 + * Use constant reference time during bulk read process (CASSANDRA-19452) * Update access of ClearSnapshotStrategy (CASSANDRA-19442) * Bulk reader fails to produce a row when regular column values are null (CASSANDRA-19411) * Use XXHash32 for digest calculation of SSTables (CASSANDRA-19369) diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java index 40e0436..8ab1dd6 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java @@ -87,8 +87,10 @@ import org.apache.cassandra.spark.sparksql.LastModifiedTimestampDecorator; import org.apache.cassandra.spark.sparksql.RowBuilder; import org.apache.cassandra.spark.stats.Stats; import org.apache.cassandra.spark.utils.CqlUtils; +import org.apache.cassandra.spark.utils.ReaderTimeProvider; import org.apache.cassandra.spark.utils.ScalaFunctions; import org.apache.cassandra.spark.utils.ThrowableUtils; +import org.apache.cassandra.spark.utils.TimeProvider; import org.apache.cassandra.spark.validation.CassandraValidation; import org.apache.cassandra.spark.validation.SidecarValidation; import org.apache.cassandra.spark.validation.StartupValidatable; @@ -122,7 +124,6 @@ public class CassandraDataLayer extends PartitionedDataLayer implements StartupV protected TokenPartitioner tokenPartitioner; protected Map<String, AvailabilityHint> availabilityHints; protected Sidecar.ClientConfig sidecarClientConfig; - private SslConfig sslConfig; protected Map<String, BigNumberConfigImpl> bigNumberConfigMap; protected boolean enableStats; protected boolean readIndexOffset; @@ -133,7 +134,11 @@ public class CassandraDataLayer extends PartitionedDataLayer implements StartupV protected String lastModifiedTimestampField; // volatile in order to publish the reference for visibility protected volatile CqlTable cqlTable; + protected transient 
TimeProvider timeProvider; protected transient SidecarClient sidecar; + + private SslConfig sslConfig; + @VisibleForTesting transient Map<String, SidecarInstance> instanceMap; @@ -178,7 +183,8 @@ public class CassandraDataLayer extends PartitionedDataLayer implements StartupV boolean useIncrementalRepair, @Nullable String lastModifiedTimestampField, List<SchemaFeature> requestedFeatures, - @NotNull Map<String, ReplicationFactor> rfMap) + @NotNull Map<String, ReplicationFactor> rfMap, + TimeProvider timeProvider) { super(consistencyLevel, datacenter); this.snapshotName = snapshotName; @@ -203,6 +209,7 @@ public class CassandraDataLayer extends PartitionedDataLayer implements StartupV aliasLastModifiedTimestamp(this.requestedFeatures, this.lastModifiedTimestampField); } this.rfMap = rfMap; + this.timeProvider = timeProvider; this.maybeQuoteKeyspaceAndTable(); this.initInstanceMap(); this.startupValidate(); @@ -212,8 +219,9 @@ public class CassandraDataLayer extends PartitionedDataLayer implements StartupV { dialHome(options); - LOGGER.info("Starting Cassandra Spark job snapshotName={} keyspace={} table={} dc={}", - snapshotName, keyspace, table, datacenter); + timeProvider = new ReaderTimeProvider(); + LOGGER.info("Starting Cassandra Spark job snapshotName={} keyspace={} table={} dc={} referenceEpoch={}", + snapshotName, keyspace, table, datacenter, timeProvider.referenceEpochInSeconds()); // Load cluster config from options clusterConfig = initializeClusterConfig(options); @@ -381,6 +389,12 @@ public class CassandraDataLayer extends PartitionedDataLayer implements StartupV return throwable != null && (throwable instanceof RetriesExhaustedException || isExhausted(throwable.getCause())); } + @Override + public TimeProvider timeProvider() + { + return timeProvider; + } + @Override public boolean useIncrementalRepair() { @@ -701,6 +715,7 @@ public class CassandraDataLayer extends PartitionedDataLayer implements StartupV aliasLastModifiedTimestamp(this.requestedFeatures, this.lastModifiedTimestampField); } this.rfMap = (Map<String, ReplicationFactor>) in.readObject(); + this.timeProvider = new ReaderTimeProvider(in.readInt()); this.maybeQuoteKeyspaceAndTable(); this.initInstanceMap(); this.startupValidate(); @@ -742,6 +757,7 @@ public class CassandraDataLayer extends PartitionedDataLayer implements StartupV out.writeUTF(feature.optionName()); } out.writeObject(this.rfMap); + out.writeInt(timeProvider.referenceEpochInSeconds()); } private static void writeNullable(ObjectOutputStream out, @Nullable String string) throws IOException @@ -814,6 +830,7 @@ public class CassandraDataLayer extends PartitionedDataLayer implements StartupV .collect(Collectors.toList()); kryo.writeObject(out, listWrapper); kryo.writeObject(out, dataLayer.rfMap); + out.writeInt(dataLayer.timeProvider.referenceEpochInSeconds()); } @SuppressWarnings("unchecked") @@ -852,7 +869,8 @@ public class CassandraDataLayer extends PartitionedDataLayer implements StartupV in.readBoolean(), in.readString(), kryo.readObject(in, SchemaFeaturesListWrapper.class).toList(), - kryo.readObject(in, HashMap.class)); + kryo.readObject(in, HashMap.class), + new ReaderTimeProvider(in.readInt())); } // Wrapper only used internally for Kryo serialization/deserialization diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/LocalDataLayer.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/LocalDataLayer.java index 33a4850..214d5cc 100644 --- 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/LocalDataLayer.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/LocalDataLayer.java @@ -59,6 +59,7 @@ import org.apache.cassandra.spark.sparksql.filters.PartitionKeyFilter; import org.apache.cassandra.spark.sparksql.filters.SparkRangeFilter; import org.apache.cassandra.spark.stats.Stats; import org.apache.cassandra.spark.utils.Throwing; +import org.apache.cassandra.spark.utils.TimeProvider; import org.apache.parquet.Strings; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -270,6 +271,12 @@ public class LocalDataLayer extends DataLayer implements Serializable return true; } + @Override + public TimeProvider timeProvider() + { + return TimeProvider.DEFAULT; + } + @Override public CqlTable cqlTable() { diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/TestDataLayer.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/TestDataLayer.java index fede4fb..09e401c 100644 --- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/TestDataLayer.java +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/TestDataLayer.java @@ -46,6 +46,7 @@ import org.apache.cassandra.spark.data.SSTablesSupplier; import org.apache.cassandra.spark.data.partitioner.Partitioner; import org.apache.cassandra.spark.sparksql.filters.PartitionKeyFilter; import org.apache.cassandra.spark.sparksql.filters.SparkRangeFilter; +import org.apache.cassandra.spark.utils.TimeProvider; import org.apache.cassandra.spark.utils.test.TestSSTable; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -99,6 +100,12 @@ public class TestDataLayer extends DataLayer return true; } + @Override + public TimeProvider timeProvider() + { + return TimeProvider.DEFAULT; + } + @Override protected ExecutorService executorService() { diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/partitioner/JDKSerializationTests.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/partitioner/JDKSerializationTests.java index 1be563b..5cf6ec7 100644 --- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/partitioner/JDKSerializationTests.java +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/partitioner/JDKSerializationTests.java @@ -48,6 +48,7 @@ import org.apache.cassandra.spark.data.PartitionedDataLayer; import org.apache.cassandra.spark.data.ReplicationFactor; import org.apache.cassandra.spark.data.SSTable; import org.apache.cassandra.spark.data.VersionRunner; +import org.apache.cassandra.spark.utils.TimeProvider; import org.apache.cassandra.spark.utils.test.TestSchema; import org.jetbrains.annotations.NotNull; @@ -235,6 +236,12 @@ public class JDKSerializationTests extends VersionRunner return cqlTable; } + @Override + public TimeProvider timeProvider() + { + return TimeProvider.DEFAULT; + } + @Override public ReplicationFactor replicationFactor(String keyspace) { diff --git a/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridge.java b/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridge.java index 6c0240c..c89f2cd 100644 --- a/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridge.java +++ b/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridge.java @@ -84,8 +84,6 @@ public abstract class CassandraBridge @NotNull 
Partitioner partitioner, @NotNull List<String> keys); - public abstract TimeProvider timeProvider(); - // Compaction Stream Scanner // CHECKSTYLE IGNORE: Method with many parameters public abstract StreamScanner<Rid> getCompactionScanner(@NotNull CqlTable table, diff --git a/cassandra-bridge/src/main/java/org/apache/cassandra/spark/data/DataLayer.java b/cassandra-bridge/src/main/java/org/apache/cassandra/spark/data/DataLayer.java index 19ef6c9..27f0407 100644 --- a/cassandra-bridge/src/main/java/org/apache/cassandra/spark/data/DataLayer.java +++ b/cassandra-bridge/src/main/java/org/apache/cassandra/spark/data/DataLayer.java @@ -164,6 +164,11 @@ public abstract class DataLayer implements Serializable public abstract boolean isInPartition(int partitionId, BigInteger token, ByteBuffer key); + /** + * @return a TimeProvider + */ + public abstract TimeProvider timeProvider(); + public List<PartitionKeyFilter> partitionKeyFiltersInRange( int partitionId, List<PartitionKeyFilter> partitionKeyFilters) throws NoMatchFoundException @@ -282,14 +287,6 @@ public abstract class DataLayer implements Serializable rangeFilter, timeProvider(), stats(), executorService()); } - /** - * @return a TimeProvider that returns the time now in seconds. User can override with their own provider - */ - public TimeProvider timeProvider() - { - return bridge().timeProvider(); - } - /** * @param filters array of push down filters that * @return an array of push filters that are <b>not</b> supported by this data layer diff --git a/cassandra-bridge/src/main/java/org/apache/cassandra/spark/utils/TimeProvider.java b/cassandra-bridge/src/main/java/org/apache/cassandra/spark/utils/ReaderTimeProvider.java similarity index 56% copy from cassandra-bridge/src/main/java/org/apache/cassandra/spark/utils/TimeProvider.java copy to cassandra-bridge/src/main/java/org/apache/cassandra/spark/utils/ReaderTimeProvider.java index 950cb6a..cbc1751 100644 --- a/cassandra-bridge/src/main/java/org/apache/cassandra/spark/utils/TimeProvider.java +++ b/cassandra-bridge/src/main/java/org/apache/cassandra/spark/utils/ReaderTimeProvider.java @@ -20,15 +20,29 @@ package org.apache.cassandra.spark.utils; /** - * Provides current time - */ -@FunctionalInterface -public interface TimeProvider + * A {@link TimeProvider} used by reader. 
It decides the reference epoch on instantiation and distribute to executors + */ +public class ReaderTimeProvider implements TimeProvider { - TimeProvider INSTANCE = () -> (int) Math.floorDiv(System.currentTimeMillis(), 1000L); + private final int referenceEpochInSeconds; + + public ReaderTimeProvider() + { + this.referenceEpochInSeconds = nowInSeconds(); + } /** - * @return current time in truncated seconds + * Constructor used for deserialization + * @param referenceEpochInSeconds reference epoch to set */ - int nowInTruncatedSeconds(); + public ReaderTimeProvider(int referenceEpochInSeconds) + { + this.referenceEpochInSeconds = referenceEpochInSeconds; + } + + @Override + public int referenceEpochInSeconds() + { + return referenceEpochInSeconds; + } } diff --git a/cassandra-bridge/src/main/java/org/apache/cassandra/spark/utils/TimeProvider.java b/cassandra-bridge/src/main/java/org/apache/cassandra/spark/utils/TimeProvider.java index 950cb6a..481b8f9 100644 --- a/cassandra-bridge/src/main/java/org/apache/cassandra/spark/utils/TimeProvider.java +++ b/cassandra-bridge/src/main/java/org/apache/cassandra/spark/utils/TimeProvider.java @@ -19,16 +19,47 @@ package org.apache.cassandra.spark.utils; +import java.util.concurrent.TimeUnit; + +import com.google.common.annotations.VisibleForTesting; + /** - * Provides current time + * Provides time */ -@FunctionalInterface public interface TimeProvider { - TimeProvider INSTANCE = () -> (int) Math.floorDiv(System.currentTimeMillis(), 1000L); + /** + * The {@code DEFAULT} time provider sets the reference time on class loading. The reference time is fixed for the + * local JVM. If the same reference time for all executors is desired, you should avoid using the {@code DEFAULT} + * and use {@link ReaderTimeProvider} or similar kind that allows to broadcast the reference time value from driver. + * <p></p> + * It should be used for testing only. + */ + @VisibleForTesting + TimeProvider DEFAULT = new TimeProvider() + { + private final int referenceEpochInSeconds = nowInSeconds(); + + @Override + public int referenceEpochInSeconds() + { + return referenceEpochInSeconds; + } + }; + + /** + * @return current time in seconds + */ + default int nowInSeconds() + { + return (int) TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()); + } /** - * @return current time in truncated seconds + * Get the time value that is used as a reference. 
It should never change throughout the lifecycle of the provider + * @return a fixed epoch time in seconds + * + * Note that the actual constant value returned is implementation dependent */ - int nowInTruncatedSeconds(); + int referenceEpochInSeconds(); } diff --git a/cassandra-bridge/src/main/java/org/apache/cassandra/spark/utils/test/TestSchema.java b/cassandra-bridge/src/main/java/org/apache/cassandra/spark/utils/test/TestSchema.java index 2e6b0a8..0949940 100644 --- a/cassandra-bridge/src/main/java/org/apache/cassandra/spark/utils/test/TestSchema.java +++ b/cassandra-bridge/src/main/java/org/apache/cassandra/spark/utils/test/TestSchema.java @@ -78,6 +78,7 @@ public final class TestSchema private Integer blobSize = null; private boolean withCompression = true; private boolean quoteIdentifiers = false; + private int ttlSecs = 0; public Builder(CassandraBridge bridge) { @@ -163,6 +164,12 @@ public final class TestSchema return this; } + public Builder withTTL(int ttlSecs) + { + this.ttlSecs = ttlSecs; + return this; + } + public TestSchema build() { if (!partitionKeys.isEmpty()) @@ -192,7 +199,7 @@ public final class TestSchema } } - private final CassandraBridge bridge; + private final CassandraBridge bridge; @NotNull public final String keyspace; public final String table; @@ -249,7 +256,10 @@ public final class TestSchema this.blobSize = builder.blobSize; this.allFields = buildAllFields(partitionKeys, clusteringKeys, columns); this.fieldPositions = calculateFieldPositions(allFields); - this.createStatement = buildCreateStatement(columns, builder.sortOrders, builder.withCompression); + this.createStatement = buildCreateStatement(columns, + builder.sortOrders, + builder.withCompression, + builder.ttlSecs); this.insertStatement = buildInsertStatement(columns, builder.insertFields); this.updateStatement = buildUpdateStatement(); this.deleteStatement = buildDeleteStatement(builder.deleteFields); @@ -363,7 +373,8 @@ public final class TestSchema private String buildCreateStatement(List<CqlField> columns, List<CqlField.SortOrder> sortOrders, - boolean withCompression) + boolean withCompression, + int ttlSecs) { StringBuilder createStmtBuilder = new StringBuilder().append("CREATE TABLE ") .append(maybeQuoteIdentifierIfRequested(keyspace)) @@ -398,9 +409,11 @@ public final class TestSchema createStmtBuilder.append("))"); + createStmtBuilder.append(" WITH comment = 'test table'"); // take 'WITH', so the rest can append 'AND' safely + if (!sortOrders.isEmpty()) { - createStmtBuilder.append(" WITH CLUSTERING ORDER BY ("); + createStmtBuilder.append(" AND CLUSTERING ORDER BY ("); for (int sortOrder = 0; sortOrder < sortOrders.size(); sortOrder++) { createStmtBuilder.append(maybeQuoteIdentifierIfRequested(clusteringKeys.get(sortOrder).name())) @@ -414,9 +427,15 @@ public final class TestSchema createStmtBuilder.append(")"); } + if (!withCompression) { - createStmtBuilder.append(" WITH compression = {'enabled':'false'}"); + createStmtBuilder.append(" AND compression = {'enabled':'false'}"); + } + + if (ttlSecs > 0) + { + createStmtBuilder.append(" AND default_time_to_live = " + ttlSecs); } return createStmtBuilder.append(";") diff --git a/cassandra-four-zero/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java b/cassandra-four-zero/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java index 4cad69b..56e6d46 100644 --- a/cassandra-four-zero/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java +++ 
b/cassandra-four-zero/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java @@ -121,7 +121,6 @@ import org.apache.cassandra.spark.utils.SparkClassLoaderOverride; import org.apache.cassandra.spark.utils.TimeProvider; import org.apache.cassandra.tools.JsonTransformer; import org.apache.cassandra.tools.Util; -import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.UUIDGen; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -244,12 +243,6 @@ public class CassandraBridgeImplementation extends CassandraBridge .collect(Collectors.toList()); } - @Override - public TimeProvider timeProvider() - { - return FBUtilities::nowInSeconds; - } - @Override public StreamScanner<Rid> getCompactionScanner(@NotNull CqlTable table, @NotNull Partitioner partitioner, diff --git a/cassandra-four-zero/src/main/java/org/apache/cassandra/spark/reader/AbstractStreamScanner.java b/cassandra-four-zero/src/main/java/org/apache/cassandra/spark/reader/AbstractStreamScanner.java index 460ced0..b2ea6be 100644 --- a/cassandra-four-zero/src/main/java/org/apache/cassandra/spark/reader/AbstractStreamScanner.java +++ b/cassandra-four-zero/src/main/java/org/apache/cassandra/spark/reader/AbstractStreamScanner.java @@ -21,6 +21,7 @@ package org.apache.cassandra.spark.reader; import java.io.Closeable; import java.io.IOException; +import java.math.BigInteger; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Iterator; @@ -111,13 +112,33 @@ public abstract class AbstractStreamScanner implements StreamScanner<Rid>, Close @Override public abstract void close() throws IOException; - protected abstract void handleRowTombstone(Row row); + /** + * Handles the row tombstone + * @param token token of the partition that the row belongs to + * @param row row tombstone + */ + protected abstract void handleRowTombstone(BigInteger token, Row row); - protected abstract void handlePartitionTombstone(UnfilteredRowIterator partition); + /** + * Handles the partition tombstone + * @param token token of the partition + * @param partition partition tombstone + */ + protected abstract void handlePartitionTombstone(BigInteger token, UnfilteredRowIterator partition); - protected abstract void handleCellTombstone(); - protected abstract void handleCellTombstoneInComplex(Cell<?> cell); + /** + * Handle the cell tombstone + * @param token token of the partition that the cell belongs to + */ + protected abstract void handleCellTombstone(BigInteger token); + + /** + * Handle the cell tombstone in complex type, e.g. 
UDT and collections + * @param token token of the partition that the cell belongs to + * @param cell cell tombstone + */ + protected abstract void handleCellTombstoneInComplex(BigInteger token, Cell<?> cell); @Override public void advanceToNextColumn() @@ -152,16 +173,16 @@ public abstract class AbstractStreamScanner implements StreamScanner<Rid>, Close // Advance to next partition partition = allPartitions.next(); + BigInteger token = ReaderUtils.tokenToBigInteger(partition.partitionKey().getToken()); if (partition.partitionLevelDeletion().isLive()) { // Reset rid with new partition key - rid.setPartitionKeyCopy(partition.partitionKey().getKey(), - ReaderUtils.tokenToBigInteger(partition.partitionKey().getToken())); + rid.setPartitionKeyCopy(partition.partitionKey().getKey(), token); } else { // There's a partition-level delete - handlePartitionTombstone(partition); + handlePartitionTombstone(token, partition); return true; } } @@ -230,7 +251,7 @@ public abstract class AbstractStreamScanner implements StreamScanner<Rid>, Close // There is a CQL row level delete if (!row.deletion().isLive()) { - handleRowTombstone(row); + handleRowTombstone(rid.getToken(), row); return true; } @@ -254,6 +275,11 @@ public abstract class AbstractStreamScanner implements StreamScanner<Rid>, Close // either are present) will get emitted columns = row.iterator(); } + else if (unfiltered.isRangeTombstoneMarker()) + { + throw new IllegalStateException("Encountered RangeTombstoneMarker. " + + "It should have been purged in CompactionIterator"); + } else { // As of Cassandra 4, the unfiltered kind can either be row or range tombstone marker, @@ -370,7 +396,7 @@ public abstract class AbstractStreamScanner implements StreamScanner<Rid>, Close null)); if (cell.isTombstone()) { - handleCellTombstone(); + handleCellTombstone(rid.getToken()); } else { @@ -425,13 +451,13 @@ public abstract class AbstractStreamScanner implements StreamScanner<Rid>, Close Cell<?> cell = cells.next(); // Re: isLive vs. 
isTombstone - isLive considers TTL so that if a cell is expiring soon, // it is handled as tombstone - if (cell.isLive(timeProvider.nowInTruncatedSeconds())) + if (cell.isLive(timeProvider.referenceEpochInSeconds())) { buffer.addCell(cell); } else { - handleCellTombstoneInComplex(cell); + handleCellTombstoneInComplex(rid.getToken(), cell); } // In the case the cell is deleted, the deletion time is also the cell's timestamp maxTimestamp = Math.max(maxTimestamp, cell.timestamp()); @@ -443,7 +469,7 @@ public abstract class AbstractStreamScanner implements StreamScanner<Rid>, Close else { // The entire collection/UDT is deleted - handleCellTombstone(); + handleCellTombstone(rid.getToken()); rid.setTimestamp(deletionTime.markedForDeleteAt()); } diff --git a/cassandra-four-zero/src/main/java/org/apache/cassandra/spark/reader/CompactionStreamScanner.java b/cassandra-four-zero/src/main/java/org/apache/cassandra/spark/reader/CompactionStreamScanner.java index 9c2d156..89a2df3 100644 --- a/cassandra-four-zero/src/main/java/org/apache/cassandra/spark/reader/CompactionStreamScanner.java +++ b/cassandra-four-zero/src/main/java/org/apache/cassandra/spark/reader/CompactionStreamScanner.java @@ -19,6 +19,7 @@ package org.apache.cassandra.spark.reader; +import java.math.BigInteger; import java.util.Arrays; import java.util.Collection; import java.util.List; @@ -26,6 +27,8 @@ import java.util.UUID; import java.util.function.LongPredicate; import java.util.stream.Collectors; +import com.google.common.annotations.VisibleForTesting; + import org.apache.cassandra.db.AbstractCompactionController; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.DecoratedKey; @@ -54,11 +57,12 @@ public class CompactionStreamScanner extends AbstractStreamScanner private AbstractCompactionStrategy.ScannerList scanners; private CompactionIterator ci; + @VisibleForTesting CompactionStreamScanner(@NotNull TableMetadata cfMetaData, @NotNull Partitioner partitionerType, @NotNull Collection<? extends Scannable> toCompact) { - this(cfMetaData, partitionerType, TimeProvider.INSTANCE, toCompact); + this(cfMetaData, partitionerType, TimeProvider.DEFAULT, toCompact); } public CompactionStreamScanner(@NotNull TableMetadata cfMetaData, @@ -79,33 +83,41 @@ public class CompactionStreamScanner extends AbstractStreamScanner } @Override - protected void handleRowTombstone(Row row) + protected void handleRowTombstone(BigInteger token, Row row) { - throw new IllegalStateException("Row tombstone found, it should have been purged in CompactionIterator"); + throw new IllegalStateException("Row tombstone found. " + + "It should have been purged in CompactionIterator." + + "Partition key token: " + token); } @Override - protected void handlePartitionTombstone(UnfilteredRowIterator partition) + protected void handlePartitionTombstone(BigInteger token, UnfilteredRowIterator partition) { - throw new IllegalStateException("Partition tombstone found, it should have been purged in CompactionIterator"); + throw new IllegalStateException("Partition tombstone found. " + + "It should have been purged in CompactionIterator. " + + "Partition key token: " + token); } @Override - protected void handleCellTombstone() + protected void handleCellTombstone(BigInteger token) { - throw new IllegalStateException("Cell tombstone found, it should have been purged in CompactionIterator"); + throw new IllegalStateException("Cell tombstone found. " + + "It should have been purged in CompactionIterator. 
" + + "Partition key token: " + token); } @Override - protected void handleCellTombstoneInComplex(Cell<?> cell) + protected void handleCellTombstoneInComplex(BigInteger token, Cell<?> cell) { - // Do nothing: to not introduce behavior change to the SBR code path + throw new IllegalStateException("Cell tombstone in complex type found. " + + "It should have been purged in CompactionIterator. " + + "Partition key token: " + token); } @Override UnfilteredPartitionIterator initializePartitions() { - int nowInSec = timeProvider.nowInTruncatedSeconds(); + int nowInSec = timeProvider.referenceEpochInSeconds(); Keyspace keyspace = Keyspace.openWithoutSSTables(metadata.keyspace); ColumnFamilyStore cfStore = keyspace.getColumnFamilyStore(metadata.name); controller = new PurgingCompactionController(cfStore, CompactionParams.TombstoneOption.NONE); diff --git a/cassandra-four-zero/src/test/java/org/apache/cassandra/spark/reader/SSTableReaderTests.java b/cassandra-four-zero/src/test/java/org/apache/cassandra/spark/reader/SSTableReaderTests.java index 2b87784..1dc26a8 100644 --- a/cassandra-four-zero/src/test/java/org/apache/cassandra/spark/reader/SSTableReaderTests.java +++ b/cassandra-four-zero/src/test/java/org/apache/cassandra/spark/reader/SSTableReaderTests.java @@ -39,6 +39,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; @@ -46,8 +47,10 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import com.google.common.collect.ImmutableMap; +import com.google.common.util.concurrent.Uninterruptibles; import org.apache.commons.lang.StringUtils; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -67,6 +70,9 @@ import org.apache.cassandra.io.sstable.ISSTableScanner; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.serializers.UTF8Serializer; +import org.apache.cassandra.spark.TestDataLayer; +import org.apache.cassandra.spark.TestRunnable; +import org.apache.cassandra.spark.TestUtils; import org.apache.cassandra.spark.data.CqlTable; import org.apache.cassandra.spark.data.FileType; import org.apache.cassandra.spark.data.ReplicationFactor; @@ -78,12 +84,17 @@ import org.apache.cassandra.spark.stats.Stats; import org.apache.cassandra.spark.utils.ByteBufferUtils; import org.apache.cassandra.spark.utils.TemporaryDirectory; import org.apache.cassandra.spark.utils.Throwing; +import org.apache.cassandra.spark.utils.TimeProvider; import org.apache.cassandra.spark.utils.test.TestSSTable; import org.apache.cassandra.spark.utils.test.TestSchema; import org.apache.cassandra.utils.Pair; +import org.apache.spark.sql.catalyst.util.GenericArrayData; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; +import static org.apache.cassandra.spark.TestUtils.countSSTables; +import static org.apache.cassandra.spark.TestUtils.getFileType; +import static org.apache.cassandra.spark.TestUtils.runTest; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotEquals; @@ -99,6 +110,9 @@ public class SSTableReaderTests private static final int ROWS = 50; private static final int COLUMNS = 
25; + @TempDir + private Path tempDir; + @Test public void testOpenCompressedRawInputStream() { @@ -1048,6 +1062,112 @@ public class SSTableReaderTests }); } + @Test + public void testCollectionWithTtlUsingConstantReferenceTime() + { + // offset is 0, column values in all rows should be unexpired; thus, reading 10 values + testTtlUsingConstantReferenceTimeHelper(50, 0, 10, 10); + // ensure all rows expires by advancing enough time in the future; thus, reading 0 values + testTtlUsingConstantReferenceTimeHelper(50, 100, 10, 0); + } + + // helper that write rows with ttl, and assert on the compaction result by changing the reference time + private void testTtlUsingConstantReferenceTimeHelper(int ttlSecs, + int timeOffsetSecs, + int rows, + int expectedValues) + { + AtomicInteger referenceEpoch = new AtomicInteger(0); + TimeProvider navigatableTimeProvider = new TimeProvider() + { + @Override + public int referenceEpochInSeconds() + { + return referenceEpoch.get(); + } + }; + + Set<Integer> expectedColValue = new HashSet<>(Arrays.asList(1, 2, 3)); + TestRunnable test = (partitioner, dir, bridge) -> { + TestSchema schema = TestSchema.builder(BRIDGE) + .withPartitionKey("a", BRIDGE.aInt()) + .withColumn("b", BRIDGE.set(BRIDGE.aInt())) + .withTTL(ttlSecs) + .build(); + schema.writeSSTable(dir, BRIDGE, partitioner, (writer) -> { + for (int i = 0; i < rows; i++) + { + writer.write(i, expectedColValue); + } + Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); + }); + int t1 = navigatableTimeProvider.nowInSeconds(); + assertEquals(1, countSSTables(dir)); + + // open CompactionStreamScanner over SSTables + TableMetadata metaData = new SchemaBuilder(schema.createStatement, + schema.keyspace, + new ReplicationFactor(ReplicationFactor.ReplicationStrategy.SimpleStrategy, + ImmutableMap.of("replication_factor", 1)), + partitioner) + .tableMetaData(); + + CqlTable table = schema.buildTable(); + TestDataLayer dataLayer = new TestDataLayer(BRIDGE, + getFileType(dir, FileType.DATA).collect(Collectors.toList()), + table); + Set<SSTableReader> toCompact = dataLayer.listSSTables() + .map(ssTable -> { + try + { + return openReader(metaData, ssTable); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + }) + .collect(Collectors.toSet()); + + int count = 0; + referenceEpoch.set(t1 + timeOffsetSecs); + try (CompactionStreamScanner scanner = new CompactionStreamScanner(metaData, partitioner, navigatableTimeProvider, toCompact)) + { + // iterate through CompactionStreamScanner verifying it correctly compacts data together + Rid rid = scanner.rid(); + while (scanner.hasNext()) + { + scanner.advanceToNextColumn(); + + // extract column name + ByteBuffer colBuf = rid.getColumnName(); + String colName = ByteBufferUtils.string(ByteBufferUtils.readBytesWithShortLength(colBuf)); + colBuf.get(); + if (StringUtils.isEmpty(colName)) + { + continue; + } + assertEquals("b", colName); + + // extract value column + ByteBuffer b = rid.getValue(); + Set set = new HashSet(Arrays.asList(((GenericArrayData) BRIDGE.set(BRIDGE.aInt()) + .deserialize(b)) + .array())); + assertEquals(expectedColValue, set); + count++; + } + } + assertEquals(expectedValues, count); + }; + + qt() + .forAll(TestUtils.partitioners()) + .checkAssert(partitioner -> { + runTest(partitioner, BRIDGE, test); + }); + } + private static TableMetadata tableMetadata(TestSchema schema, Partitioner partitioner) { return new SchemaBuilder(schema.createStatement, --------------------------------------------------------------------- To unsubscribe, 
e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org
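
For readers working from the flattened diff above, the following is a condensed, self-contained Java sketch of the constant-reference-time pattern this patch introduces: the driver fixes one reference epoch, ships it to executors as a plain int, and every task evaluates TTL expiry against that same instant instead of calling "now" independently. The TimeProvider and ReaderTimeProvider shapes and the driver-to-executor handoff mirror the diff; the ConstantReferenceTimeSketch class, its main method, and the simplified TTL comparison are illustrative stand-ins, not code from the commit.

import java.util.concurrent.TimeUnit;

public class ConstantReferenceTimeSketch
{
    // Mirrors org.apache.cassandra.spark.utils.TimeProvider from the patch
    interface TimeProvider
    {
        // Current wall-clock time in seconds
        default int nowInSeconds()
        {
            return (int) TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis());
        }

        // Fixed for the lifetime of the provider; readers compare TTLs against this
        int referenceEpochInSeconds();
    }

    // Mirrors ReaderTimeProvider: the epoch is decided on instantiation (driver side)
    // and can be reconstructed from a serialized int (executor side)
    static final class ReaderTimeProvider implements TimeProvider
    {
        private final int referenceEpochInSeconds;

        ReaderTimeProvider()
        {
            this.referenceEpochInSeconds = nowInSeconds();
        }

        // Constructor used for deserialization
        ReaderTimeProvider(int referenceEpochInSeconds)
        {
            this.referenceEpochInSeconds = referenceEpochInSeconds;
        }

        @Override
        public int referenceEpochInSeconds()
        {
            return referenceEpochInSeconds;
        }
    }

    public static void main(String[] args)
    {
        // Driver: capture the reference time once; the real code writes it with
        // ObjectOutputStream/Kryo, a plain int stands in for that here
        int serializedEpoch = new ReaderTimeProvider().referenceEpochInSeconds();

        // Executor: rebuild the provider from the serialized epoch
        TimeProvider onExecutor = new ReaderTimeProvider(serializedEpoch);

        // Hypothetical stand-in for cell.isLive(timeProvider.referenceEpochInSeconds()):
        // a value written at writeTimeSecs with a TTL is treated as live only if it
        // has not expired relative to the shared reference epoch
        int writeTimeSecs = serializedEpoch - 30;
        int ttlSecs = 60;
        boolean live = writeTimeSecs + ttlSecs > onExecutor.referenceEpochInSeconds();
        System.out.println("cell live relative to the shared reference epoch: " + live);
    }
}

The design point, as described in the patch, is that expiry checks such as cell.isLive(timeProvider.referenceEpochInSeconds()) evaluate against a single epoch chosen on the driver, so all executors agree on which TTL'd cells are expired regardless of when their individual tasks happen to run.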