This is an automated email from the ASF dual-hosted git repository.

maedhroz pushed a commit to branch cassandra-5.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cassandra-5.0 by this push:
     new 273975ef26 Prioritize built indexes in IndexStatusManager
273975ef26 is described below

commit 273975ef26e5de1a10a98c0df364cb67fa7830e3
Author: Arun Ganesh <20590666+ark...@users.noreply.github.com>
AuthorDate: Tue Jul 23 20:55:13 2024 -0700

    Prioritize built indexes in IndexStatusManager
    
    Patch by Arun Ganesh; reviewed by Caleb Rackliffe and Zhao Yang for 
CASSANDRA-19400
---
 CHANGES.txt                                        |   1 +
 .../apache/cassandra/index/IndexStatusManager.java |  16 +
 .../cassandra/index/internal/CassandraIndex.java   |   7 +
 .../cassandra/index/IndexStatusManagerTest.java    | 450 +++++++++++++++++++++
 4 files changed, 474 insertions(+)

diff --git a/CHANGES.txt b/CHANGES.txt
index 48ef4700da..fbb989e13b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 5.0.1
+ * Prioritize built indexes in IndexStatusManager (CASSANDRA-19400)
  * Add java.base/java.lang.reflect among opens for jvm11-client.options 
(CASSANDRA-19780)
 
 
diff --git a/src/java/org/apache/cassandra/index/IndexStatusManager.java 
b/src/java/org/apache/cassandra/index/IndexStatusManager.java
index 13a51831f9..1c0f5887db 100644
--- a/src/java/org/apache/cassandra/index/IndexStatusManager.java
+++ b/src/java/org/apache/cassandra/index/IndexStatusManager.java
@@ -19,6 +19,7 @@
 package org.apache.cassandra.index;
 
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -41,6 +42,7 @@ import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.gms.VersionedValue;
 import org.apache.cassandra.locator.Endpoints;
 import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.Replica;
 import org.apache.cassandra.serializers.MarshalException;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
@@ -84,18 +86,32 @@ public class IndexStatusManager
      */
     public <E extends Endpoints<E>> E filterForQuery(E liveEndpoints, Keyspace 
keyspace, Index.QueryPlan indexQueryPlan, ConsistencyLevel level)
     {
+        // UNKNOWN states are transient/rare; only a few replicas should have 
this state at any time. See CASSANDRA-19400
+        Set<Replica> queryableNonSucceeded = new HashSet<>(4);
+
         E queryableEndpoints = liveEndpoints.filter(replica -> {
 
+            boolean allBuilt = true;
             for (Index index : indexQueryPlan.getIndexes())
             {
                 Index.Status status = getIndexStatus(replica.endpoint(), 
keyspace.getName(), index.getIndexMetadata().name);
                 if (!index.isQueryable(status))
                     return false;
+
+                if (status != Index.Status.BUILD_SUCCEEDED)
+                    allBuilt = false;
             }
 
+            if (!allBuilt)
+                queryableNonSucceeded.add(replica);
+
             return true;
         });
 
+        // deprioritize replicas with queryable but non-succeeded indexes
+        if (!queryableNonSucceeded.isEmpty() && queryableNonSucceeded.size() 
!= queryableEndpoints.size())
+            queryableEndpoints = 
queryableEndpoints.sorted(Comparator.comparingInt(e -> 
queryableNonSucceeded.contains(e) ? 1 : -1));
+
         int initial = liveEndpoints.size();
         int filtered = queryableEndpoints.size();
 
diff --git a/src/java/org/apache/cassandra/index/internal/CassandraIndex.java 
b/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
index 1f39ea502b..20c1a05328 100644
--- a/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
+++ b/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
@@ -207,6 +207,13 @@ public abstract class CassandraIndex implements Index
         };
     }
 
