This is an automated email from the ASF dual-hosted git repository.

ycai pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a135322  CASSANDRA-19452 Use constant reference time during bulk read 
process (#44)
a135322 is described below

commit a13532272051d4e4608f92d53bdd997103e8ea19
Author: Yifan Cai <52585731+yifa...@users.noreply.github.com>
AuthorDate: Tue Mar 5 11:06:36 2024 -0800

    CASSANDRA-19452 Use constant reference time during bulk read process (#44)
    
    patch by Yifan Cai; reviewed by Francisco Guerrero, James Berragan for 
CASSANDRA-19452
---
 CHANGES.txt                                        |   1 +
 .../cassandra/spark/data/CassandraDataLayer.java   |  28 ++++-
 .../cassandra/spark/data/LocalDataLayer.java       |   7 ++
 .../org/apache/cassandra/spark/TestDataLayer.java  |   7 ++
 .../data/partitioner/JDKSerializationTests.java    |   7 ++
 .../apache/cassandra/bridge/CassandraBridge.java   |   2 -
 .../org/apache/cassandra/spark/data/DataLayer.java |  13 +--
 .../{TimeProvider.java => ReaderTimeProvider.java} |  28 +++--
 .../apache/cassandra/spark/utils/TimeProvider.java |  41 ++++++-
 .../cassandra/spark/utils/test/TestSchema.java     |  29 ++++-
 .../bridge/CassandraBridgeImplementation.java      |   7 --
 .../spark/reader/AbstractStreamScanner.java        |  50 ++++++---
 .../spark/reader/CompactionStreamScanner.java      |  32 ++++--
 .../cassandra/spark/reader/SSTableReaderTests.java | 120 +++++++++++++++++++++
 14 files changed, 311 insertions(+), 61 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 8215822..92620a9 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 1.0.0
+ * Use constant reference time during bulk read process (CASSANDRA-19452)
  * Update access of ClearSnapshotStrategy (CASSANDRA-19442)
  * Bulk reader fails to produce a row when regular column values are null 
(CASSANDRA-19411)
  * Use XXHash32 for digest calculation of SSTables (CASSANDRA-19369)
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
index 40e0436..8ab1dd6 100644
--- 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
@@ -87,8 +87,10 @@ import 
org.apache.cassandra.spark.sparksql.LastModifiedTimestampDecorator;
 import org.apache.cassandra.spark.sparksql.RowBuilder;
 import org.apache.cassandra.spark.stats.Stats;
 import org.apache.cassandra.spark.utils.CqlUtils;
+import org.apache.cassandra.spark.utils.ReaderTimeProvider;
 import org.apache.cassandra.spark.utils.ScalaFunctions;
 import org.apache.cassandra.spark.utils.ThrowableUtils;
+import org.apache.cassandra.spark.utils.TimeProvider;
 import org.apache.cassandra.spark.validation.CassandraValidation;
 import org.apache.cassandra.spark.validation.SidecarValidation;
 import org.apache.cassandra.spark.validation.StartupValidatable;
@@ -122,7 +124,6 @@ public class CassandraDataLayer extends 
PartitionedDataLayer implements StartupV
     protected TokenPartitioner tokenPartitioner;
     protected Map<String, AvailabilityHint> availabilityHints;
     protected Sidecar.ClientConfig sidecarClientConfig;
-    private SslConfig sslConfig;
     protected Map<String, BigNumberConfigImpl> bigNumberConfigMap;
     protected boolean enableStats;
     protected boolean readIndexOffset;
@@ -133,7 +134,11 @@ public class CassandraDataLayer extends 
PartitionedDataLayer implements StartupV
     protected String lastModifiedTimestampField;
     // volatile in order to publish the reference for visibility
     protected volatile CqlTable cqlTable;
+    protected transient TimeProvider timeProvider;
     protected transient SidecarClient sidecar;
+
+    private SslConfig sslConfig;
+
     @VisibleForTesting
     transient Map<String, SidecarInstance> instanceMap;
 
@@ -178,7 +183,8 @@ public class CassandraDataLayer extends 
PartitionedDataLayer implements StartupV
                                  boolean useIncrementalRepair,
                                  @Nullable String lastModifiedTimestampField,
                                  List<SchemaFeature> requestedFeatures,
-                                 @NotNull Map<String, ReplicationFactor> rfMap)
+                                 @NotNull Map<String, ReplicationFactor> rfMap,
+                                 TimeProvider timeProvider)
     {
         super(consistencyLevel, datacenter);
         this.snapshotName = snapshotName;
@@ -203,6 +209,7 @@ public class CassandraDataLayer extends 
PartitionedDataLayer implements StartupV
             aliasLastModifiedTimestamp(this.requestedFeatures, 
this.lastModifiedTimestampField);
         }
         this.rfMap = rfMap;
