This is an automated email from the ASF dual-hosted git repository.
fortino pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git
The following commit(s) were added to refs/heads/trunk by this push:
new d7e297eb00 OAK-11695: consistently route the same queries to the same
shards (#2271)
d7e297eb00 is described below
commit d7e297eb0030a699da9c4dc2d8809d80a01d1123
Author: Fabrizio Fortino <[email protected]>
AuthorDate: Thu May 8 11:03:00 2025 +0200
OAK-11695: consistently route the same queries to the same shards (#2271)
* OAK-11695: consistently route the same queries to the same shards
* OAK-11695: improve sha256Hash()
* OAK-11695: change explain to return the full search request, add unit test
---
.../query/async/ElasticResultRowAsyncIterator.java | 13 +++++++++---
.../index/elastic/util/ElasticIndexUtils.java | 23 ++++++++++++++++------
.../plugins/index/elastic/ElasticContentTest.java | 18 +++++++++++++++++
3 files changed, 45 insertions(+), 9 deletions(-)
diff --git
a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/ElasticResultRowAsyncIterator.java
b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/ElasticResultRowAsyncIterator.java
index a039d9bde3..452c8368c1 100644
---
a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/ElasticResultRowAsyncIterator.java
+++
b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/ElasticResultRowAsyncIterator.java
@@ -24,6 +24,7 @@ import
org.apache.jackrabbit.oak.plugins.index.elastic.query.ElasticQueryIterato
import
org.apache.jackrabbit.oak.plugins.index.elastic.query.ElasticRequestHandler;
import
org.apache.jackrabbit.oak.plugins.index.elastic.query.ElasticResponseHandler;
import
org.apache.jackrabbit.oak.plugins.index.elastic.query.async.facets.ElasticFacetProvider;
+import org.apache.jackrabbit.oak.plugins.index.elastic.util.ElasticIndexUtils;
import
org.apache.jackrabbit.oak.plugins.index.search.spi.query.FulltextIndex.FulltextResultRow;
import org.apache.jackrabbit.oak.spi.query.QueryIndex;
import org.apache.jackrabbit.oak.spi.query.QueryIndex.IndexPlan;
@@ -41,8 +42,8 @@ import co.elastic.clients.elasticsearch.core.search.Highlight;
import co.elastic.clients.elasticsearch.core.search.Hit;
import co.elastic.clients.elasticsearch.core.search.SourceConfig;
import co.elastic.clients.elasticsearch.core.search.TotalHitsRelation;
-import co.elastic.clients.json.JsonpUtils;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collections;
@@ -201,7 +202,7 @@ public class ElasticResultRowAsyncIterator implements
ElasticQueryIterator, Elas
*/
@Override
public String explain() {
- return JsonpUtils.toString(elasticQueryScanner.searchRequest, new
StringBuilder()).toString();
+ return elasticQueryScanner.searchRequest.toString();
}
@Override
@@ -220,6 +221,7 @@ public class ElasticResultRowAsyncIterator implements
ElasticQueryIterator, Elas
private final List<SearchHitListener> searchHitListeners = new
ArrayList<>();
private final List<AggregationListener> aggregationListeners = new
ArrayList<>();
+ private final String sessionId;
private final Query query;
private final SearchRequest searchRequest;
private final @NotNull List<SortOptions> sorts;
@@ -242,6 +244,7 @@ public class ElasticResultRowAsyncIterator implements
ElasticQueryIterator, Elas
ElasticQueryScanner(List<ElasticResponseListener> listeners) {
this.query = elasticRequestHandler.baseQuery();
+ this.sessionId = "oak-" +
ElasticIndexUtils.sha256Hash(this.query.toString().getBytes(StandardCharsets.UTF_8));
this.sorts = elasticRequestHandler.baseSorts();
this.highlight = elasticRequestHandler.highlight();
@@ -275,7 +278,9 @@ public class ElasticResultRowAsyncIterator implements
ElasticQueryIterator, Elas
.highlight(highlight)
// use a smaller size when the query contains
aggregations. This improves performance
// when the client is only interested in
insecure facets
- .size(needsAggregations.get() ?
Math.min(SMALL_RESULT_SET_SIZE, getFetchSize(requests)) :
getFetchSize(requests));
+ .size(needsAggregations.get() ?
Math.min(SMALL_RESULT_SET_SIZE, getFetchSize(requests)) :
getFetchSize(requests))
+ // consistently route the same queries to the
same shard copy (primary or replica) within the shard set
+ .preference(sessionId);
if (needsAggregations.get()) {
builder.aggregations(elasticRequestHandler.aggregations());
}
@@ -404,6 +409,8 @@ public class ElasticResultRowAsyncIterator implements
ElasticQueryIterator, Elas
.query(query)
.highlight(highlight)
.size(getFetchSize(requests++))
+ // consistently route the same queries to the same
shard copy (primary or replica) within the shard set
+ .preference(sessionId)
);
LOG.trace("Kicking new search after query {}", searchReq);
diff --git
a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/util/ElasticIndexUtils.java
b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/util/ElasticIndexUtils.java
index c57118e9ad..238c72017c 100644
---
a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/util/ElasticIndexUtils.java
+++
b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/util/ElasticIndexUtils.java
@@ -16,6 +16,7 @@
*/
package org.apache.jackrabbit.oak.plugins.index.elastic.util;
+import org.apache.jackrabbit.oak.commons.StringUtils;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -106,16 +107,26 @@ public class ElasticIndexUtils {
public static String idFromPath(@NotNull String path) {
byte[] pathBytes = path.getBytes(StandardCharsets.UTF_8);
if (pathBytes.length > 512) {
- try {
- return new
String(MessageDigest.getInstance("SHA-256").digest(pathBytes),
- StandardCharsets.UTF_8);
- } catch (NoSuchAlgorithmException e) {
- throw new IllegalStateException(e);
- }
+ return sha256Hash(pathBytes);
}
return path;
}
+ /**
+ * Computes the SHA-256 hash of the given byte array and returns it as a
hex-encoded string.
+ *
+ * @param input the byte array to hash
+ * @return the SHA-256 digest as a hex-encoded string
+ */
+ public static String sha256Hash(byte[] input) {
+ try {
+ byte[] digest = MessageDigest.getInstance("SHA-256").digest(input);
+ return StringUtils.convertBytesToHex(digest);
+ } catch (NoSuchAlgorithmException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
/**
* Converts a given byte array (of doubles) to a list of floats
* @param array given byte array
diff --git
a/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticContentTest.java
b/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticContentTest.java
index 775339aa24..a2440cad33 100644
---
a/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticContentTest.java
+++
b/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticContentTest.java
@@ -38,6 +38,7 @@ import static org.hamcrest.CoreMatchers.endsWith;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsString;
import static org.junit.Assert.assertTrue;
import static org.mockito.AdditionalMatchers.geq;
import static org.mockito.ArgumentMatchers.anyBoolean;
@@ -179,6 +180,23 @@ public class ElasticContentTest extends
ElasticAbstractQueryTest {
assertEventually(() -> assertThat(countDocuments(index), equalTo(1L)));
}
+ @Test
+ public void shardPreference() throws Exception {
+ IndexDefinitionBuilder builder = createIndex("a").noAsync();
+ builder.includedPaths("/content");
+ builder.indexRule("nt:base").property("a").propertyIndex();
+ setIndex(UUID.randomUUID().toString(), builder);
+ root.commit();
+
+ String query = "select [jcr:path] from [nt:base] where [a] = 'text'";
+ String explain = explain(query);
+ assertThat(explain, containsString("preference=oak-"));
+
+ // the preference value must be identical when the same query is explained again
+ String explain2 = explain(query);
+ assertThat(explain2, equalTo(explain));
+ }
+
@Test
public void indexWithDefaultFetchSizes() throws Exception {
IndexDefinitionBuilder builder = createIndex("a").noAsync();