+    @Override
+    public boolean isQueryable(Status status)
+    {
+        // consider unknown status as queryable, because gossip may not be 
up-to-date for newly joining nodes.
+        return status == Status.BUILD_SUCCEEDED || status == Status.UNKNOWN || 
status == Status.FULL_REBUILD_STARTED;
+    }
+
     @Override
     public void validate(ReadCommand command) throws InvalidRequestException
     {
diff --git a/test/unit/org/apache/cassandra/index/IndexStatusManagerTest.java 
b/test/unit/org/apache/cassandra/index/IndexStatusManagerTest.java
new file mode 100644
index 0000000000..d08fec974a
--- /dev/null
+++ b/test/unit/org/apache/cassandra/index/IndexStatusManagerTest.java
@@ -0,0 +1,450 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.index;
+
+import java.net.UnknownHostException;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.exceptions.ReadFailureException;
+import org.apache.cassandra.gms.VersionedValue;
+import org.apache.cassandra.locator.AbstractNetworkTopologySnitch;
+import org.apache.cassandra.locator.AbstractReplicationStrategy;
+import org.apache.cassandra.locator.EndpointsForRange;
+import org.apache.cassandra.locator.IEndpointSnitch;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.NetworkTopologyStrategy;
+import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.locator.ReplicaUtils;
+import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.schema.IndexMetadata;
+import org.apache.cassandra.utils.JsonUtils;
+
+import static org.apache.cassandra.locator.ReplicaUtils.full;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.Assert.assertArrayEquals;
+
+public class IndexStatusManagerTest
+{
+    static class Testcase
+    {
+        String keyspace;
+        int numRequired;
+        AbstractReplicationStrategy replicationStrategy;
+        Map<InetAddressAndPort, Map<String, Index.Status>> indexStatus;
+        EndpointsForRange expected;
+
+        Testcase(Builder builder)
+        {
+            keyspace = builder.keyspace;
+            numRequired = builder.numRequired;
+            replicationStrategy = builder.replicationStrategy;
+            indexStatus = builder.indexStatus;
+            expected = builder.expected;
+        }
+
+        static class Builder
+        {
+            String keyspace;
+            int numRequired;
+            AbstractReplicationStrategy replicationStrategy;
+            Map<InetAddressAndPort, Map<String, Index.Status>> indexStatus;
+            EndpointsForRange expected;
+
+            Builder keyspace(String ks)
+            {
+                keyspace = ks;
+                return this;
+            }
+
+            Builder numRequired(int required)
+            {
+                numRequired = required;
+                return this;
+            }
+
+            Builder replicationStrategy(AbstractReplicationStrategy strategy)
+            {
+                replicationStrategy = strategy;
+                return this;
+            }
+
+            Builder indexStatus(Map<InetAddressAndPort, Map<String, 
Index.Status>> status)
+            {
+                indexStatus = status;
+                return this;
+            }
+
+            Builder expected(EndpointsForRange endpoints)
+            {
+                expected = endpoints;
+                return this;
+            }
+
+            Testcase build()
+            {
+                return new Testcase(this);
+            }
+        }
+    }
+
+    IEndpointSnitch snitch = new AbstractNetworkTopologySnitch()
+    {
+        public String getRack(InetAddressAndPort endpoint)
+        {
+            return "rack";
+        }
+
+        public String getDatacenter(InetAddressAndPort endpoint)
+        {
+            return "DC";
+        }
+    };
+
+    @Test
+    public void shouldPrioritizeSuccessfulEndpoints() throws 
UnknownHostException
+    {
+        runTest(new Testcase.Builder()
+                .keyspace("ks1")
+                .replicationStrategy(
+                        new NetworkTopologyStrategy("ks1", new 
TokenMetadata(), snitch, Map.of("DC", "5"))
+                )
+                .indexStatus(Map.of(
+                        InetAddressAndPort.getByName("127.0.0.251"),
+                        Map.of(
+                                "ks1.idx1", Index.Status.BUILD_SUCCEEDED,
+                                "ks1.idx2", Index.Status.BUILD_SUCCEEDED,
+                                "ks1.idx3", Index.Status.BUILD_SUCCEEDED
+                        ),
+                        InetAddressAndPort.getByName("127.0.0.252"),
+                        Map.of(
+                                "ks1.idx1", Index.Status.BUILD_SUCCEEDED,
+                                "ks1.idx2", Index.Status.BUILD_SUCCEEDED,
+                                "ks1.idx3", Index.Status.BUILD_SUCCEEDED
+                        ),
+                        InetAddressAndPort.getByName("127.0.0.253"),
+                        Map.of(
+                                "ks1.idx1", Index.Status.UNKNOWN,
+                                "ks1.idx2", Index.Status.BUILD_SUCCEEDED,
+                                "ks1.idx3", Index.Status.BUILD_SUCCEEDED
+                        ),
+                        InetAddressAndPort.getByName("127.0.0.254"),
+                        Map.of(
+                                "ks1.idx1", Index.Status.BUILD_SUCCEEDED,
+                                "ks1.idx2", Index.Status.BUILD_SUCCEEDED,
+                                "ks1.idx3", Index.Status.BUILD_SUCCEEDED
+                        ),
+                        InetAddressAndPort.getByName("127.0.0.255"),
+                        Map.of(
+                                "ks1.idx1", Index.Status.BUILD_SUCCEEDED,
+                                "ks1.idx2", Index.Status.BUILD_SUCCEEDED,
+                                "ks1.idx3", Index.Status.UNKNOWN
+                        )
+                ))
+                .numRequired(2)
+                .expected(EndpointsForRange.of(
+                        // successful
+                        full(InetAddressAndPort.getByName("127.0.0.251")),
+                        full(InetAddressAndPort.getByName("127.0.0.252")),
+                        full(InetAddressAndPort.getByName("127.0.0.254")),
+
+                        // queryable, but unknown
+                        full(InetAddressAndPort.getByName("127.0.0.253")),
+                        full(InetAddressAndPort.getByName("127.0.0.255"))
+                ))
+                .build()
+        );
+    }
+
+    @Test
+    public void shouldNotPrioritizeWhenNoSuccessfulEndpoints() throws 
UnknownHostException
+    {
+        runTest(new Testcase.Builder()
+                .keyspace("ks1")
+                .replicationStrategy(
+                        new NetworkTopologyStrategy("ks1", new 
TokenMetadata(), snitch, Map.of("DC", "5"))
+                )
+                .indexStatus(Map.of(
+                        InetAddressAndPort.getByName("127.0.0.251"),
+                        Map.of(
+                                "ks1.idx1", Index.Status.UNKNOWN,
+                                "ks1.idx2", Index.Status.BUILD_SUCCEEDED,
+                                "ks1.idx3", Index.Status.UNKNOWN
+                        ),
+                        InetAddressAndPort.getByName("127.0.0.252"),
+                        Map.of(
+                                "ks1.idx1", Index.Status.BUILD_SUCCEEDED,
+                                "ks1.idx2", Index.Status.UNKNOWN,
+                                "ks1.idx3", Index.Status.BUILD_SUCCEEDED
+                        ),
+                        InetAddressAndPort.getByName("127.0.0.253"),
+                        Map.of(
+                                "ks1.idx1", Index.Status.UNKNOWN,
+                                "ks1.idx2", Index.Status.BUILD_SUCCEEDED,
+                                "ks1.idx3", Index.Status.BUILD_SUCCEEDED
+                        ),
+                        InetAddressAndPort.getByName("127.0.0.254"),
+                        Map.of(
+                                "ks1.idx1", Index.Status.BUILD_SUCCEEDED,
+                                "ks1.idx2", Index.Status.BUILD_SUCCEEDED,
+                                "ks1.idx3", Index.Status.UNKNOWN
+                        ),
+                        InetAddressAndPort.getByName("127.0.0.255"),
+                        Map.of(
+                                "ks1.idx1", Index.Status.UNKNOWN,
+                                "ks1.idx2", Index.Status.UNKNOWN,
+                                "ks1.idx3", Index.Status.UNKNOWN
+                        )
+                ))
+                .numRequired(2)
+                .expected(EndpointsForRange.of(
+                        // unmodified order
+                        full(InetAddressAndPort.getByName("127.0.0.251")),
+                        full(InetAddressAndPort.getByName("127.0.0.252")),
+                        full(InetAddressAndPort.getByName("127.0.0.253")),
+                        full(InetAddressAndPort.getByName("127.0.0.254")),
+                        full(InetAddressAndPort.getByName("127.0.0.255"))
+                ))
+                .build()
+        );
+    }
+
+    @Test
+    public void shouldFilterOutNonQueryableEndpoints() throws 
UnknownHostException
+    {
+        runTest(new Testcase.Builder()
+                .keyspace("ks1")
+                .replicationStrategy(
+                        new NetworkTopologyStrategy("ks1", new 
TokenMetadata(), snitch, Map.of("DC", "5"))
+                )
+                .indexStatus(Map.of(
+                        InetAddressAndPort.getByName("127.0.0.251"),
+                        Map.of(
+                                "ks1.idx1", Index.Status.UNKNOWN,
+                                "ks1.idx2", Index.Status.BUILD_SUCCEEDED,
+                                "ks1.idx3", Index.Status.FULL_REBUILD_STARTED
+                        ),
+                        InetAddressAndPort.getByName("127.0.0.252"),
+                        Map.of(
+                                "ks1.idx1", Index.Status.BUILD_SUCCEEDED,
+                                "ks1.idx2", Index.Status.UNKNOWN,
+                                "ks1.idx3", Index.Status.BUILD_SUCCEEDED
+                        ),
+                        InetAddressAndPort.getByName("127.0.0.253"),
+                        Map.of(
+                                "ks1.idx1", Index.Status.BUILD_SUCCEEDED,
+                                "ks1.idx2", Index.Status.BUILD_SUCCEEDED,
+                                "ks1.idx3", Index.Status.BUILD_SUCCEEDED
+                        ),
+                        InetAddressAndPort.getByName("127.0.0.254"),
+                        Map.of(
+                                "ks1.idx1", Index.Status.BUILD_FAILED,
+                                "ks1.idx2", Index.Status.BUILD_SUCCEEDED,
+                                "ks1.idx3", Index.Status.UNKNOWN
+                        ),
+                        InetAddressAndPort.getByName("127.0.0.255"),
+                        Map.of(
+                                "ks1.idx1", Index.Status.UNKNOWN,
+                                "ks1.idx2", Index.Status.UNKNOWN,
+                                "ks1.idx3", Index.Status.UNKNOWN
+                        )
+                ))
+                .numRequired(1)
+                .expected(EndpointsForRange.of(
+                        // successful
+                        full(InetAddressAndPort.getByName("127.0.0.253")),
+
+                        // queryable, but unknown
+                        full(InetAddressAndPort.getByName("127.0.0.252")),
+                        full(InetAddressAndPort.getByName("127.0.0.255"))
+                ))
+                .build()
+        );
+    }
+
+    @Test
+    public void shouldThrowWhenNotEnoughQueryableEndpoints()
+    {
+        assertThatThrownBy(() ->
+                runTest(new Testcase.Builder()
+                        .keyspace("ks1")
+                        .replicationStrategy(
+                                new NetworkTopologyStrategy("ks1", new 
TokenMetadata(), snitch, Map.of("DC", "5"))
+                        )
+                        .indexStatus(Map.of(
+                                InetAddressAndPort.getByName("127.0.0.251"),
+                                Map.of(
+                                        "ks1.idx1", 
Index.Status.BUILD_SUCCEEDED,
+                                        "ks1.idx2", Index.Status.UNKNOWN,
+                                        "ks1.idx3", 
Index.Status.BUILD_SUCCEEDED
+                                ),
+                                InetAddressAndPort.getByName("127.0.0.252"),
+                                Map.of(
+                                        "ks1.idx1", 
Index.Status.BUILD_SUCCEEDED,
+                                        "ks1.idx2", Index.Status.BUILD_FAILED,
+                                        "ks1.idx3", 
Index.Status.BUILD_SUCCEEDED
+                                ),
+                                InetAddressAndPort.getByName("127.0.0.253"),
+                                Map.of(
+                                        "ks1.idx1", 
Index.Status.BUILD_SUCCEEDED,
+                                        "ks1.idx2", 
Index.Status.BUILD_SUCCEEDED,
+                                        "ks1.idx3", 
Index.Status.BUILD_SUCCEEDED
+                                ),
+                                InetAddressAndPort.getByName("127.0.0.254"),
+                                Map.of(
+                                        "ks1.idx1", 
Index.Status.BUILD_SUCCEEDED,
+                                        "ks1.idx2", 
Index.Status.BUILD_SUCCEEDED,
+                                        "ks1.idx3", Index.Status.BUILD_FAILED
+                                ),
+                                InetAddressAndPort.getByName("127.0.0.255"),
+                                Map.of(
+                                        "ks1.idx1", Index.Status.UNKNOWN,
+                                        "ks1.idx2", 
Index.Status.BUILD_SUCCEEDED,
+                                        "ks1.idx3", 
Index.Status.BUILD_SUCCEEDED
+                                )
+                        ))
+                        .numRequired(4)
+                        .build()))
+                .isInstanceOf(ReadFailureException.class)
+                .hasMessageStartingWith("Operation failed")
+                .hasMessageContaining("INDEX_NOT_AVAILABLE from 
/127.0.0.252:7000")
+                .hasMessageContaining("INDEX_NOT_AVAILABLE from 
/127.0.0.254:7000");
+    }
+
+    @Test
+    public void shouldThrowWhenNoQueryableEndpoints()
+    {
+        assertThatThrownBy(() ->
+                runTest(new Testcase.Builder()
+                        .keyspace("ks1")
+                        .replicationStrategy(
+                                new NetworkTopologyStrategy("ks1", new 
TokenMetadata(), snitch, Map.of("DC", "3"))
+                        )
+                        .indexStatus(Map.of(
+                                InetAddressAndPort.getByName("127.0.0.253"),
+                                Map.of(
+                                        "ks1.idx1", Index.Status.DROPPED,
+                                        "ks1.idx2", 
Index.Status.BUILD_SUCCEEDED,
+                                        "ks1.idx3", 
Index.Status.BUILD_SUCCEEDED
+                                ),
+                                InetAddressAndPort.getByName("127.0.0.254"),
+                                Map.of(
+                                        "ks1.idx1", 
Index.Status.BUILD_SUCCEEDED,
+                                        "ks1.idx2", 
Index.Status.BUILD_SUCCEEDED,
+                                        "ks1.idx3", Index.Status.BUILD_FAILED
+                                ),
+                                InetAddressAndPort.getByName("127.0.0.255"),
+                                Map.of(
+                                        "ks1.idx1", Index.Status.UNKNOWN,
+                                        "ks1.idx2", 
Index.Status.FULL_REBUILD_STARTED,
+                                        "ks1.idx3", 
Index.Status.BUILD_SUCCEEDED
+                                )
+                        ))
+                        .numRequired(1)
+                        .build()))
+                .isInstanceOf(ReadFailureException.class)
+                .hasMessageStartingWith("Operation failed")
+                .hasMessageContaining("INDEX_NOT_AVAILABLE from 
/127.0.0.253:7000")
+                .hasMessageContaining("INDEX_NOT_AVAILABLE from 
/127.0.0.254:7000")
+                .hasMessageContaining("INDEX_NOT_AVAILABLE from 
/127.0.0.255:7000");
+    }
+
+    void runTest(Testcase testcase)
+    {
+        // get all endpoints from indexStatus
+        Set<Replica> replicas = testcase.indexStatus.keySet()
+                .stream()
+                .map(ReplicaUtils::full)
+                .collect(Collectors.toSet());
+
+        // get all indexes from one of the entries of indexStatus
+        Set<Index> indexes = testcase.indexStatus.entrySet().iterator().next()
+                .getValue()
+                .keySet()
+                .stream()
+                .map(identifier -> mockedIndex(identifier.split("\\.")[1]))
+                .collect(Collectors.toSet());
+
+        // send indexStatus for each endpoint
+        testcase.indexStatus.forEach((endpoint, indexStatus) ->
+                IndexStatusManager.instance.receivePeerIndexStatus(
+                        endpoint,
+                        
VersionedValue.unsafeMakeVersionedValue(JsonUtils.writeAsJsonString(indexStatus),
 1)
+                ));
+
+        // sort the replicas here, so that we can assert the order later
+        EndpointsForRange endpoints = EndpointsForRange.copyOf(new 
TreeSet<>(replicas));
+        Keyspace ks = mockedKeyspace(testcase.keyspace, 
testcase.replicationStrategy);
+        Index.QueryPlan qp = mockedQueryPlan(indexes);
+        ConsistencyLevel cl = mockedConsistencyLevel(testcase.numRequired);
+
+        EndpointsForRange actual = 
IndexStatusManager.instance.filterForQuery(endpoints, ks, qp, cl);
+
+        assertArrayEquals(
+                testcase.expected.stream().toArray(),
+                actual.stream().toArray()
+        );
+    }
+
+    Keyspace mockedKeyspace(String name, AbstractReplicationStrategy 
replicationStrategy)
+    {
+        Keyspace mock = Mockito.mock(Keyspace.class);
+        Mockito.when(mock.getName()).thenReturn(name);
+        
Mockito.when(mock.getReplicationStrategy()).thenReturn(replicationStrategy);
+        return mock;
+    }
+
+    Index mockedIndex(String name)
+    {
+        Index mock = Mockito.mock(Index.class);
+
+        Mockito.when(mock.getIndexMetadata())
+                .thenReturn(IndexMetadata.fromSchemaMetadata(name, 
IndexMetadata.Kind.KEYS, null));
+
+        Mockito.when(mock.isQueryable(Mockito.any()))
+                .thenAnswer(invocationOnMock ->
+                {
+                    Index.Status status = invocationOnMock.getArgument(0);
+                    return (status == Index.Status.BUILD_SUCCEEDED || status 
== Index.Status.UNKNOWN);
+                });
+
+        return mock;
+    }
+
+    Index.QueryPlan mockedQueryPlan(Set<Index> indexes)
+    {
+        Index.QueryPlan mock = Mockito.mock(Index.QueryPlan.class);
+        Mockito.when(mock.getIndexes()).thenReturn(indexes);
+        return mock;
+    }
+
+    ConsistencyLevel mockedConsistencyLevel(int required)
+    {
+        ConsistencyLevel mock = Mockito.mock(ConsistencyLevel.class);
+        Mockito.when(mock.blockFor(Mockito.any())).thenReturn(required);
+        return mock;
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to