+        this.timeProvider = timeProvider;
         this.maybeQuoteKeyspaceAndTable();
         this.initInstanceMap();
         this.startupValidate();
@@ -212,8 +219,9 @@ public class CassandraDataLayer extends 
PartitionedDataLayer implements StartupV
     {
         dialHome(options);
 
-        LOGGER.info("Starting Cassandra Spark job snapshotName={} keyspace={} 
table={} dc={}",
-                    snapshotName, keyspace, table, datacenter);
+        timeProvider = new ReaderTimeProvider();
+        LOGGER.info("Starting Cassandra Spark job snapshotName={} keyspace={} 
table={} dc={} referenceEpoch={}",
+                    snapshotName, keyspace, table, datacenter, 
timeProvider.referenceEpochInSeconds());
 
         // Load cluster config from options
         clusterConfig = initializeClusterConfig(options);
@@ -381,6 +389,12 @@ public class CassandraDataLayer extends 
PartitionedDataLayer implements StartupV
         return throwable != null && (throwable instanceof 
RetriesExhaustedException || isExhausted(throwable.getCause()));
     }
 
+    @Override
+    public TimeProvider timeProvider()
+    {
+        return timeProvider;
+    }
+
     @Override
     public boolean useIncrementalRepair()
     {
@@ -701,6 +715,7 @@ public class CassandraDataLayer extends 
PartitionedDataLayer implements StartupV
             aliasLastModifiedTimestamp(this.requestedFeatures, 
this.lastModifiedTimestampField);
         }
         this.rfMap = (Map<String, ReplicationFactor>) in.readObject();
+        this.timeProvider = new ReaderTimeProvider(in.readInt());
         this.maybeQuoteKeyspaceAndTable();
         this.initInstanceMap();
         this.startupValidate();
@@ -742,6 +757,7 @@ public class CassandraDataLayer extends 
PartitionedDataLayer implements StartupV
             out.writeUTF(feature.optionName());
         }
         out.writeObject(this.rfMap);
+        out.writeInt(timeProvider.referenceEpochInSeconds());
     }
 
     private static void writeNullable(ObjectOutputStream out, @Nullable String 
string) throws IOException
@@ -814,6 +830,7 @@ public class CassandraDataLayer extends 
PartitionedDataLayer implements StartupV
                                                                            
.collect(Collectors.toList());
             kryo.writeObject(out, listWrapper);
             kryo.writeObject(out, dataLayer.rfMap);
+            out.writeInt(dataLayer.timeProvider.referenceEpochInSeconds());
         }
 
         @SuppressWarnings("unchecked")
@@ -852,7 +869,8 @@ public class CassandraDataLayer extends 
PartitionedDataLayer implements StartupV
             in.readBoolean(),
             in.readString(),
             kryo.readObject(in, SchemaFeaturesListWrapper.class).toList(),
-            kryo.readObject(in, HashMap.class));
+            kryo.readObject(in, HashMap.class),
+            new ReaderTimeProvider(in.readInt()));
         }
 
         // Wrapper only used internally for Kryo serialization/deserialization
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/LocalDataLayer.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/LocalDataLayer.java
index 33a4850..214d5cc 100644
--- 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/LocalDataLayer.java
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/LocalDataLayer.java
@@ -59,6 +59,7 @@ import 
org.apache.cassandra.spark.sparksql.filters.PartitionKeyFilter;
 import org.apache.cassandra.spark.sparksql.filters.SparkRangeFilter;
 import org.apache.cassandra.spark.stats.Stats;
 import org.apache.cassandra.spark.utils.Throwing;
+import org.apache.cassandra.spark.utils.TimeProvider;
 import org.apache.parquet.Strings;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
@@ -270,6 +271,12 @@ public class LocalDataLayer extends DataLayer implements 
Serializable
         return true;
     }
 
+    @Override
+    public TimeProvider timeProvider()
+    {
+        return TimeProvider.DEFAULT;
+    }
+
     @Override
     public CqlTable cqlTable()
     {
diff --git 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/TestDataLayer.java
 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/TestDataLayer.java
index fede4fb..09e401c 100644
--- 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/TestDataLayer.java
+++ 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/TestDataLayer.java
@@ -46,6 +46,7 @@ import org.apache.cassandra.spark.data.SSTablesSupplier;
 import org.apache.cassandra.spark.data.partitioner.Partitioner;
 import org.apache.cassandra.spark.sparksql.filters.PartitionKeyFilter;
 import org.apache.cassandra.spark.sparksql.filters.SparkRangeFilter;
+import org.apache.cassandra.spark.utils.TimeProvider;
 import org.apache.cassandra.spark.utils.test.TestSSTable;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
@@ -99,6 +100,12 @@ public class TestDataLayer extends DataLayer
         return true;
     }
 
