This is an automated email from the ASF dual-hosted git repository.

adelapena pushed a commit to branch cassandra-5.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cassandra-5.0 by this push:
     new e8fb4b2f63 Ensure that empty SAI column indexes do not fail on 
validation after full-SSTable streaming
e8fb4b2f63 is described below

commit e8fb4b2f63b32f337447992f9eb57a12e2afc0e4
Author: Andrés de la Peña <a.penya.gar...@gmail.com>
AuthorDate: Mon Nov 13 12:46:53 2023 +0000

    Ensure that empty SAI column indexes do not fail on validation after 
full-SSTable streaming
    
    patch by Andrés de la Peña; reviewed by Caleb Rackliffe for CASSANDRA-19017
    
    Co-authored-by: Andrés de la Peña <a.penya.gar...@gmail.com>
    Co-authored-by: Caleb Rackliffe <calebrackli...@gmail.com>
---
 CHANGES.txt                                        |  1 +
 .../sai/disk/v1/ColumnCompletionMarkerUtil.java    | 74 +++++++++++++++++++++
 .../index/sai/disk/v1/MemtableIndexWriter.java     |  6 +-
 .../index/sai/disk/v1/SSTableIndexWriter.java      |  4 +-
 .../index/sai/disk/v1/V1OnDiskFormat.java          | 77 +++++++++++++---------
 .../distributed/test/sai/IndexStreamingTest.java   | 23 +++----
 6 files changed, 140 insertions(+), 45 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 9a4e2fa287..d744f50fa1 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 5.0-beta1
+ * Ensure that empty SAI column indexes do not fail on validation after 
full-SSTable streaming (CASSANDRA-19017)
  * SAI in-memory index should check max term size (CASSANDRA-18926)
  * Set default disk_access_mode to mmap_index_only (CASSANDRA-19021)
  * Exclude net.java.dev.jna:jna dependency from dependencies of 
org.caffinitas.ohc:ohc-core (CASSANDRA-18992)
diff --git 
a/src/java/org/apache/cassandra/index/sai/disk/v1/ColumnCompletionMarkerUtil.java
 
