This is an automated email from the ASF dual-hosted git repository.

maedhroz pushed a commit to branch cassandra-5.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cassandra-5.0 by this push:
     new ba5ab993c7 Avoid streams in the common case for UpdateTransaction creation
ba5ab993c7 is described below

commit ba5ab993c7a279b014f439a5d06683d3b568d4ff
Author: Caleb Rackliffe <calebrackli...@gmail.com>
AuthorDate: Tue Jun 11 16:20:58 2024 -0500

    Avoid streams in the common case for UpdateTransaction creation
    
    patch by Caleb Rackliffe; reviewed by Jeremiah Jordan and David Capwell for CASSANDRA-19675
---
 CHANGES.txt                                        |   1 +
 .../cassandra/index/SecondaryIndexManager.java     | 110 +++++++++++++--------
 2 files changed, 72 insertions(+), 39 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 1396600720..8c7e925aca 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 5.0-beta2
+ * Avoid streams in the common case for UpdateTransaction creation (CASSANDRA-19675)
  * Only wait until native_transport_timeout for dispatcher to finish (CASSANDRA-19697)
  * Disable chronicle analytics (CASSANDRA-19656)
  * Replace Stream iteration with for-loop for StorageProxy::updateCoordinatorWriteLatencyTableMetric (CASSANDRA-19676)
diff --git a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java 
b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
index ebd2cc0379..ba4132654c 100644
--- a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
+++ b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
@@ -1057,16 +1057,20 @@ public class SecondaryIndexManager implements 
IndexRegistry, INotificationConsum
 
                     try (UnfilteredRowIterator partition = page.next())
                     {
-                        Set<Index.Indexer> indexers = 
indexGroups.values().stream()
-                                                             .map(g -> 
g.indexerFor(indexes::contains,
-                                                                               
     key,
-                                                                               
     partition.columns(),
-                                                                               
     nowInSec,
-                                                                               
     ctx,
-                                                                               
     IndexTransaction.Type.UPDATE,
-                                                                               
     null))
-                                                             
.filter(Objects::nonNull)
-                                                             
.collect(Collectors.toSet());
+                        Set<Index.Indexer> indexers = new 
HashSet<>(indexGroups.size());
+
+                        for (Index.Group g : indexGroups.values())
+                        {
+                            Index.Indexer indexerFor = 
g.indexerFor(indexes::contains,
+                                                                    key,
+                                                                    
partition.columns(),
+                                                                    nowInSec,
+                                                                    ctx,
+                                                                    
IndexTransaction.Type.UPDATE,
+                                                                    null);
+                            if (indexerFor != null)
+                                indexers.add(indexerFor);
+                        }
 
                         // Short-circuit empty partitions if static row is 
processed or isn't read
                         if (!readStatic && partition.isEmpty() && 
partition.staticRow().isEmpty())
@@ -1237,11 +1241,14 @@ public class SecondaryIndexManager implements 
IndexRegistry, INotificationConsum
             }
         }
 
-        Set<Index.QueryPlan> queryPlans = indexGroups.values()
-                                                     .stream()
-                                                     .map(g -> 
g.queryPlanFor(rowFilter))
-                                                     .filter(Objects::nonNull)
-                                                     
.collect(Collectors.toSet());
+        Set<Index.QueryPlan> queryPlans = new HashSet<>(indexGroups.size());
+        for (Index.Group g : indexGroups.values())
+        {
+            Index.QueryPlan queryPlan = g.queryPlanFor(rowFilter);
+
+            if (queryPlan != null)
+                queryPlans.add(queryPlan);
+        }
 
         if (queryPlans.isEmpty())
         {
@@ -1260,32 +1267,48 @@ public class SecondaryIndexManager implements 
IndexRegistry, INotificationConsum
         // pay for an additional threadlocal get() rather than build the 
strings unnecessarily
         if (Tracing.isTracing())
         {
+            StringJoiner joiner = new StringJoiner(",");
+
+            for (Index.QueryPlan p : queryPlans)
+                joiner.add(commaSeparated(p.getIndexes()) + ':' + 
p.getEstimatedResultRows());
+
             Tracing.trace("Index mean cardinalities are {}. Scanning with {}.",
-                          queryPlans.stream()
-                                    .map(p -> commaSeparated(p.getIndexes()) + 
':' + p.getEstimatedResultRows())
-                                    .collect(Collectors.joining(",")),
-                          commaSeparated(selected.getIndexes()));
+                          joiner.toString(), 
commaSeparated(selected.getIndexes()));
         }