+    @Override
+    public TimeProvider timeProvider()
+    {
+        return TimeProvider.DEFAULT;
+    }
+
     @Override
     protected ExecutorService executorService()
     {
diff --git 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/partitioner/JDKSerializationTests.java
 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/partitioner/JDKSerializationTests.java
index 1be563b..5cf6ec7 100644
--- 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/partitioner/JDKSerializationTests.java
+++ 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/partitioner/JDKSerializationTests.java
@@ -48,6 +48,7 @@ import org.apache.cassandra.spark.data.PartitionedDataLayer;
 import org.apache.cassandra.spark.data.ReplicationFactor;
 import org.apache.cassandra.spark.data.SSTable;
 import org.apache.cassandra.spark.data.VersionRunner;
+import org.apache.cassandra.spark.utils.TimeProvider;
 import org.apache.cassandra.spark.utils.test.TestSchema;
 import org.jetbrains.annotations.NotNull;
 
@@ -235,6 +236,12 @@ public class JDKSerializationTests extends VersionRunner
             return cqlTable;
         }
 
+        @Override
+        public TimeProvider timeProvider()
+        {
+            return TimeProvider.DEFAULT;
+        }
+
         @Override
         public ReplicationFactor replicationFactor(String keyspace)
         {
diff --git 
a/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridge.java
 
b/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridge.java
index 6c0240c..c89f2cd 100644
--- 
a/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridge.java
+++ 
b/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridge.java
@@ -84,8 +84,6 @@ public abstract class CassandraBridge
                                                                                
     @NotNull Partitioner partitioner,
                                                                                
     @NotNull List<String> keys);
 
-    public abstract TimeProvider timeProvider();
-
     // Compaction Stream Scanner
     // CHECKSTYLE IGNORE: Method with many parameters
     public abstract StreamScanner<Rid> getCompactionScanner(@NotNull CqlTable 
table,
diff --git 
a/cassandra-bridge/src/main/java/org/apache/cassandra/spark/data/DataLayer.java 
b/cassandra-bridge/src/main/java/org/apache/cassandra/spark/data/DataLayer.java
index 19ef6c9..27f0407 100644
--- 
a/cassandra-bridge/src/main/java/org/apache/cassandra/spark/data/DataLayer.java
+++ 
b/cassandra-bridge/src/main/java/org/apache/cassandra/spark/data/DataLayer.java
@@ -164,6 +164,11 @@ public abstract class DataLayer implements Serializable
 
     public abstract boolean isInPartition(int partitionId, BigInteger token, 
ByteBuffer key);
 
+    /**
+     * @return a TimeProvider
+     */
+    public abstract TimeProvider timeProvider();
+
     public List<PartitionKeyFilter> partitionKeyFiltersInRange(
     int partitionId,
     List<PartitionKeyFilter> partitionKeyFilters) throws NoMatchFoundException
@@ -282,14 +287,6 @@ public abstract class DataLayer implements Serializable
                                                  rangeFilter, timeProvider(), 
stats(), executorService());
     }
 
-    /**
-     * @return a TimeProvider that returns the time now in seconds. User can 
override with their own provider
-     */
-    public TimeProvider timeProvider()
-    {
-        return bridge().timeProvider();
-    }
-
     /**
      * @param filters array of push down filters that
      * @return an array of push filters that are <b>not</b> supported by this 
data layer
diff --git 
a/cassandra-bridge/src/main/java/org/apache/cassandra/spark/utils/TimeProvider.java
 
b/cassandra-bridge/src/main/java/org/apache/cassandra/spark/utils/ReaderTimeProvider.java
similarity index 56%
copy from 
cassandra-bridge/src/main/java/org/apache/cassandra/spark/utils/TimeProvider.java
copy to 
cassandra-bridge/src/main/java/org/apache/cassandra/spark/utils/ReaderTimeProvider.java
index 950cb6a..cbc1751 100644
--- 
a/cassandra-bridge/src/main/java/org/apache/cassandra/spark/utils/TimeProvider.java
+++ 
b/cassandra-bridge/src/main/java/org/apache/cassandra/spark/utils/ReaderTimeProvider.java
@@ -20,15 +20,29 @@
 package org.apache.cassandra.spark.utils;
 
 /**
- * Provides current time
-  */
-@FunctionalInterface
-public interface TimeProvider
+ * A {@link TimeProvider} used by reader. It decides the reference epoch on 
instantiation and distribute to executors
+ */
+public class ReaderTimeProvider implements TimeProvider
 {
-    TimeProvider INSTANCE = () -> (int) 
Math.floorDiv(System.currentTimeMillis(), 1000L);
+    private final int referenceEpochInSeconds;
+
+    public ReaderTimeProvider()
+    {
+        this.referenceEpochInSeconds = nowInSeconds();
+    }
 
     /**
-     * @return current time in truncated seconds
+     * Constructor used for deserialization
+     * @param referenceEpochInSeconds reference epoch to set
      */
-    int nowInTruncatedSeconds();
+    public ReaderTimeProvider(int referenceEpochInSeconds)
+    {
+        this.referenceEpochInSeconds = referenceEpochInSeconds;
+    }
+
+    @Override
+    public int referenceEpochInSeconds()
+    {
+        return referenceEpochInSeconds;
+    }
 }