b/src/java/org/apache/cassandra/index/sai/disk/v1/ColumnCompletionMarkerUtil.java
new file mode 100644
index 0000000000..760083ad9d
--- /dev/null
+++ 
b/src/java/org/apache/cassandra/index/sai/disk/v1/ColumnCompletionMarkerUtil.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.index.sai.disk.v1;
+
+import java.io.IOException;
+
+import org.apache.cassandra.index.sai.IndexContext;
+import org.apache.cassandra.index.sai.disk.format.IndexComponent;
+import org.apache.cassandra.index.sai.disk.format.IndexDescriptor;
+import org.apache.cassandra.index.sai.disk.io.IndexOutputWriter;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
+
+/**
+ * Utility class for creating and reading the column completion marker, {@link 
IndexComponent#COLUMN_COMPLETION_MARKER}.
+ * <p>
+ * The file has a header and a footer, as written by {@link 
SAICodecUtils#writeHeader(IndexOutput)} and
+ * {@link SAICodecUtils#writeFooter(IndexOutput)}. The only content of the 
file is a single byte indicating whether the
+ * column index is empty (1) or not empty (0). If the index is empty, the completion marker 
will be the only per-index component.
+ */
+public class ColumnCompletionMarkerUtil
+{
+    private static final byte EMPTY = (byte) 1;
+    private static final byte NOT_EMPTY = (byte) 0;
+
+    /**
+     * Creates a column index completion marker for the specified column 
index, recording in it whether the index is empty.
+     *
+     * @param descriptor the index descriptor
+     * @param context the column index context
+     * @param isEmpty whether the index is empty
+     */
+    public static void create(IndexDescriptor descriptor, IndexContext 
context, boolean isEmpty) throws IOException
+    {
+        try (IndexOutputWriter output = 
descriptor.openPerIndexOutput(IndexComponent.COLUMN_COMPLETION_MARKER, context))
+        {
+            SAICodecUtils.writeHeader(output);
+            output.writeByte(isEmpty ? EMPTY : NOT_EMPTY);
+            SAICodecUtils.writeFooter(output);
+        }
+    }
+
+    /**
+     * Reads the column index completion marker and returns whether the 
index is empty. Only the header is checked here; the footer is not read, so callers needing integrity checks must validate the file separately.
+     *
+     * @param descriptor the index descriptor
+     * @param context the column index context
+     * @return {@code true} if the index is empty, {@code false} otherwise.
+     */
+    public static boolean isEmptyIndex(IndexDescriptor descriptor, 
IndexContext context) throws IOException
+    {
+        try (IndexInput input = 
descriptor.openPerIndexInput(IndexComponent.COLUMN_COMPLETION_MARKER, context))
+        {
+            SAICodecUtils.checkHeader(input); // check the header and move past it to the payload byte
+            return input.readByte() == EMPTY;
+        }
+    }
+}
diff --git 
a/src/java/org/apache/cassandra/index/sai/disk/v1/MemtableIndexWriter.java 
b/src/java/org/apache/cassandra/index/sai/disk/v1/MemtableIndexWriter.java
index 10d30531f5..cd1495c3f6 100644
--- a/src/java/org/apache/cassandra/index/sai/disk/v1/MemtableIndexWriter.java
+++ b/src/java/org/apache/cassandra/index/sai/disk/v1/MemtableIndexWriter.java
@@ -101,7 +101,8 @@ public class MemtableIndexWriter implements 
PerColumnIndexWriter
                 logger.debug(indexContext.logMessage("No indexed rows to flush 
from SSTable {}."), indexDescriptor.sstableDescriptor);
                 // Write a completion marker even though we haven't written 
anything to the index,
                 // so we won't try to build the index again for the SSTable
-                
indexDescriptor.createComponentOnDisk(IndexComponent.COLUMN_COMPLETION_MARKER, 
indexContext);
+                ColumnCompletionMarkerUtil.create(indexDescriptor, 
indexContext, true);
+
                 return;
             }
 
@@ -204,7 +205,8 @@ public class MemtableIndexWriter implements 
PerColumnIndexWriter
 
     private void completeIndexFlush(long cellCount, long startTime, Stopwatch 
stopwatch) throws IOException
     {
-        
indexDescriptor.createComponentOnDisk(IndexComponent.COLUMN_COMPLETION_MARKER, 
indexContext);
+        // create a completion marker indicating that the index is complete 
and non-empty
+        ColumnCompletionMarkerUtil.create(indexDescriptor, indexContext, 
false);
 
         indexContext.getIndexMetrics().memtableIndexFlushCount.inc();
 
diff --git 
a/src/java/org/apache/cassandra/index/sai/disk/v1/SSTableIndexWriter.java 
b/src/java/org/apache/cassandra/index/sai/disk/v1/SSTableIndexWriter.java
index 6843206462..3f85c30225 100644
--- a/src/java/org/apache/cassandra/index/sai/disk/v1/SSTableIndexWriter.java
+++ b/src/java/org/apache/cassandra/index/sai/disk/v1/SSTableIndexWriter.java
@@ -134,7 +134,9 @@ public class SSTableIndexWriter implements 
PerColumnIndexWriter
             }
 
             writeSegmentsMetadata();
-            
indexDescriptor.createComponentOnDisk(IndexComponent.COLUMN_COMPLETION_MARKER, 
indexContext);
+
+            // write column index completion marker, indicating whether the 
index is empty
+            ColumnCompletionMarkerUtil.create(indexDescriptor, indexContext, 
segments.isEmpty());
         }
         finally
         {
diff --git 
a/src/java/org/apache/cassandra/index/sai/disk/v1/V1OnDiskFormat.java 
b/src/java/org/apache/cassandra/index/sai/disk/v1/V1OnDiskFormat.java
index e4f8737de6..323d81131f 100644
--- a/src/java/org/apache/cassandra/index/sai/disk/v1/V1OnDiskFormat.java
+++ b/src/java/org/apache/cassandra/index/sai/disk/v1/V1OnDiskFormat.java
@@ -24,6 +24,7 @@ import java.util.EnumSet;
 import java.util.Set;
 
 import com.google.common.annotations.VisibleForTesting;
+
 import org.apache.cassandra.utils.Throwables;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -188,21 +189,7 @@ public class V1OnDiskFormat implements OnDiskFormat
         {
             if (isNotBuildCompletionMarker(indexComponent))
             {
-                try (IndexInput input = 
indexDescriptor.openPerSSTableInput(indexComponent))
-                {
-                    if (checksum)
-                        SAICodecUtils.validateChecksum(input);
-                    else
-                        SAICodecUtils.validate(input);
-                }
-                catch (Exception e)
-                {
-                    logger.warn(indexDescriptor.logMessage("{} failed for 
index component {} on SSTable {}."),
-                                                           checksum ? 
"Checksum validation" : "Validation",
-                                                           indexComponent,
-                                                           
indexDescriptor.sstableDescriptor);
-                    rethrowIOException(e);
-                }
+                validateIndexComponent(indexDescriptor, null, indexComponent, 
checksum);
             }
         }
     }
@@ -210,29 +197,57 @@ public class V1OnDiskFormat implements OnDiskFormat
     @Override
     public void validatePerColumnIndexComponents(IndexDescriptor 
indexDescriptor, IndexContext indexContext, boolean checksum)
     {
+        // determine if the index is empty, which is recorded in the 
column completion marker
+        boolean isEmptyIndex = false;
+        if 
(indexDescriptor.hasComponent(IndexComponent.COLUMN_COMPLETION_MARKER, 
indexContext))
+        {
+            // first validate the file...
+            validateIndexComponent(indexDescriptor, indexContext, 
IndexComponent.COLUMN_COMPLETION_MARKER, checksum);
+
+            // ...then read to check if the index is empty
+            try
+            {
+                isEmptyIndex = 
ColumnCompletionMarkerUtil.isEmptyIndex(indexDescriptor, indexContext);
+            }
+            catch (IOException e)
+            {
+                rethrowIOException(e);
+            }
+        }
+
         for (IndexComponent indexComponent : 
perColumnIndexComponents(indexContext))
         {
-            if (isNotBuildCompletionMarker(indexComponent))
+            if (!isEmptyIndex && isNotBuildCompletionMarker(indexComponent))
             {
-                try (IndexInput input = 
indexDescriptor.openPerIndexInput(indexComponent, indexContext))
-                {
-                    if (checksum)
-                        SAICodecUtils.validateChecksum(input);
-                    else
-                        SAICodecUtils.validate(input);
-                }
-                catch (Exception e)
-                {
-                    logger.warn(indexDescriptor.logMessage("{} failed for 
index component {} on SSTable {}"),
-                                                           checksum ? 
"Checksum validation" : "Validation",
-                                                           indexComponent,
-                                                           
indexDescriptor.sstableDescriptor);
-                    rethrowIOException(e);
-                }
+                validateIndexComponent(indexDescriptor, indexContext, 
indexComponent, checksum);
             }
         }
     }
 
+    private static void validateIndexComponent(IndexDescriptor indexDescriptor,
+                                               IndexContext indexContext,
+                                               IndexComponent indexComponent,
+                                               boolean checksum)
+    {
+        try (IndexInput input = indexContext == null
+                                ? 
indexDescriptor.openPerSSTableInput(indexComponent)
+                                : 
indexDescriptor.openPerIndexInput(indexComponent, indexContext))
+        {
+            if (checksum)
+                SAICodecUtils.validateChecksum(input);
+            else
+                SAICodecUtils.validate(input);
+        }
+        catch (Exception e)
+        {
+            logger.warn(indexDescriptor.logMessage("{} failed for index 
component {} on SSTable {}"),
+                        checksum ? "Checksum validation" : "Validation",
+                        indexComponent,
+                        indexDescriptor.sstableDescriptor);
+            rethrowIOException(e);
+        }
+    }
+
     private static void rethrowIOException(Exception e)
     {
         if (e instanceof IOException)
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/sai/IndexStreamingTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/sai/IndexStreamingTest.java
index e637d56b87..c3f1f42073 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/sai/IndexStreamingTest.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/sai/IndexStreamingTest.java
@@ -97,16 +97,10 @@ public class IndexStreamingTest extends TestBaseImpl
 
             int numSSTableComponents = isWide ? 
V1OnDiskFormat.WIDE_PER_SSTABLE_COMPONENTS.size() : 
V1OnDiskFormat.SKINNY_PER_SSTABLE_COMPONENTS.size();
             int numIndexComponents = isLiteral ? 
V1OnDiskFormat.LITERAL_COMPONENTS.size() : 
V1OnDiskFormat.NUMERIC_COMPONENTS.size();
-            int numComponents = sstableStreamingComponentsCount() + 
numSSTableComponents + numIndexComponents;
+            int numComponents = sstableStreamingComponentsCount() + 
numSSTableComponents + numIndexComponents + 1;
 
-            if (isLiteral)
-                cluster.schemaChange(withKeyspace(
-                    "CREATE CUSTOM INDEX ON %s.test(literal) USING 
'StorageAttachedIndex';"
-                ));
-            else
-                cluster.schemaChange(withKeyspace(
-                "CREATE CUSTOM INDEX ON %s.test(numeric) USING 
'StorageAttachedIndex';"
-                ));
+            cluster.schemaChange(withKeyspace("CREATE INDEX ON 
%s.test(literal) USING 'sai';"));
+            cluster.schemaChange(withKeyspace("CREATE INDEX ON 
%s.test(numeric) USING 'sai';"));
 
             cluster.stream().forEach(i ->
                 i.nodetoolResult("disableautocompaction", 
KEYSPACE).asserts().success()
@@ -115,12 +109,19 @@ public class IndexStreamingTest extends TestBaseImpl
             IInvokableInstance second = cluster.get(2);
             long sstableCount = 10;
             long expectedFiles = isZeroCopyStreaming ? sstableCount * 
numComponents : sstableCount;
+
             for (int i = 0; i < sstableCount; i++)
             {
                 if (isWide)
-                    first.executeInternal(withKeyspace("insert into 
%s.test(pk, ck, literal, numeric, b) values (?, ?, ?, ?, ?)"), i, i, "v" + i, 
i, BLOB);
+                {
+                    String insertTemplate = "INSERT INTO %s.test(pk, ck, " + 
(isLiteral ? "literal" : "numeric") + ", b) VALUES (?, ?, ?, ?)";
+                    first.executeInternal(withKeyspace(insertTemplate), i, i, 
isLiteral ? "v" + i : Integer.valueOf(i), BLOB);
+                }
                 else
-                    first.executeInternal(withKeyspace("insert into 
%s.test(pk, literal, numeric, b) values (?, ?, ?, ?)"), i, "v" + i, i, BLOB);
+                {
+                    String insertTemplate = "INSERT INTO %s.test(pk, " + 
(isLiteral ? "literal" : "numeric") + ", b) VALUES (?, ?, ?)";
+                    first.executeInternal(withKeyspace(insertTemplate), i, 
isLiteral ? "v" + i : Integer.valueOf(i), BLOB);
+                }
                 first.flush(KEYSPACE);
             }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to