This is an automated email from the ASF dual-hosted git repository. maedhroz pushed a commit to branch cassandra-5.0 in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-5.0 by this push: new ba5ab993c7 Avoid streams in the common case for UpdateTransaction creation ba5ab993c7 is described below commit ba5ab993c7a279b014f439a5d06683d3b568d4ff Author: Caleb Rackliffe <calebrackli...@gmail.com> AuthorDate: Tue Jun 11 16:20:58 2024 -0500 Avoid streams in the common case for UpdateTransaction creation patch by Caleb Rackliffe; reviewed by Jeremiah Jordan and David Capwell for CASSANDRA-19675 --- CHANGES.txt | 1 + .../cassandra/index/SecondaryIndexManager.java | 110 +++++++++++++-------- 2 files changed, 72 insertions(+), 39 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 1396600720..8c7e925aca 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 5.0-beta2 + * Avoid streams in the common case for UpdateTransaction creation (CASSANDRA-19675) * Only wait until native_transport_timeout for dispatcher to finish (CASSANDRA-19697) * Disable chronicle analytics (CASSANDRA-19656) * Replace Stream iteration with for-loop for StorageProxy::updateCoordinatorWriteLatencyTableMetric (CASSANDRA-19676) diff --git a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java index ebd2cc0379..ba4132654c 100644 --- a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java +++ b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java @@ -1057,16 +1057,20 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum try (UnfilteredRowIterator partition = page.next()) { - Set<Index.Indexer> indexers = indexGroups.values().stream() - .map(g -> g.indexerFor(indexes::contains, - key, - partition.columns(), - nowInSec, - ctx, - IndexTransaction.Type.UPDATE, - null)) - .filter(Objects::nonNull) - .collect(Collectors.toSet()); + Set<Index.Indexer> indexers = new HashSet<>(indexGroups.size()); + + for (Index.Group g : indexGroups.values()) + { + Index.Indexer indexerFor = g.indexerFor(indexes::contains, + key, + partition.columns(), + nowInSec, + ctx, + IndexTransaction.Type.UPDATE, + null); + if (indexerFor != null) + indexers.add(indexerFor); + } // Short-circuit empty partitions if static row is processed or isn't read if (!readStatic && partition.isEmpty() && partition.staticRow().isEmpty()) @@ -1237,11 +1241,14 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum } } - Set<Index.QueryPlan> queryPlans = indexGroups.values() - .stream() - .map(g -> g.queryPlanFor(rowFilter)) - .filter(Objects::nonNull) - .collect(Collectors.toSet()); + Set<Index.QueryPlan> queryPlans = new HashSet<>(indexGroups.size()); + for (Index.Group g : indexGroups.values()) + { + Index.QueryPlan queryPlan = g.queryPlanFor(rowFilter); + + if (queryPlan != null) + queryPlans.add(queryPlan); + } if (queryPlans.isEmpty()) { @@ -1260,32 +1267,48 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum // pay for an additional threadlocal get() rather than build the strings unnecessarily if (Tracing.isTracing()) { + StringJoiner joiner = new StringJoiner(","); + + for (Index.QueryPlan p : queryPlans) + joiner.add(commaSeparated(p.getIndexes()) + ':' + p.getEstimatedResultRows()); + Tracing.trace("Index mean cardinalities are {}. Scanning with {}.", - queryPlans.stream() - .map(p -> commaSeparated(p.getIndexes()) + ':' + p.getEstimatedResultRows()) - .collect(Collectors.joining(",")), - commaSeparated(selected.getIndexes())); + joiner.toString(), commaSeparated(selected.getIndexes())); } + return selected; } private static String commaSeparated(Collection<Index> indexes) { - return indexes.stream().map(i -> i.getIndexMetadata().name).collect(Collectors.joining(",")); + StringJoiner joiner = new StringJoiner(","); + + for (Index i : indexes) + joiner.add(i.getIndexMetadata().name); + + return joiner.toString(); } public Optional<Index> getBestIndexFor(RowFilter.Expression expression) { - return indexes.values().stream().filter((i) -> i.supportsExpression(expression.column(), expression.operator())).findFirst(); + for (Index i : indexes.values()) + { + if (i.supportsExpression(expression.column(), expression.operator())) + { + return Optional.of(i); + } + } + + return Optional.empty(); } public <T extends Index> Optional<T> getBestIndexFor(RowFilter.Expression expression, Class<T> indexType) { - return indexes.values() - .stream() - .filter(i -> indexType.isInstance(i) && i.supportsExpression(expression.column(), expression.operator())) - .map(indexType::cast) - .findFirst(); + for (Index i : indexes.values()) + if (indexType.isInstance(i) && i.supportsExpression(expression.column(), expression.operator())) + return Optional.of(indexType.cast(i)); + + return Optional.empty(); } /** @@ -1384,7 +1407,11 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum public Index.Group getIndexGroup(Index index) { - return indexGroups.values().stream().filter(g -> g.containsIndex(index)).findAny().orElse(null); + for (Index.Group g : indexGroups.values()) + if (g.containsIndex(index)) + return g; + + return null; } /* @@ -1401,18 +1428,23 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum if (!hasIndexes()) return UpdateTransaction.NO_OP; - Index.Indexer[] indexers = listIndexGroups().stream() - .map(g -> g.indexerFor(writableIndexSelector(), - update.partitionKey(), - update.columns(), - nowInSec, - ctx, - IndexTransaction.Type.UPDATE, - memtable)) - .filter(Objects::nonNull) - .toArray(Index.Indexer[]::new); - - return indexers.length == 0 ? UpdateTransaction.NO_OP : new WriteTimeTransaction(indexers); + List<Index.Indexer> indexers = new ArrayList<>(indexGroups.size()); + + for (Index.Group g : indexGroups.values()) + { + Index.Indexer indexer = g.indexerFor(writableIndexSelector(), + update.partitionKey(), + update.columns(), + nowInSec, + ctx, + IndexTransaction.Type.UPDATE, + memtable); + if (indexer != null) + indexers.add(indexer); + } + + return indexers.isEmpty() ? UpdateTransaction.NO_OP + : new WriteTimeTransaction(indexers.toArray(Index.Indexer[]::new)); } private Predicate<Index> writableIndexSelector() --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org