diff --git 
a/cassandra-bridge/src/main/java/org/apache/cassandra/spark/utils/TimeProvider.java
 
b/cassandra-bridge/src/main/java/org/apache/cassandra/spark/utils/TimeProvider.java
index 950cb6a..481b8f9 100644
--- 
a/cassandra-bridge/src/main/java/org/apache/cassandra/spark/utils/TimeProvider.java
+++ 
b/cassandra-bridge/src/main/java/org/apache/cassandra/spark/utils/TimeProvider.java
@@ -19,16 +19,47 @@
 
 package org.apache.cassandra.spark.utils;
 
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.annotations.VisibleForTesting;
+
 /**
- * Provides current time
+ * Provides time
   */
-@FunctionalInterface
 public interface TimeProvider
 {
-    TimeProvider INSTANCE = () -> (int) 
Math.floorDiv(System.currentTimeMillis(), 1000L);
+    /**
+     * The {@code DEFAULT} time provider sets the reference time on class 
loading. The reference time is fixed for the
+     * local JVM. If the same reference time for all executors is desired, you 
should avoid using the {@code DEFAULT}
+     * and use {@link ReaderTimeProvider} or similar kind that allows to 
broadcast the reference time value from driver.
+     * <p></p>
+     * It should be used for testing only.
+     */
+    @VisibleForTesting
+    TimeProvider DEFAULT = new TimeProvider()
+    {
+        private final int referenceEpochInSeconds = nowInSeconds();
+
+        @Override
+        public int referenceEpochInSeconds()
+        {
+            return referenceEpochInSeconds;
+        }
+    };
+
+    /**
+     * @return current time in seconds
+     */
+    default int nowInSeconds()
+    {
+        return (int) 
TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis());
+    }
 
     /**
-     * @return current time in truncated seconds
+     * Get the time value that is used as a reference. It should never change 
throughout the lifecycle of the provider
+     * @return a fixed epoch time in seconds
+     *
+     * Note that the actual constant value returned is implementation dependent
      */
-    int nowInTruncatedSeconds();
+    int referenceEpochInSeconds();
 }
diff --git 
a/cassandra-bridge/src/main/java/org/apache/cassandra/spark/utils/test/TestSchema.java
 
b/cassandra-bridge/src/main/java/org/apache/cassandra/spark/utils/test/TestSchema.java
index 2e6b0a8..0949940 100644
--- 
a/cassandra-bridge/src/main/java/org/apache/cassandra/spark/utils/test/TestSchema.java
+++ 
b/cassandra-bridge/src/main/java/org/apache/cassandra/spark/utils/test/TestSchema.java
@@ -78,6 +78,7 @@ public final class TestSchema
         private Integer blobSize = null;
         private boolean withCompression = true;
         private boolean quoteIdentifiers = false;
+        private int ttlSecs = 0;
 
         public Builder(CassandraBridge bridge)
         {
@@ -163,6 +164,12 @@ public final class TestSchema
             return this;
         }
 
+        public Builder withTTL(int ttlSecs)
+        {
+            this.ttlSecs = ttlSecs;
+            return this;
+        }
+
         public TestSchema build()
         {
             if (!partitionKeys.isEmpty())
@@ -192,7 +199,7 @@ public final class TestSchema
         }
     }
 
-        private final CassandraBridge bridge;
+    private final CassandraBridge bridge;
     @NotNull
     public final String keyspace;
     public final String table;
@@ -249,7 +256,10 @@ public final class TestSchema
         this.blobSize = builder.blobSize;
         this.allFields = buildAllFields(partitionKeys, clusteringKeys, 
columns);
         this.fieldPositions = calculateFieldPositions(allFields);
-        this.createStatement = buildCreateStatement(columns, 
builder.sortOrders, builder.withCompression);
+        this.createStatement = buildCreateStatement(columns,
+                                                    builder.sortOrders,
+                                                    builder.withCompression,
+                                                    builder.ttlSecs);
         this.insertStatement = buildInsertStatement(columns, 
builder.insertFields);
         this.updateStatement = buildUpdateStatement();
         this.deleteStatement = buildDeleteStatement(builder.deleteFields);
