nfsantos commented on code in PR #1804:
URL: https://github.com/apache/jackrabbit-oak/pull/1804#discussion_r1822557661
##########
oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticIndexDefinition.java:
##########
@@ -135,6 +141,10 @@ public class ElasticIndexDefinition extends
IndexDefinition {
private static final String SIMILARITY_TAGS_BOOST = "similarityTagsBoost";
private static final float SIMILARITY_TAGS_BOOST_DEFAULT = 0.5f;
+ protected static final String INFERENCE_CONFIG = "inference";
+
+ public static final String INFERENCE = ":inference";
Review Comment:
nitpick: this field should be moved up, next to the other `public
static final` fields.
##########
oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticBulkProcessorHandler.java:
##########
@@ -179,10 +179,34 @@ private void checkFailures() throws IOException {
}
}
- public void update(String id, ElasticDocument document) throws IOException
{
+ /**
+ * Indexes a document in the bulk processor. The document is identified by
the given id. If the document already exists it will be replaced by the new one.
+ * @param id the document id
+ * @param document the document to index
+ * @throws IOException if an error happened while processing the bulk
request
+ */
+ public void index(String id, ElasticDocument document) throws IOException {
add(BulkOperation.of(op -> op.index(idx ->
idx.index(indexName).id(id).document(document))), id);
}
+ public void update(String id, ElasticDocument document) throws IOException
{
+ add(BulkOperation.of(op ->
+ op.update(uf -> uf.index(indexName).id(id).action(uaf ->
uaf.doc(document).docAsUpsert(true)))
+ ), id);
+ // when updating a document we need to remove the properties that are
not present in the new document
+ // to do so we need to keep track of the properties that are present
in the document before the update
+ // and add a specific script bulk operation to remove them
+ // Document and script updates cannot be done in the same bulk
operation
+ if (!document.getPropertiesToRemove().isEmpty()) {
+ String script = document.getPropertiesToRemove().stream()
+ .map(p -> "ctx._source.remove('" + p + "')")
+ .collect(Collectors.joining(";"));
+ add(BulkOperation.of(op ->
+ op.update(uf -> uf.index(indexName).id(id).action(uaf ->
uaf.script(s -> s.source(script))))
+ ), id);
+ }
+ }
Review Comment:
This update is split into two operations, although logically it is a single
one and should be atomic. Could this cause issues if other processes are
updating the same document? For instance, after the first operation is applied,
some other process adds properties to the document, which would then be
incorrectly removed by the second part of the method?
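
If that race is possible, one option might be a single scripted update that
merges the new fields and removes the stale ones in one step, so there is no
window between the two. The sketch below only builds the script source and its
parameters. The class name `SingleUpdateScript` and the parameter names `doc`
and `remove` are hypothetical, the Painless snippet is untested against a real
cluster, and creating a missing document (what `docAsUpsert(true)` does today)
would still need separate handling.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of the PR: builds the pieces of one scripted update.
final class SingleUpdateScript {

    // Constant source: values travel as params, so property names are never
    // concatenated into the script text.
    static final String SOURCE =
            "ctx._source.putAll(params.doc); "
          + "for (def p : params.remove) { ctx._source.remove(p); }";

    // Assembles the script parameters: the new field values and the names to drop.
    static Map<String, Object> params(Map<String, Object> doc, List<String> propertiesToRemove) {
        Map<String, Object> params = new LinkedHashMap<>();
        params.put("doc", doc);
        params.put("remove", propertiesToRemove);
        return params;
    }

    public static void main(String[] args) {
        System.out.println(SOURCE);
        System.out.println(params(Map.of("title", "hello"), List.of("oldProp")));
    }
}
```

Passing the names as parameters also avoids quoting problems if a property
name ever contains a `'` character.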
##########
oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticIndexDefinition.java:
##########
@@ -381,4 +408,238 @@ protected IndexDefinition createInstance(NodeState
indexDefnStateToUse) {
return new ElasticIndexDefinition(root, indexDefnStateToUse,
indexPath, indexPrefix);
}
}
+
+ /**
+ * Represents the inference configuration for an Elasticsearch index
definition.
+ * This class holds the properties and queries used for inference.
+ */
+ public static class InferenceDefinition {
+
+ /**
+ * List of properties used for inference.
+ */
+ public List<Property> properties;
+
+ /**
+ * List of queries used for inference.
+ */
+ public List<Query> queries;
+
+ public InferenceDefinition() { }
+
+ /**
+ * Constructs an InferenceDefinition from a given NodeState.
+ *
+ * @param inferenceNode the NodeState containing the inference
configuration
+ */
+ public InferenceDefinition(NodeState inferenceNode) {
+ if (inferenceNode.hasChildNode("properties")) {
+ this.properties =
StreamSupport.stream(inferenceNode.getChildNode("properties").getChildNodeEntries().spliterator(),
false)
+ .map(cne -> new Property(cne.getName(),
cne.getNodeState()))
+ .collect(Collectors.toList());
+ }
+ if (inferenceNode.hasChildNode("queries")) {
+ this.queries =
StreamSupport.stream(inferenceNode.getChildNode("queries").getChildNodeEntries().spliterator(),
false)
Review Comment:
`CollectionUtils.toStream`... ?
##########
oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticIndexDefinition.java:
##########
@@ -381,4 +408,238 @@ protected IndexDefinition createInstance(NodeState
indexDefnStateToUse) {
return new ElasticIndexDefinition(root, indexDefnStateToUse,
indexPath, indexPrefix);
}
}
+
+ /**
+ * Represents the inference configuration for an Elasticsearch index
definition.
+ * This class holds the properties and queries used for inference.
+ */
+ public static class InferenceDefinition {
+
+ /**
+ * List of properties used for inference.
+ */
+ public List<Property> properties;
+
+ /**
+ * List of queries used for inference.
+ */
+ public List<Query> queries;
+
+ public InferenceDefinition() { }
+
+ /**
+ * Constructs an InferenceDefinition from a given NodeState.
+ *
+ * @param inferenceNode the NodeState containing the inference
configuration
+ */
+ public InferenceDefinition(NodeState inferenceNode) {
+ if (inferenceNode.hasChildNode("properties")) {
+ this.properties =
StreamSupport.stream(inferenceNode.getChildNode("properties").getChildNodeEntries().spliterator(),
false)
Review Comment:
Consider using the `toStream()` methods in `CollectionUtils`; they were added
recently.
https://github.com/apache/jackrabbit-oak/blob/a7057be4cdc7a673c9f098ca3de8d67e6ed10a23/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/collections/CollectionUtils.java#L304
https://github.com/apache/jackrabbit-oak/blob/a7057be4cdc7a673c9f098ca3de8d67e6ed10a23/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/collections/CollectionUtils.java#L320
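
A self-contained sketch of how the call site might read. The `toStream` method
here is a local stand-in with the shape I would expect from the helper (an
assumption, please check the linked source), and the list of names stands in
for `getChildNodeEntries()`.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

final class ToStreamSketch {

    // Local stand-in for the Oak helper; assumed shape, not copied from CollectionUtils.
    static <T> Stream<T> toStream(Iterable<T> iterable) {
        return StreamSupport.stream(iterable.spliterator(), false);
    }

    // The call site no longer mentions spliterator() or the parallel flag.
    static List<String> upperCased(Iterable<String> names) {
        return toStream(names).map(String::toUpperCase).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Iterable<String> names = List.of("title", "description");
        System.out.println(upperCased(names));
    }
}
```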
##########
oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticIndexDefinition.java:
##########
@@ -381,4 +408,238 @@ protected IndexDefinition createInstance(NodeState
indexDefnStateToUse) {
return new ElasticIndexDefinition(root, indexDefnStateToUse,
indexPath, indexPrefix);
}
}
+
+ /**
+ * Represents the inference configuration for an Elasticsearch index
definition.
+ * This class holds the properties and queries used for inference.
+ */
+ public static class InferenceDefinition {
+
+ /**
+ * List of properties used for inference.
+ */
+ public List<Property> properties;
+
+ /**
+ * List of queries used for inference.
+ */
+ public List<Query> queries;
+
+ public InferenceDefinition() { }
+
+ /**
+ * Constructs an InferenceDefinition from a given NodeState.
+ *
+ * @param inferenceNode the NodeState containing the inference
configuration
+ */
+ public InferenceDefinition(NodeState inferenceNode) {
+ if (inferenceNode.hasChildNode("properties")) {
+ this.properties =
StreamSupport.stream(inferenceNode.getChildNode("properties").getChildNodeEntries().spliterator(),
false)
+ .map(cne -> new Property(cne.getName(),
cne.getNodeState()))
+ .collect(Collectors.toList());
+ }
+ if (inferenceNode.hasChildNode("queries")) {
+ this.queries =
StreamSupport.stream(inferenceNode.getChildNode("queries").getChildNodeEntries().spliterator(),
false)
+ .map(cne -> new Query(cne.getName(),
cne.getNodeState()))
+ .collect(Collectors.toList());
+ }
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ InferenceDefinition that = (InferenceDefinition) o;
+ return Objects.equals(properties, that.properties) &&
Objects.equals(queries, that.queries);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(properties, queries);
+ }
+
+ /**
+ * Represents a property used for inference.
+ */
+ public static class Property {
+ /**
+ * The name of the property. It will be prefixed with {@link
ElasticIndexDefinition#INFERENCE}. when stored in the index.
+ */
+ public String name;
+ /**
+ * The model used for inference. Default is "semantic". This will
be used by the inference service to determine the model to use.
+ */
+ public String model;
+ /**
+ * The fields used for inference. These fields will be used to
generate the vector for the inference.
+ */
+ public List<String> fields;
+ /**
+ * The number of dimensions for the vector generated for the
inference.
+ */
+ public int dims;
+ /**
+ * The similarity function used for the inference. Default is
"cosine".
+ */
+ public String similarity;
+
+ public Property() {}
+
+ /**
+ * Constructs a Property from a given NodeState.
+ *
+ * @param name the name of the property
+ * @param inferenceNode the NodeState containing the property
configuration
+ */
+ public Property(String name, NodeState inferenceNode) {
+ this.name = ElasticIndexDefinition.INFERENCE + "." + name;
+ this.model = getOptionalValue(inferenceNode, "model",
"semantic");
+ this.fields = Arrays.asList(getOptionalValues(inferenceNode,
"fields", Type.STRINGS, String.class, new String[0]));
+ this.dims = getOptionalValue(inferenceNode, "dims", 1024);
+ this.similarity = getOptionalValue(inferenceNode,
"similarity", "cosine");
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ Property property = (Property) o;
+ return dims == property.dims && Objects.equals(name,
property.name) &&
+ Objects.equals(model, property.model) &&
+ Objects.equals(fields, property.fields) &&
+ Objects.equals(similarity, property.similarity);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(name, model, fields, dims, similarity);
+ }
+ }
+
+ /**
+ * Represents a query used for inference.
+ */
+ public static class Query {
+ /**
+ * The name of the query.
+ */
+ public String name;
+ /**
+ * The service URL used for the query.
+ */
+ public String serviceUrl;
+ /**
+ * The model used for the query. Default is "semantic". It needs
to match with one of the models used for the properties.
+ */
+ public String model;
+ /**
+ * The prefix used for the query. If the input string starts with
this prefix, the query will be executed. Default is null (no prefix).
+ */
+ public String prefix;
+ /**
+ * The minimum number of terms required for the query to be
executed. Default is 2.
+ */
+ public int minTerms;
+ /**
+ * The number of candidates to be returned by the query. Default
is 100.
+ */
+ public int numCandidates;
+ /**
+ * The type of the query. Default is "hybrid". Currently not used
+ */
+ public String type; // this can be hybrid or vector
+ /**
+ * The similarity threshold used for the query. Default is 0.5.
+ */
+ public float similarityThreshold;
+ /**
+ * The timeout for the query in milliseconds. Default is 5000.
+ */
+ public long timeout;
+
+ public Query() {}
Review Comment:
Same comment as for the constructor of the class `Property`.
##########
oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticIndexDefinition.java:
##########
@@ -381,4 +408,238 @@ protected IndexDefinition createInstance(NodeState
indexDefnStateToUse) {
return new ElasticIndexDefinition(root, indexDefnStateToUse,
indexPath, indexPrefix);
}
}
+
+ /**
+ * Represents the inference configuration for an Elasticsearch index
definition.
+ * This class holds the properties and queries used for inference.
+ */
+ public static class InferenceDefinition {
+
+ /**
+ * List of properties used for inference.
+ */
+ public List<Property> properties;
+
+ /**
+ * List of queries used for inference.
+ */
+ public List<Query> queries;
+
+ public InferenceDefinition() { }
+
+ /**
+ * Constructs an InferenceDefinition from a given NodeState.
+ *
+ * @param inferenceNode the NodeState containing the inference
configuration
+ */
+ public InferenceDefinition(NodeState inferenceNode) {
+ if (inferenceNode.hasChildNode("properties")) {
+ this.properties =
StreamSupport.stream(inferenceNode.getChildNode("properties").getChildNodeEntries().spliterator(),
false)
+ .map(cne -> new Property(cne.getName(),
cne.getNodeState()))
+ .collect(Collectors.toList());
+ }
+ if (inferenceNode.hasChildNode("queries")) {
+ this.queries =
StreamSupport.stream(inferenceNode.getChildNode("queries").getChildNodeEntries().spliterator(),
false)
+ .map(cne -> new Query(cne.getName(),
cne.getNodeState()))
+ .collect(Collectors.toList());
+ }
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ InferenceDefinition that = (InferenceDefinition) o;
+ return Objects.equals(properties, that.properties) &&
Objects.equals(queries, that.queries);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(properties, queries);
+ }
+
+ /**
+ * Represents a property used for inference.
+ */
+ public static class Property {
+ /**
+ * The name of the property. It will be prefixed with {@link
ElasticIndexDefinition#INFERENCE}. when stored in the index.
Review Comment:
Is the `.` part of the prefix? Initially I thought the `.` before `when` was
a typo in the sentence. Maybe put `{@link ElasticIndexDefinition#INFERENCE}.`
in double quotes.
##########
oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/inference/InferenceService.java:
##########
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.index.elastic.query.inference;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.net.HttpURLConnection;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * EXPERIMENTAL: A service that sends text to an inference service and
receives embeddings in return.
+ * The embeddings are cached to avoid repeated calls to the inference service.
+ */
+public class InferenceService {
+
+ private final URL url;
+ private final Cache<String, List<Float>> cache;
+
+ public InferenceService(String url, int cacheSize) {
+ try {
+ this.url = new URL(url);
+ } catch (MalformedURLException e) {
+ throw new IllegalArgumentException("Invalid URL: " + url, e);
+ }
+ this.cache = new Cache<>(cacheSize);
+ }
+
+ public List<Float> embeddings(String text, int timeoutMillis) {
+ if (cache.containsKey(text)) {
+ return cache.get(text);
+ }
+
+ HttpURLConnection connection;
+ try {
+ // Create a connection.
+ connection = (HttpURLConnection) url.openConnection();
Review Comment:
Missing a finally block closing the connection (I think the method is
`connection.disconnect()`).
And this is a real blast from the past! :) It's not important for now, as
the code is already done, but in cases like this I would suggest using the
Java 11 HTTP Client.
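
For reference, a minimal sketch of the same POST with `java.net.http`. The
class name, the JSON body being passed in as a ready-made string, and the
non-200 handling are assumptions; the actual payload format of the inference
service is not shown here.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.time.Duration;

final class EmbeddingsHttpSketch {

    // One shared client; it manages connections itself, so there is nothing to disconnect.
    private static final HttpClient CLIENT = HttpClient.newBuilder()
            .connectTimeout(Duration.ofSeconds(5))
            .build();

    // Builds a JSON POST with a per-request timeout.
    static HttpRequest buildRequest(URI uri, String jsonBody, int timeoutMillis) {
        return HttpRequest.newBuilder(uri)
                .timeout(Duration.ofMillis(timeoutMillis))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(jsonBody, StandardCharsets.UTF_8))
                .build();
    }

    // Sends the request and returns the response body, failing on any non-200 status.
    static String post(URI uri, String jsonBody, int timeoutMillis)
            throws IOException, InterruptedException {
        HttpResponse<String> response = CLIENT.send(
                buildRequest(uri, jsonBody, timeoutMillis),
                HttpResponse.BodyHandlers.ofString(StandardCharsets.UTF_8));
        if (response.statusCode() != 200) {
            throw new IOException("Unexpected status: " + response.statusCode());
        }
        return response.body();
    }
}
```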
##########
oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticIndexDefinition.java:
##########
@@ -381,4 +408,238 @@ protected IndexDefinition createInstance(NodeState
indexDefnStateToUse) {
return new ElasticIndexDefinition(root, indexDefnStateToUse,
indexPath, indexPrefix);
}
}
+
+ /**
+ * Represents the inference configuration for an Elasticsearch index
definition.
+ * This class holds the properties and queries used for inference.
+ */
+ public static class InferenceDefinition {
+
+ /**
+ * List of properties used for inference.
+ */
+ public List<Property> properties;
+
+ /**
+ * List of queries used for inference.
+ */
+ public List<Query> queries;
+
+ public InferenceDefinition() { }
+
+ /**
+ * Constructs an InferenceDefinition from a given NodeState.
+ *
+ * @param inferenceNode the NodeState containing the inference
configuration
+ */
+ public InferenceDefinition(NodeState inferenceNode) {
+ if (inferenceNode.hasChildNode("properties")) {
+ this.properties =
StreamSupport.stream(inferenceNode.getChildNode("properties").getChildNodeEntries().spliterator(),
false)
+ .map(cne -> new Property(cne.getName(),
cne.getNodeState()))
+ .collect(Collectors.toList());
+ }
+ if (inferenceNode.hasChildNode("queries")) {
+ this.queries =
StreamSupport.stream(inferenceNode.getChildNode("queries").getChildNodeEntries().spliterator(),
false)
+ .map(cne -> new Query(cne.getName(),
cne.getNodeState()))
+ .collect(Collectors.toList());
+ }
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ InferenceDefinition that = (InferenceDefinition) o;
+ return Objects.equals(properties, that.properties) &&
Objects.equals(queries, that.queries);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(properties, queries);
+ }
+
+ /**
+ * Represents a property used for inference.
+ */
+ public static class Property {
+ /**
+ * The name of the property. It will be prefixed with {@link
ElasticIndexDefinition#INFERENCE}. when stored in the index.
+ */
+ public String name;
+ /**
+ * The model used for inference. Default is "semantic". This will
be used by the inference service to determine the model to use.
+ */
+ public String model;
+ /**
+ * The fields used for inference. These fields will be used to
generate the vector for the inference.
+ */
+ public List<String> fields;
+ /**
+ * The number of dimensions for the vector generated for the
inference.
+ */
+ public int dims;
+ /**
+ * The similarity function used for the inference. Default is
"cosine".
+ */
+ public String similarity;
+
+ public Property() {}
Review Comment:
If this constructor is used to create an instance, the fields will be left
at their Java defaults (`null` for the reference types, `0` for `dims`). In
this case the comments above will be wrong, as the values will not match the
defaults mentioned in the comments. Maybe the class should not have a no-args
constructor.
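
If the no-args constructor has to stay for some reason, a possible alternative
is to put the documented defaults in field initializers, so every constructor
starts from the same values. A reduced sketch (the wrapper class and the
one-argument constructor are made up for illustration; the default values are
the ones used in the `NodeState` constructor):

```java
import java.util.List;

final class PropertyDefaultsSketch {

    static class Property {
        // Defaults live on the fields, so the Javadoc holds for every constructor.
        String model = "semantic";
        List<String> fields = List.of();
        int dims = 1024;
        String similarity = "cosine";

        Property() { }

        // Hypothetical constructor that overrides a single value.
        Property(String model) {
            this.model = model;
        }
    }

    public static void main(String[] args) {
        Property p = new Property();
        System.out.println(p.model + " " + p.dims + " " + p.similarity);
    }
}
```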
##########
oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/ElasticRequestHandler.java:
##########
@@ -578,10 +588,72 @@ private boolean visitTerm(String propertyName, String
text, String boost, boolea
return true;
}
});
-
+
return Query.of(q -> q.bool(result.get()));
}
+ private ObjectBuilder<BoolQuery> inference(BoolQuery.Builder b, String
propertyName, String text, PlanResult pr, boolean dbEnabled) {
+ ElasticIndexDefinition.InferenceDefinition.Query q = null;
+ // select first query eligible for the given text
+ // TODO: evaluate if/how to handle multiple queries
+ String queryText = text;
+ for (ElasticIndexDefinition.InferenceDefinition.Query query :
elasticIndexDefinition.inferenceDefinition.queries) {
+ if (query.isEligibleForInput(queryText)) {
+ queryText = query.rewrite(queryText);
+ if (query.hasMinTerms(queryText)) {
+ q = query;
+ break;
+ }
+ }
+ }
+
+ QueryStringQuery.Builder qsqBuilder = fullTextQuery(queryText,
getElasticFieldName(propertyName), pr, dbEnabled);
+
+ // the query can be null if no inference query is eligible for the
given text or the min terms are not met
+ // in this case, we fall back to the default full-text query
+ if (q != null) {
+ LOG.info("Using inference query: {}", q);
+ try {
+ // let's retrieve the fields with the same model as the query
+ final ElasticIndexDefinition.InferenceDefinition.Query query =
q;
+ List<ElasticIndexDefinition.InferenceDefinition.Property>
properties = elasticIndexDefinition.inferenceDefinition.properties.stream()
+ .filter(pd -> pd.model.equals(query.model))
+ .collect(Collectors.toList());
+ if (!properties.isEmpty()) {
+ InferenceService inferenceService =
InferenceServiceManager.getInstance(q.serviceUrl, q.model);
+ List<Float> embeddings =
inferenceService.embeddings(queryText, (int) q.timeout);
+ if (embeddings != null) {
+ for
(ElasticIndexDefinition.InferenceDefinition.Property p : properties) {
+ //
https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-knn-query.html
+ KnnQuery.Builder knnQueryBuilder = new
KnnQuery.Builder();
+ knnQueryBuilder.field(p.name + ".value");
+ knnQueryBuilder.numCandidates(q.numCandidates);
+ knnQueryBuilder.queryVector(embeddings);
+ knnQueryBuilder.similarity(q.similarityThreshold);
+ b.should(s -> s.knn(knnQueryBuilder.build()));
+ }
+ int tokens = text.split(" ").length;
Review Comment:
Split on `\s+`?
##########
oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticIndexDefinition.java:
##########
@@ -381,4 +408,238 @@ protected IndexDefinition createInstance(NodeState
indexDefnStateToUse) {
return new ElasticIndexDefinition(root, indexDefnStateToUse,
indexPath, indexPrefix);
}
}
+
+ /**
+ * Represents the inference configuration for an Elasticsearch index
definition.
+ * This class holds the properties and queries used for inference.
+ */
+ public static class InferenceDefinition {
+
+ /**
+ * List of properties used for inference.
+ */
+ public List<Property> properties;
+
+ /**
+ * List of queries used for inference.
+ */
+ public List<Query> queries;
+
+ public InferenceDefinition() { }
+
+ /**
+ * Constructs an InferenceDefinition from a given NodeState.
+ *
+ * @param inferenceNode the NodeState containing the inference
configuration
+ */
+ public InferenceDefinition(NodeState inferenceNode) {
+ if (inferenceNode.hasChildNode("properties")) {
+ this.properties =
StreamSupport.stream(inferenceNode.getChildNode("properties").getChildNodeEntries().spliterator(),
false)
+ .map(cne -> new Property(cne.getName(),
cne.getNodeState()))
+ .collect(Collectors.toList());
+ }
+ if (inferenceNode.hasChildNode("queries")) {
+ this.queries =
StreamSupport.stream(inferenceNode.getChildNode("queries").getChildNodeEntries().spliterator(),
false)
+ .map(cne -> new Query(cne.getName(),
cne.getNodeState()))
+ .collect(Collectors.toList());
+ }
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ InferenceDefinition that = (InferenceDefinition) o;
+ return Objects.equals(properties, that.properties) &&
Objects.equals(queries, that.queries);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(properties, queries);
+ }
+
+ /**
+ * Represents a property used for inference.
+ */
+ public static class Property {
+ /**
+ * The name of the property. It will be prefixed with {@link
ElasticIndexDefinition#INFERENCE}. when stored in the index.
+ */
+ public String name;
+ /**
+ * The model used for inference. Default is "semantic". This will
be used by the inference service to determine the model to use.
+ */
+ public String model;
+ /**
+ * The fields used for inference. These fields will be used to
generate the vector for the inference.
+ */
+ public List<String> fields;
+ /**
+ * The number of dimensions for the vector generated for the
inference.
+ */
+ public int dims;
+ /**
+ * The similarity function used for the inference. Default is
"cosine".
+ */
+ public String similarity;
+
+ public Property() {}
+
+ /**
+ * Constructs a Property from a given NodeState.
+ *
+ * @param name the name of the property
+ * @param inferenceNode the NodeState containing the property
configuration
+ */
+ public Property(String name, NodeState inferenceNode) {
+ this.name = ElasticIndexDefinition.INFERENCE + "." + name;
+ this.model = getOptionalValue(inferenceNode, "model",
"semantic");
+ this.fields = Arrays.asList(getOptionalValues(inferenceNode,
"fields", Type.STRINGS, String.class, new String[0]));
+ this.dims = getOptionalValue(inferenceNode, "dims", 1024);
+ this.similarity = getOptionalValue(inferenceNode,
"similarity", "cosine");
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ Property property = (Property) o;
+ return dims == property.dims && Objects.equals(name,
property.name) &&
+ Objects.equals(model, property.model) &&
+ Objects.equals(fields, property.fields) &&
+ Objects.equals(similarity, property.similarity);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(name, model, fields, dims, similarity);
+ }
+ }
+
+ /**
+ * Represents a query used for inference.
+ */
+ public static class Query {
+ /**
+ * The name of the query.
+ */
+ public String name;
+ /**
+ * The service URL used for the query.
+ */
+ public String serviceUrl;
+ /**
+ * The model used for the query. Default is "semantic". It needs
to match with one of the models used for the properties.
+ */
+ public String model;
+ /**
+ * The prefix used for the query. If the input string starts with
this prefix, the query will be executed. Default is null (no prefix).
+ */
+ public String prefix;
+ /**
+ * The minimum number of terms required for the query to be
executed. Default is 2.
+ */
+ public int minTerms;
+ /**
+ * The number of candidates to be returned by the query. Default
is 100.
+ */
+ public int numCandidates;
+ /**
+ * The type of the query. Default is "hybrid". Currently not used
+ */
+ public String type; // this can be hybrid or vector
+ /**
+ * The similarity threshold used for the query. Default is 0.5.
+ */
+ public float similarityThreshold;
+ /**
+ * The timeout for the query in milliseconds. Default is 5000.
+ */
+ public long timeout;
+
+ public Query() {}
+
+ /**
+ * Constructs a Query from a given NodeState.
+ *
+ * @param name the name of the query
+ * @param queryNode the NodeState containing the query
configuration
+ */
+ public Query(String name, NodeState queryNode) {
+ this.name = name;
+ this.serviceUrl = getOptionalValue(queryNode, "serviceUrl",
null);
+ this.model = getOptionalValue(queryNode, "model", "semantic");
+ this.prefix = getOptionalValue(queryNode, "prefix", null);
+ this.minTerms = getOptionalValue(queryNode, "minTerms", 2);
+ this.numCandidates = getOptionalValue(queryNode,
"numCandidates", 100);
+ this.type = getOptionalValue(queryNode, "type", "hybrid");
+ this.similarityThreshold = getOptionalValue(queryNode,
"similarityThreshold", 0.5f);
+ this.timeout = getOptionalValue(queryNode, "timeout", 5000L);
+ }
+
+ /**
+ * Returns {@code true} if the input string is eligible for this
query.
+ * @param input the input string
+ * @return {@code true} if the input string is eligible for this
query
+ */
+ public boolean isEligibleForInput(String input) {
+ return prefix == null || input.startsWith(prefix);
+ }
+
+ /**
+ * Rewrites the input string by removing the prefix.
+ *
+ * @param input the input string
+ * @return the rewritten input string
+ */
+ public String rewrite(String input) {
+ return prefix == null ? input :
input.substring(prefix.length());
+ }
+
+ /**
+ * Checks if the input string has the minimum number of terms
required for this query.
+ *
+ * @param input the input string
+ * @return true if the input string has the minimum number of terms
+ */
+ public boolean hasMinTerms(String input) {
+ return minTerms <= input.split(" ").length;
Review Comment:
Can the input string have whitespace separators other than a single space?
Like tabs, newlines or runs of several spaces? Maybe use `\s+` to break up the
string?
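
A small sketch of the difference. The helper name `countTerms` is made up, and
trimming first is my assumption about the desired behavior, since a leading
separator would otherwise produce an empty first token.

```java
final class TermCountSketch {

    // Counts whitespace-separated terms; tabs, newlines and repeated spaces all count as one separator.
    static int countTerms(String input) {
        String trimmed = input.trim();
        return trimmed.isEmpty() ? 0 : trimmed.split("\\s+").length;
    }

    public static void main(String[] args) {
        String input = "foo  bar";                    // two spaces between the terms
        System.out.println(input.split(" ").length);  // 3: the empty string between the spaces is counted
        System.out.println(countTerms(input));        // 2
    }
}
```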
##########
oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/inference/InferenceServiceManager.java:
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.index.elastic.query.inference;
+
+import org.jetbrains.annotations.NotNull;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+public class InferenceServiceManager {
+
+ private static final String INFERENCE_CACHE_SIZE =
"oak.inference.cache.size";
Review Comment:
When reading a configuration property, I like to always log the value. This
makes it much easier to debug issues. See classes `ConfigHelper` or
`SystemPropertySupplier`.
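
The idea, as a self-contained sketch. This is not the API of either class
mentioned above: the helper name is made up, `java.util.logging` is used only
to keep the example dependency-free, and the default of 100 is a placeholder
rather than the value used in the PR.

```java
import java.util.logging.Logger;

final class LoggedConfigSketch {

    private static final Logger LOG = Logger.getLogger(LoggedConfigSketch.class.getName());

    // Reads an integer system property and logs the effective value, default included.
    static int getInt(String name, int defaultValue) {
        int value = Integer.getInteger(name, defaultValue);
        LOG.info(() -> "Config " + name + "=" + value);
        return value;
    }

    public static void main(String[] args) {
        // 100 is a placeholder default for illustration only.
        int cacheSize = getInt("oak.inference.cache.size", 100);
        System.out.println(cacheSize);
    }
}
```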
##########
oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/ElasticRequestHandler.java:
##########
@@ -578,10 +588,72 @@ private boolean visitTerm(String propertyName, String
text, String boost, boolea
return true;
}
});
-
+
return Query.of(q -> q.bool(result.get()));
}
+ private ObjectBuilder<BoolQuery> inference(BoolQuery.Builder b, String
propertyName, String text, PlanResult pr, boolean dbEnabled) {
+ ElasticIndexDefinition.InferenceDefinition.Query q = null;
+ // select first query eligible for the given text
+ // TODO: evaluate if/how to handle multiple queries
+ String queryText = text;
+ for (ElasticIndexDefinition.InferenceDefinition.Query query :
elasticIndexDefinition.inferenceDefinition.queries) {
+ if (query.isEligibleForInput(queryText)) {
+ queryText = query.rewrite(queryText);
+ if (query.hasMinTerms(queryText)) {
+ q = query;
+ break;
+ }
+ }
+ }
+
+ QueryStringQuery.Builder qsqBuilder = fullTextQuery(queryText,
getElasticFieldName(propertyName), pr, dbEnabled);
+
+ // the query can be null if no inference query is eligible for the
given text or the min terms are not met
+ // in this case, we fall back to the default full-text query
+ if (q != null) {
+ LOG.info("Using inference query: {}", q);
+ try {
+ // let's retrieve the fields with the same model as the query
+ final ElasticIndexDefinition.InferenceDefinition.Query query =
q;
+ List<ElasticIndexDefinition.InferenceDefinition.Property>
properties = elasticIndexDefinition.inferenceDefinition.properties.stream()
+ .filter(pd -> pd.model.equals(query.model))
+ .collect(Collectors.toList());
+ if (!properties.isEmpty()) {
+ InferenceService inferenceService =
InferenceServiceManager.getInstance(q.serviceUrl, q.model);
+ List<Float> embeddings =
inferenceService.embeddings(queryText, (int) q.timeout);
+ if (embeddings != null) {
+ for
(ElasticIndexDefinition.InferenceDefinition.Property p : properties) {
+ //
https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-knn-query.html
+ KnnQuery.Builder knnQueryBuilder = new
KnnQuery.Builder();
+ knnQueryBuilder.field(p.name + ".value");
+ knnQueryBuilder.numCandidates(q.numCandidates);
+ knnQueryBuilder.queryVector(embeddings);
+ knnQueryBuilder.similarity(q.similarityThreshold);
+ b.should(s -> s.knn(knnQueryBuilder.build()));
+ }
+ int tokens = text.split(" ").length;
+ double qsBoost;
+ if (tokens > 1) {
+ qsBoost = 1.0d / (5 * tokens);
+ } else {
+ qsBoost = 1.0d;
+ }
Review Comment:
It's a matter of taste, but here I think the ternary operator would be
clearer.
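
Something along these lines, shown as a standalone method so it can be run
(the class and method names are made up; the formula is the one from the diff):

```java
final class BoostSketch {

    // Same logic as the if/else in the diff, written as a single expression.
    static double queryStringBoost(int tokens) {
        return tokens > 1 ? 1.0d / (5 * tokens) : 1.0d;
    }

    public static void main(String[] args) {
        System.out.println(queryStringBoost(1));
        System.out.println(queryStringBoost(4));
    }
}
```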
##########
oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticDocumentMaker.java:
##########
@@ -57,6 +58,7 @@ protected ElasticDocument finalizeDoc(ElasticDocument doc,
boolean dirty, boolea
// Evaluate path restrictions is enabled by default in elastic. Always
index ancestors.
// When specifically disabled, we will keep indexing it, but the field
won't be used at query time
doc.indexAncestors(path);
+ doc.setLastUpdated(Instant.now().toEpochMilli());
Review Comment:
As this method is a hot spot, we could call `System.currentTimeMillis()`
directly, which should be much faster than creating an `Instant` and
converting it to millis.
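A small sketch of the two variants side by side, to show they yield the same
kind of value (milliseconds since the epoch). The class and method names
(`LastUpdatedClock`, `viaInstant`, `viaSystem`) are hypothetical; I have not
measured the difference in this code path:

```java
import java.time.Instant;

final class LastUpdatedClock {

    private LastUpdatedClock() {
    }

    // Variant currently in the diff: creates an Instant, then converts it.
    static long viaInstant() {
        return Instant.now().toEpochMilli();
    }

    // Suggested variant: reads the epoch millis directly, skipping the
    // intermediate Instant object.
    static long viaSystem() {
        return System.currentTimeMillis();
    }

    public static void main(String[] args) {
        System.out.println("Instant: " + viaInstant());
        System.out.println("System:  " + viaSystem());
    }
}
```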
##########
oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/inference/InferenceService.java:
##########
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.index.elastic.query.inference;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.net.HttpURLConnection;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * EXPERIMENTAL: A service that sends text to an inference service and
receives embeddings in return.
+ * The embeddings are cached to avoid repeated calls to the inference service.
+ */
+public class InferenceService {
+
+ private final URL url;
+ private final Cache<String, List<Float>> cache;
+
+ public InferenceService(String url, int cacheSize) {
+ try {
+ this.url = new URL(url);
+ } catch (MalformedURLException e) {
+ throw new IllegalArgumentException("Invalid URL: " + url, e);
+ }
+ this.cache = new Cache<>(cacheSize);
+ }
+
+ public List<Float> embeddings(String text, int timeoutMillis) {
+ if (cache.containsKey(text)) {
+ return cache.get(text);
+ }
+
+ HttpURLConnection connection;
Review Comment:
This is not thread safe. Should it be?
If it should, note that this does not protect against two or more threads
calling this method for the same text at the same time: the HTTP call would
be made several times for the same value.
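One possible way to deduplicate concurrent requests for the same text is to
cache a future per key instead of the value. This is only a sketch under
assumptions: `DedupingEmbeddingCache` and the `loader` function (standing in
for the HTTP call) are hypothetical, and unlike the `Cache` in the PR this map
is not bounded in size:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

final class DedupingEmbeddingCache {

    // One future per text: the first caller starts the load, later callers
    // for the same text wait on the same future instead of loading again.
    private final ConcurrentHashMap<String, CompletableFuture<List<Float>>> inFlight =
            new ConcurrentHashMap<>();

    // Hypothetical stand-in for the HTTP call to the inference service.
    private final Function<String, List<Float>> loader;

    DedupingEmbeddingCache(Function<String, List<Float>> loader) {
        this.loader = loader;
    }

    List<Float> embeddings(String text) {
        CompletableFuture<List<Float>> future = inFlight.computeIfAbsent(text,
                key -> CompletableFuture.supplyAsync(() -> loader.apply(key)));
        try {
            return future.join();
        } catch (RuntimeException e) {
            // Do not keep failed loads around; a later call may succeed.
            inFlight.remove(text, future);
            throw e;
        }
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        DedupingEmbeddingCache cache = new DedupingEmbeddingCache(text -> {
            calls.incrementAndGet();
            return Arrays.asList(0.1f, 0.2f);
        });
        cache.embeddings("hello");
        cache.embeddings("hello");
        System.out.println("loader calls: " + calls.get());
    }
}
```

The load runs outside `computeIfAbsent`, so the map is not locked for the
duration of the HTTP call. Timeouts and eviction are left out of the sketch.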
##########
oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/inference/InferenceServiceManager.java:
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.index.elastic.query.inference;
+
+import org.jetbrains.annotations.NotNull;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+public class InferenceServiceManager {
+
+ private static final String INFERENCE_CACHE_SIZE =
"oak.inference.cache.size";
+
+ private static final ConcurrentHashMap<String, InferenceService> SERVICES
= new ConcurrentHashMap<>();
+
+ public static InferenceService getInstance(@NotNull String url, String
model) {
+ String k = model == null ? url : url + "|" + model;
+ return SERVICES.computeIfAbsent(k, key -> new InferenceService(url,
Integer.getInteger(INFERENCE_CACHE_SIZE, 100)));
Review Comment:
This cache is not limited in size, which can be dangerous if this method is
called with many variations of url and model. I suggest adding a guard here
that warns or caps the size if it unexpectedly grows too large, to avoid
surprises.
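A rough sketch of such a guard, assuming a hard cap is acceptable. The class
`BoundedRegistry` and its `maxEntries` parameter are hypothetical; logging a
warning instead of throwing would be an equally valid choice:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

final class BoundedRegistry<V> {

    private final ConcurrentHashMap<String, V> entries = new ConcurrentHashMap<>();
    private final int maxEntries;

    BoundedRegistry(int maxEntries) {
        this.maxEntries = maxEntries;
    }

    V getOrCreate(String key, Function<String, V> factory) {
        V existing = entries.get(key);
        if (existing != null) {
            return existing;
        }
        // Best-effort guard: under heavy concurrency the size may overshoot
        // the cap by a few entries, which is fine for a safety net.
        if (entries.size() >= maxEntries) {
            throw new IllegalStateException(
                    "Too many entries (" + entries.size() + "), refusing to add: " + key);
        }
        return entries.computeIfAbsent(key, factory);
    }

    int size() {
        return entries.size();
    }

    public static void main(String[] args) {
        BoundedRegistry<String> registry = new BoundedRegistry<>(2);
        registry.getOrCreate("http://a|m1", k -> "service-a");
        registry.getOrCreate("http://b|m1", k -> "service-b");
        System.out.println("size: " + registry.size());
    }
}
```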