+
         return selected;
     }
 
     private static String commaSeparated(Collection<Index> indexes)
     {
-        return indexes.stream().map(i -> 
i.getIndexMetadata().name).collect(Collectors.joining(","));
+        StringJoiner joiner = new StringJoiner(",");
+
+        for (Index i : indexes)
+            joiner.add(i.getIndexMetadata().name);
+
+        return joiner.toString();
     }
 
     public Optional<Index> getBestIndexFor(RowFilter.Expression expression)
     {
-        return indexes.values().stream().filter((i) -> 
i.supportsExpression(expression.column(), expression.operator())).findFirst();
+        for (Index i : indexes.values())
+        {
+            if (i.supportsExpression(expression.column(), 
expression.operator()))
+            {
+                return Optional.of(i);
+            }
+        }
+
+        return Optional.empty();
     }
 
     public <T extends Index> Optional<T> getBestIndexFor(RowFilter.Expression 
expression, Class<T> indexType)
     {
-        return indexes.values()
-                      .stream()
-                      .filter(i -> indexType.isInstance(i) && 
i.supportsExpression(expression.column(), expression.operator()))
-                      .map(indexType::cast)
-                      .findFirst();
+        for (Index i : indexes.values())
+            if (indexType.isInstance(i) && 
i.supportsExpression(expression.column(), expression.operator()))
+                return Optional.of(indexType.cast(i));
+
+        return Optional.empty();
     }
 
     /**
@@ -1384,7 +1407,11 @@ public class SecondaryIndexManager implements 
IndexRegistry, INotificationConsum
 
     public Index.Group getIndexGroup(Index index)
     {
-        return indexGroups.values().stream().filter(g -> 
g.containsIndex(index)).findAny().orElse(null);
+        for (Index.Group g : indexGroups.values())
+            if (g.containsIndex(index))
+                return g;
+
+        return null;
     }
 
     /*
@@ -1401,18 +1428,23 @@ public class SecondaryIndexManager implements 
IndexRegistry, INotificationConsum
         if (!hasIndexes())
             return UpdateTransaction.NO_OP;
 
-        Index.Indexer[] indexers = listIndexGroups().stream()
-                                          .map(g -> 
g.indexerFor(writableIndexSelector(),
-                                                                 
update.partitionKey(),
-                                                                 
update.columns(),
-                                                                 nowInSec,
-                                                                 ctx,
-                                                                 
IndexTransaction.Type.UPDATE,
-                                                                 memtable))
-                                          .filter(Objects::nonNull)
-                                          .toArray(Index.Indexer[]::new);
-
-        return indexers.length == 0 ? UpdateTransaction.NO_OP : new 
WriteTimeTransaction(indexers);
+        List<Index.Indexer> indexers = new ArrayList<>(indexGroups.size());
+
+        for (Index.Group g : indexGroups.values())
+        {
+            Index.Indexer indexer = g.indexerFor(writableIndexSelector(),
+                                                 update.partitionKey(),
+                                                 update.columns(),
+                                                 nowInSec,
+                                                 ctx,
+                                                 IndexTransaction.Type.UPDATE,
+                                                 memtable);
+            if (indexer != null)
+                indexers.add(indexer);
+        }
+
+        return indexers.isEmpty() ? UpdateTransaction.NO_OP
+                                  : new 
WriteTimeTransaction(indexers.toArray(Index.Indexer[]::new));
     }
 
     private Predicate<Index> writableIndexSelector()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to