@@ -363,7 +373,8 @@ public final class TestSchema
 
     private String buildCreateStatement(List<CqlField> columns,
                                         List<CqlField.SortOrder> sortOrders,
-                                        boolean withCompression)
+                                        boolean withCompression,
+                                        int ttlSecs)
     {
         StringBuilder createStmtBuilder = new StringBuilder().append("CREATE 
TABLE ")
                                                              
.append(maybeQuoteIdentifierIfRequested(keyspace))
@@ -398,9 +409,11 @@ public final class TestSchema
 
         createStmtBuilder.append("))");
 
+        createStmtBuilder.append(" WITH comment = 'test table'"); // take 
'WITH', so the rest can append 'AND' safely
+
         if (!sortOrders.isEmpty())
         {
-            createStmtBuilder.append(" WITH CLUSTERING ORDER BY (");
+            createStmtBuilder.append(" AND CLUSTERING ORDER BY (");
             for (int sortOrder = 0; sortOrder < sortOrders.size(); sortOrder++)
             {
                 
createStmtBuilder.append(maybeQuoteIdentifierIfRequested(clusteringKeys.get(sortOrder).name()))
@@ -414,9 +427,15 @@ public final class TestSchema
             createStmtBuilder.append(")");
         }
 
+
         if (!withCompression)
         {
-            createStmtBuilder.append(" WITH compression = 
{'enabled':'false'}");
+            createStmtBuilder.append(" AND compression = {'enabled':'false'}");
+        }
+
+        if (ttlSecs > 0)
+        {
+            createStmtBuilder.append(" AND default_time_to_live = " + ttlSecs);
         }
 
         return createStmtBuilder.append(";")
diff --git 
a/cassandra-four-zero/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java
 
b/cassandra-four-zero/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java
index 4cad69b..56e6d46 100644
--- 
a/cassandra-four-zero/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java
+++ 
b/cassandra-four-zero/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java
@@ -121,7 +121,6 @@ import 
org.apache.cassandra.spark.utils.SparkClassLoaderOverride;
 import org.apache.cassandra.spark.utils.TimeProvider;
 import org.apache.cassandra.tools.JsonTransformer;
 import org.apache.cassandra.tools.Util;
-import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.UUIDGen;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
@@ -244,12 +243,6 @@ public class CassandraBridgeImplementation extends 
CassandraBridge
                     .collect(Collectors.toList());
     }
 
-    @Override
-    public TimeProvider timeProvider()
-    {
-        return FBUtilities::nowInSeconds;
-    }
-
     @Override
     public StreamScanner<Rid> getCompactionScanner(@NotNull CqlTable table,
                                                    @NotNull Partitioner 
partitioner,
diff --git 
a/cassandra-four-zero/src/main/java/org/apache/cassandra/spark/reader/AbstractStreamScanner.java
 
b/cassandra-four-zero/src/main/java/org/apache/cassandra/spark/reader/AbstractStreamScanner.java
index 460ced0..b2ea6be 100644
--- 
a/cassandra-four-zero/src/main/java/org/apache/cassandra/spark/reader/AbstractStreamScanner.java
+++ 
b/cassandra-four-zero/src/main/java/org/apache/cassandra/spark/reader/AbstractStreamScanner.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.spark.reader;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.math.BigInteger;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Iterator;
@@ -111,13 +112,33 @@ public abstract class AbstractStreamScanner implements 
StreamScanner<Rid>, Close
     @Override
     public abstract void close() throws IOException;
 
-    protected abstract void handleRowTombstone(Row row);
+    /**
+     * Handles the row tombstone
+     * @param token token of the partition that the row belongs to
+     * @param row row tombstone
+     */
+    protected abstract void handleRowTombstone(BigInteger token, Row row);
 
-    protected abstract void handlePartitionTombstone(UnfilteredRowIterator 
partition);
+    /**
+     * Handles the partition tombstone
+     * @param token token of the partition
+     * @param partition partition tombstone
+     */
+    protected abstract void handlePartitionTombstone(BigInteger token, 
UnfilteredRowIterator partition);
 
-    protected abstract void handleCellTombstone();
 
-    protected abstract void handleCellTombstoneInComplex(Cell<?> cell);
+    /**
+     * Handle the cell tombstone
+     * @param token token of the partition that the cell belongs to
+     */
+    protected abstract void handleCellTombstone(BigInteger token);
+
+    /**
+     * Handle the cell tombstone in complex type, e.g. UDT and collections
+     * @param token token of the partition that the cell belongs to
+     * @param cell cell tombstone
+     */
+    protected abstract void handleCellTombstoneInComplex(BigInteger token, 
Cell<?> cell);
 
     @Override
     public void advanceToNextColumn()
@@ -152,16 +173,16 @@ public abstract class AbstractStreamScanner implements 
StreamScanner<Rid>, Close
                         // Advance to next partition
                         partition = allPartitions.next();
 
+                        BigInteger token = 
ReaderUtils.tokenToBigInteger(partition.partitionKey().getToken());
                         if (partition.partitionLevelDeletion().isLive())
                         {
                             // Reset rid with new partition key
-                            
rid.setPartitionKeyCopy(partition.partitionKey().getKey(),
-                                                    
ReaderUtils.tokenToBigInteger(partition.partitionKey().getToken()));
+                            
rid.setPartitionKeyCopy(partition.partitionKey().getKey(), token);
                         }
                         else
                         {
                             // There's a partition-level delete
-                            handlePartitionTombstone(partition);
+                            handlePartitionTombstone(token, partition);
                             return true;
                         }
                     }
@@ -230,7 +251,7 @@ public abstract class AbstractStreamScanner implements 
StreamScanner<Rid>, Close
                     // There is a CQL row level delete
                     if (!row.deletion().isLive())
                     {
-                        handleRowTombstone(row);
+                        handleRowTombstone(rid.getToken(), row);
                         return true;
                     }
 
@@ -254,6 +275,11 @@ public abstract class AbstractStreamScanner implements 
StreamScanner<Rid>, Close
                     // either are present) will get emitted
                     columns = row.iterator();
                 }
+                else if (unfiltered.isRangeTombstoneMarker())
+                {
+                    throw new IllegalStateException("Encountered 
RangeTombstoneMarker. " +
+                                                    "It should have been 
purged in CompactionIterator");
+                }
                 else
                 {
                     // As of Cassandra 4, the unfiltered kind can either be 
row or range tombstone marker,
@@ -370,7 +396,7 @@ public abstract class AbstractStreamScanner implements 
StreamScanner<Rid>, Close
                                                              null));
             if (cell.isTombstone())
             {
-                handleCellTombstone();
+                handleCellTombstone(rid.getToken());
             }
             else
             {
@@ -425,13 +451,13 @@ public abstract class AbstractStreamScanner implements 
StreamScanner<Rid>, Close
                     Cell<?> cell = cells.next();
                     // Re: isLive vs. isTombstone - isLive considers TTL so 
that if a cell is expiring soon,
                     // it is handled as tombstone
-                    if (cell.isLive(timeProvider.nowInTruncatedSeconds()))
+                    if (cell.isLive(timeProvider.referenceEpochInSeconds()))
                     {
                         buffer.addCell(cell);
                     }
                     else
                     {
-                        handleCellTombstoneInComplex(cell);
+                        handleCellTombstoneInComplex(rid.getToken(), cell);
                     }
                     // In the case the cell is deleted, the deletion time is 
also the cell's timestamp
                     maxTimestamp = Math.max(maxTimestamp, cell.timestamp());
@@ -443,7 +469,7 @@ public abstract class AbstractStreamScanner implements 
StreamScanner<Rid>, Close
             else
             {
                 // The entire collection/UDT is deleted
-                handleCellTombstone();
+                handleCellTombstone(rid.getToken());
                 rid.setTimestamp(deletionTime.markedForDeleteAt());
             }
 
diff --git 
a/cassandra-four-zero/src/main/java/org/apache/cassandra/spark/reader/CompactionStreamScanner.java
 
b/cassandra-four-zero/src/main/java/org/apache/cassandra/spark/reader/CompactionStreamScanner.java
index 9c2d156..89a2df3 100644
--- 
a/cassandra-four-zero/src/main/java/org/apache/cassandra/spark/reader/CompactionStreamScanner.java
+++ 
b/cassandra-four-zero/src/main/java/org/apache/cassandra/spark/reader/CompactionStreamScanner.java
@@ -19,6 +19,7 @@
 
 package org.apache.cassandra.spark.reader;
 
+import java.math.BigInteger;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
@@ -26,6 +27,8 @@ import java.util.UUID;
 import java.util.function.LongPredicate;
 import java.util.stream.Collectors;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import org.apache.cassandra.db.AbstractCompactionController;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.DecoratedKey;
@@ -54,11 +57,12 @@ public class CompactionStreamScanner extends 
AbstractStreamScanner
     private AbstractCompactionStrategy.ScannerList scanners;
     private CompactionIterator ci;
 
+    @VisibleForTesting
     CompactionStreamScanner(@NotNull TableMetadata cfMetaData,
                             @NotNull Partitioner partitionerType,
                             @NotNull Collection<? extends Scannable> toCompact)
     {
-        this(cfMetaData, partitionerType, TimeProvider.INSTANCE, toCompact);
+        this(cfMetaData, partitionerType, TimeProvider.DEFAULT, toCompact);
     }
 
     public CompactionStreamScanner(@NotNull TableMetadata cfMetaData,
@@ -79,33 +83,41 @@ public class CompactionStreamScanner extends 
AbstractStreamScanner
     }
 
     @Override
-    protected void handleRowTombstone(Row row)
+    protected void handleRowTombstone(BigInteger token, Row row)
     {
-        throw new IllegalStateException("Row tombstone found, it should have 
been purged in CompactionIterator");
+        throw new IllegalStateException("Row tombstone found. " +
+                                        "It should have been purged in 
CompactionIterator."  +
+                                        "Partition key token: " + token);
     }
 
     @Override
-    protected void handlePartitionTombstone(UnfilteredRowIterator partition)
+    protected void handlePartitionTombstone(BigInteger token, 
UnfilteredRowIterator partition)
     {
-        throw new IllegalStateException("Partition tombstone found, it should 
have been purged in CompactionIterator");
+        throw new IllegalStateException("Partition tombstone found. " +
+                                        "It should have been purged in 
CompactionIterator. " +
+                                        "Partition key token: " + token);
     }
 
     @Override
-    protected void handleCellTombstone()
+    protected void handleCellTombstone(BigInteger token)
     {
-        throw new IllegalStateException("Cell tombstone found, it should have 
been purged in CompactionIterator");
+        throw new IllegalStateException("Cell tombstone found. " +
+                                        "It should have been purged in 
CompactionIterator. " +
+                                        "Partition key token: " + token);
     }
 
     @Override
-    protected void handleCellTombstoneInComplex(Cell<?> cell)
+    protected void handleCellTombstoneInComplex(BigInteger token, Cell<?> cell)
     {
-        // Do nothing: to not introduce behavior change to the SBR code path
+        throw new IllegalStateException("Cell tombstone in complex type found. 
" +
+                                        "It should have been purged in 
CompactionIterator. " +
+                                        "Partition key token: " + token);
     }
 
     @Override
     UnfilteredPartitionIterator initializePartitions()
     {
-        int nowInSec = timeProvider.nowInTruncatedSeconds();
+        int nowInSec = timeProvider.referenceEpochInSeconds();
         Keyspace keyspace = Keyspace.openWithoutSSTables(metadata.keyspace);
         ColumnFamilyStore cfStore = 
keyspace.getColumnFamilyStore(metadata.name);
         controller = new PurgingCompactionController(cfStore, 
CompactionParams.TombstoneOption.NONE);
diff --git 
a/cassandra-four-zero/src/test/java/org/apache/cassandra/spark/reader/SSTableReaderTests.java
 
b/cassandra-four-zero/src/test/java/org/apache/cassandra/spark/reader/SSTableReaderTests.java
index 2b87784..1dc26a8 100644
--- 
a/cassandra-four-zero/src/test/java/org/apache/cassandra/spark/reader/SSTableReaderTests.java
+++ 
b/cassandra-four-zero/src/test/java/org/apache/cassandra/spark/reader/SSTableReaderTests.java
@@ -39,6 +39,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
@@ -46,8 +47,10 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.Uninterruptibles;
 import org.apache.commons.lang.StringUtils;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -67,6 +70,9 @@ import org.apache.cassandra.io.sstable.ISSTableScanner;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.serializers.UTF8Serializer;
+import org.apache.cassandra.spark.TestDataLayer;
+import org.apache.cassandra.spark.TestRunnable;
+import org.apache.cassandra.spark.TestUtils;
 import org.apache.cassandra.spark.data.CqlTable;
 import org.apache.cassandra.spark.data.FileType;
 import org.apache.cassandra.spark.data.ReplicationFactor;
@@ -78,12 +84,17 @@ import org.apache.cassandra.spark.stats.Stats;
 import org.apache.cassandra.spark.utils.ByteBufferUtils;
 import org.apache.cassandra.spark.utils.TemporaryDirectory;
 import org.apache.cassandra.spark.utils.Throwing;
+import org.apache.cassandra.spark.utils.TimeProvider;
 import org.apache.cassandra.spark.utils.test.TestSSTable;
 import org.apache.cassandra.spark.utils.test.TestSchema;
 import org.apache.cassandra.utils.Pair;
+import org.apache.spark.sql.catalyst.util.GenericArrayData;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
+import static org.apache.cassandra.spark.TestUtils.countSSTables;
+import static org.apache.cassandra.spark.TestUtils.getFileType;
+import static org.apache.cassandra.spark.TestUtils.runTest;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
@@ -99,6 +110,9 @@ public class SSTableReaderTests
     private static final int ROWS = 50;
     private static final int COLUMNS = 25;
 
+    @TempDir
+    private Path tempDir;
+
     @Test
     public void testOpenCompressedRawInputStream()
     {
@@ -1048,6 +1062,112 @@ public class SSTableReaderTests
             });
     }
 
+    @Test
+    public void testCollectionWithTtlUsingConstantReferenceTime()
+    {
+        // offset is 0, column values in all rows should be unexpired; thus, 
reading 10 values
+        testTtlUsingConstantReferenceTimeHelper(50, 0, 10, 10);
+        // ensure all rows expires by advancing enough time in the future; 
thus, reading 0 values
+        testTtlUsingConstantReferenceTimeHelper(50, 100, 10, 0);
+    }
+
+    // helper that write rows with ttl, and assert on the compaction result by 
changing the reference time
+    private void testTtlUsingConstantReferenceTimeHelper(int ttlSecs,
+                                                         int timeOffsetSecs,
+                                                         int rows,
+                                                         int expectedValues)
+    {
+        AtomicInteger referenceEpoch = new AtomicInteger(0);
+        TimeProvider navigatableTimeProvider = new TimeProvider()
+        {
+            @Override
+            public int referenceEpochInSeconds()
+            {
+                return referenceEpoch.get();
+            }
+        };
+
+        Set<Integer> expectedColValue = new HashSet<>(Arrays.asList(1, 2, 3));
+        TestRunnable test = (partitioner, dir, bridge) -> {
+            TestSchema schema = TestSchema.builder(BRIDGE)
+                                          .withPartitionKey("a", BRIDGE.aInt())
+                                          .withColumn("b", 
BRIDGE.set(BRIDGE.aInt()))
+                                          .withTTL(ttlSecs)
+                                          .build();
+            schema.writeSSTable(dir, BRIDGE, partitioner, (writer) -> {
+                for (int i = 0; i < rows; i++)
+                {
+                    writer.write(i, expectedColValue);
+                }
+                Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
+            });
+            int t1 = navigatableTimeProvider.nowInSeconds();
+            assertEquals(1, countSSTables(dir));
+
+            // open CompactionStreamScanner over SSTables
+            TableMetadata metaData = new SchemaBuilder(schema.createStatement,
+                                                       schema.keyspace,
+                                                       new 
ReplicationFactor(ReplicationFactor.ReplicationStrategy.SimpleStrategy,
+                                                                             
ImmutableMap.of("replication_factor", 1)),
+                                                       partitioner)
+                                     .tableMetaData();
+
+            CqlTable table = schema.buildTable();
+            TestDataLayer dataLayer = new TestDataLayer(BRIDGE,
+                                                        getFileType(dir, 
FileType.DATA).collect(Collectors.toList()),
+                                                        table);
+            Set<SSTableReader> toCompact = dataLayer.listSSTables()
+                                                    .map(ssTable -> {
+                                                        try
+                                                        {
+                                                            return 
openReader(metaData, ssTable);
+                                                        }
+                                                        catch (IOException e)
+                                                        {
+                                                            throw new 
RuntimeException(e);
+                                                        }
+                                                    })
+                                                    
.collect(Collectors.toSet());
+
+            int count = 0;
+            referenceEpoch.set(t1 + timeOffsetSecs);
+            try (CompactionStreamScanner scanner = new 
CompactionStreamScanner(metaData, partitioner, navigatableTimeProvider, 
toCompact))
+            {
+                // iterate through CompactionStreamScanner verifying it 
correctly compacts data together
+                Rid rid = scanner.rid();
+                while (scanner.hasNext())
+                {
+                    scanner.advanceToNextColumn();
+
+                    // extract column name
+                    ByteBuffer colBuf = rid.getColumnName();
+                    String colName = 
ByteBufferUtils.string(ByteBufferUtils.readBytesWithShortLength(colBuf));
+                    colBuf.get();
+                    if (StringUtils.isEmpty(colName))
+                    {
+                        continue;
+                    }
+                    assertEquals("b", colName);
+
+                    // extract value column
+                    ByteBuffer b = rid.getValue();
+                    Set set = new HashSet(Arrays.asList(((GenericArrayData) 
BRIDGE.set(BRIDGE.aInt())
+                                                                               
      .deserialize(b))
+                                                           .array()));
+                    assertEquals(expectedColValue, set);
+                    count++;
+                }
+            }
+            assertEquals(expectedValues, count);
+        };
+
+        qt()
+        .forAll(TestUtils.partitioners())
+        .checkAssert(partitioner -> {
+            runTest(partitioner, BRIDGE, test);
+        });
+    }
+
     private static TableMetadata tableMetadata(TestSchema schema, Partitioner 
partitioner)
     {
         return new SchemaBuilder(schema.createStatement,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org


Reply via email to