This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new b608e5a2f0 NIFI-12831: Add PutOpenSearchVector and 
QueryOpenSearchVector processors
b608e5a2f0 is described below

commit b608e5a2f0002a0f9c6d6121815b54f6dae67dc4
Author: Mark Bathori <bathori.m...@gmail.com>
AuthorDate: Wed Feb 21 15:13:47 2024 +0100

    NIFI-12831: Add PutOpenSearchVector and QueryOpenSearchVector processors
    
    Signed-off-by: Pierre Villard <pierre.villard...@gmail.com>
    
    This closes #8441.
---
 .../python/vectorstores/OpenSearchVectorUtils.py   | 142 ++++++++++++
 .../python/vectorstores/PutOpenSearchVector.py     | 245 +++++++++++++++++++++
 .../python/vectorstores/QueryOpenSearchVector.py   | 219 ++++++++++++++++++
 .../src/main/python/vectorstores/requirements.txt  |   3 +
 4 files changed, 609 insertions(+)

diff --git 
a/nifi-python-extensions/nifi-text-embeddings-module/src/main/python/vectorstores/OpenSearchVectorUtils.py
 
b/nifi-python-extensions/nifi-text-embeddings-module/src/main/python/vectorstores/OpenSearchVectorUtils.py
new file mode 100644
index 0000000000..a10eaba7c9
--- /dev/null
+++ 
b/nifi-python-extensions/nifi-text-embeddings-module/src/main/python/vectorstores/OpenSearchVectorUtils.py
@@ -0,0 +1,142 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from nifiapi.properties import PropertyDescriptor, StandardValidators, 
ExpressionLanguageScope, PropertyDependency
+from EmbeddingUtils import OPENAI, HUGGING_FACE, EMBEDDING_MODEL
+
+# Space types
+L2 = ("L2 (Euclidean distance)", "l2")
+L1 = ("L1 (Manhattan distance)", "l1")
+LINF = ("L-infinity (chessboard) distance", "linf")
+COSINESIMIL = ("Cosine similarity", "cosinesimil")
+
+HUGGING_FACE_API_KEY = PropertyDescriptor(
+    name="HuggingFace API Key",
+    description="The API Key for interacting with HuggingFace",
+    required=True,
+    sensitive=True,
+    validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+    dependencies=[PropertyDependency(EMBEDDING_MODEL, HUGGING_FACE)]
+)
+HUGGING_FACE_MODEL = PropertyDescriptor(
+    name="HuggingFace Model",
+    description="The name of the HuggingFace model to use",
+    default_value="sentence-transformers/all-MiniLM-L6-v2",
+    required=True,
+    validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+    dependencies=[PropertyDependency(EMBEDDING_MODEL, HUGGING_FACE)]
+)
+OPENAI_API_KEY = PropertyDescriptor(
+    name="OpenAI API Key",
+    description="The API Key for OpenAI in order to create embeddings",
+    required=True,
+    sensitive=True,
+    validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+    dependencies=[PropertyDependency(EMBEDDING_MODEL, OPENAI)]
+)
+OPENAI_API_MODEL = PropertyDescriptor(
+    name="OpenAI Model",
+    description="The API Key for OpenAI in order to create embeddings",
+    default_value="text-embedding-ada-002",
+    required=True,
+    validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+    dependencies=[PropertyDependency(EMBEDDING_MODEL, OPENAI)]
+)
+HTTP_HOST = PropertyDescriptor(
+    name="HTTP Host",
+    description="URL where OpenSearch is hosted.",
+    default_value="http://localhost:9200";,
+    required=True,
+    validators=[StandardValidators.URL_VALIDATOR]
+)
+USERNAME = PropertyDescriptor(
+    name="Username",
+    description="The username to use for authenticating to OpenSearch server",
+    required=False,
+    validators=[StandardValidators.NON_EMPTY_VALIDATOR]
+)
+PASSWORD = PropertyDescriptor(
+    name="Password",
+    description="The password to use for authenticating to OpenSearch server",
+    required=False,
+    sensitive=True,
+    validators=[StandardValidators.NON_EMPTY_VALIDATOR]
+)
+INDEX_NAME = PropertyDescriptor(
+    name="Index Name",
+    description="The name of the OpenSearch index.",
+    sensitive=False,
+    required=True,
+    validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+    expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES
+)
+VECTOR_FIELD = PropertyDescriptor(
+    name="Vector Field Name",
+    description="The name of field in the document where the embeddings are 
stored. This field need to be a 'knn_vector' typed field.",
+    default_value="vector_field",
+    required=True,
+    validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+    expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES
+)
+TEXT_FIELD = PropertyDescriptor(
+    name="Text Field Name",
+    description="The name of field in the document where the text is stored.",
+    default_value="text",
+    required=True,
+    validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+    expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES
+)
+
+
+def create_authentication_params(context):
+    username = context.getProperty(USERNAME).getValue()
+    password = context.getProperty(PASSWORD).getValue()
+
+    params = {"verify_certs": "true"}
+
+    if username is not None and password is not None:
+        params["http_auth"] = (username, password)
+
+    return params
+
+
+def parse_documents(json_lines, id_field_name, file_name):
+    import json
+
+    texts = []
+    metadatas = []
+    ids = []
+    for i, line in enumerate(json_lines.split("\n"), start=1):
+        try:
+            doc = json.loads(line)
+        except Exception as e:
+            raise ValueError(f"Could not parse line {i} as JSON") from e
+
+        text = doc.get('text')
+        metadata = doc.get('metadata')
+        texts.append(text)
+
+        # Remove any null values, or it will cause the embedding to fail
+        filtered_metadata = {key: value for key, value in metadata.items() if 
value is not None}
+        metadatas.append(filtered_metadata)
+
+        doc_id = None
+        if id_field_name is not None:
+            doc_id = metadata.get(id_field_name)
+        if doc_id is None:
+            doc_id = file_name + "-" + str(i)
+        ids.append(doc_id)
+
+    return {"texts": texts, "metadatas": metadatas, "ids": ids}
diff --git 
a/nifi-python-extensions/nifi-text-embeddings-module/src/main/python/vectorstores/PutOpenSearchVector.py
 
b/nifi-python-extensions/nifi-text-embeddings-module/src/main/python/vectorstores/PutOpenSearchVector.py
new file mode 100644
index 0000000000..c0ff29bdb7
--- /dev/null
+++ 
b/nifi-python-extensions/nifi-text-embeddings-module/src/main/python/vectorstores/PutOpenSearchVector.py
@@ -0,0 +1,245 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from langchain.vectorstores import OpenSearchVectorSearch
+from nifiapi.flowfiletransform import FlowFileTransform, 
FlowFileTransformResult
+from nifiapi.properties import PropertyDescriptor, StandardValidators, 
ExpressionLanguageScope, PropertyDependency
+from OpenSearchVectorUtils import (L2, L1, LINF, COSINESIMIL, OPENAI_API_KEY, 
OPENAI_API_MODEL, HUGGING_FACE_API_KEY,
+                                   HUGGING_FACE_MODEL, HTTP_HOST, USERNAME, 
PASSWORD, INDEX_NAME, VECTOR_FIELD,
+                                   TEXT_FIELD, create_authentication_params, 
parse_documents)
+from EmbeddingUtils import EMBEDDING_MODEL, create_embedding_service
+from nifiapi.documentation import use_case, ProcessorConfiguration
+
+
+@use_case(description="Create vectors/embeddings that represent text content 
and send the vectors to OpenSearch",
+          notes="This use case assumes that the data has already been 
formatted in JSONL format with the text to store in OpenSearch provided in the 
'text' field.",
+          keywords=["opensearch", "embedding", "vector", "text", 
"vectorstore", "insert"],
+          configuration="""
+                Configure the 'HTTP Host' to an appropriate URL where 
OpenSearch is accessible.
+                Configure 'Embedding Model' to indicate whether OpenAI 
embeddings should be used or a HuggingFace embedding model should be used: 
'Hugging Face Model' or 'OpenAI Model'
+                Configure the 'OpenAI API Key' or 'HuggingFace API Key', 
depending on the chosen Embedding Model.
+                Set 'Index Name' to the name of your OpenSearch Index.
+                Set 'Vector Field Name' to the name of the field in the 
document which will store the vector data.
+                Set 'Text Field Name' to the name of the field in the document 
which will store the text data.
+
+                If the documents to send to OpenSearch contain a unique 
identifier, set the 'Document ID Field Name' property to the name of the field 
that contains the document ID.
+                This property can be left blank, in which case a unique ID 
will be generated based on the FlowFile's filename.
+
+                If the provided index does not exists in OpenSearch then the 
processor is capable to create it. The 'New Index Strategy' property defines 
+                that the index needs to be created from the default template 
or it should be configured with custom values.
+                """)
+@use_case(description="Update vectors/embeddings in OpenSearch",
+          notes="This use case assumes that the data has already been 
formatted in JSONL format with the text to store in OpenSearch provided in the 
'text' field.",
+          keywords=["opensearch", "embedding", "vector", "text", 
"vectorstore", "update", "upsert"],
+          configuration="""
+                Configure the 'HTTP Host' to an appropriate URL where 
OpenSearch is accessible.
+                Configure 'Embedding Model' to indicate whether OpenAI 
embeddings should be used or a HuggingFace embedding model should be used: 
'Hugging Face Model' or 'OpenAI Model'
+                Configure the 'OpenAI API Key' or 'HuggingFace API Key', 
depending on the chosen Embedding Model.
+                Set 'Index Name' to the name of your OpenSearch Index.
+                Set 'Vector Field Name' to the name of the field in the 
document which will store the vector data.
+                Set 'Text Field Name' to the name of the field in the document 
which will store the text data.
+                Set the 'Document ID Field Name' property to the name of the 
field that contains the identifier of the document in OpenSearch to update.
+                """)
+class PutOpenSearchVector(FlowFileTransform):
+    class Java:
+        implements = ['org.apache.nifi.python.processor.FlowFileTransform']
+
+    class ProcessorDetails:
+        version = '@project.version@'
+        description = """Publishes JSON data to OpenSearch. The Incoming data 
must be in single JSON per Line format, each with two keys: 'text' and 
'metadata'.
+                       The text must be a string, while metadata must be a map 
with strings for values. Any additional fields will be ignored."""
+        tags = ["opensearch", "vector", "vectordb", "vectorstore", 
"embeddings", "ai", "artificial intelligence", "ml",
+                "machine learning", "text", "LLM"]
+
+    # Engine types
+    NMSLIB = ("nmslib (Non-Metric Space Library)", "nmslib")
+    FAISS = ("faiss (Facebook AI Similarity Search)", "faiss")
+    LUCENE = ("lucene", "lucene")
+
+    ENGINE_VALUES = dict([NMSLIB, FAISS, LUCENE])
+
+    # Space types
+    INNERPRODUCT = ("Inner product", "innerproduct")
+
+    NMSLIB_SPACE_TYPE_VALUES = dict([L2, L1, LINF, COSINESIMIL, INNERPRODUCT])
+    FAISS_SPACE_TYPE_VALUES = dict([L2, INNERPRODUCT])
+    LUCENE_SPACE_TYPE_VALUES = dict([L2, COSINESIMIL])
+
+    # New Index Mapping Strategy
+    DEFAULT_INDEX_MAPPING = "Default index mapping"
+    CUSTOM_INDEX_MAPPING = "Custom index mapping"
+
+    DOC_ID_FIELD_NAME = PropertyDescriptor(
+        name="Document ID Field Name",
+        description="""Specifies the name of the field in the 'metadata' 
element of each document where the document's ID can be found.  
+                    If not specified, an ID will be generated based on the 
FlowFile's filename and a one-up number.""",
+        required=False,
+        validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+        expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES
+    )
+    NEW_INDEX_STRATEGY = PropertyDescriptor(
+        name="New Index Strategy",
+        description="Specifies the Mapping strategy to use for new index 
creation. The default template values are the following: "
+                    "{engine: nmslib, space_type: l2, ef_search: 512, 
ef_construction: 512, m: 16}",
+        allowable_values=[DEFAULT_INDEX_MAPPING, CUSTOM_INDEX_MAPPING],
+        default_value=DEFAULT_INDEX_MAPPING,
+        required=False,
+    )
+    ENGINE = PropertyDescriptor(
+        name="Engine",
+        description="The approximate k-NN library to use for indexing and 
search.",
+        allowable_values=ENGINE_VALUES.keys(),
+        default_value=NMSLIB[0],
+        required=False,
+        dependencies=[PropertyDependency(NEW_INDEX_STRATEGY, 
CUSTOM_INDEX_MAPPING)]
+    )
+    NMSLIB_SPACE_TYPE = PropertyDescriptor(
+        name="NMSLIB Space Type",
+        description="The vector space used to calculate the distance between 
vectors.",
+        allowable_values=NMSLIB_SPACE_TYPE_VALUES.keys(),
+        default_value=L2[0],
+        required=False,
+        dependencies=[PropertyDependency(NEW_INDEX_STRATEGY, 
CUSTOM_INDEX_MAPPING),
+                      PropertyDependency(ENGINE, NMSLIB[0])]
+    )
+    FAISS_SPACE_TYPE = PropertyDescriptor(
+        name="FAISS Space Type",
+        description="The vector space used to calculate the distance between 
vectors.",
+        allowable_values=FAISS_SPACE_TYPE_VALUES.keys(),
+        default_value=L2[0],
+        required=False,
+        dependencies=[PropertyDependency(NEW_INDEX_STRATEGY, 
CUSTOM_INDEX_MAPPING),
+                      PropertyDependency(ENGINE, FAISS[0])]
+    )
+    LUCENE_SPACE_TYPE = PropertyDescriptor(
+        name="Lucene Space Type",
+        description="The vector space used to calculate the distance between 
vectors.",
+        allowable_values=LUCENE_SPACE_TYPE_VALUES.keys(),
+        default_value=L2[0],
+        required=False,
+        dependencies=[PropertyDependency(NEW_INDEX_STRATEGY, 
CUSTOM_INDEX_MAPPING),
+                      PropertyDependency(ENGINE, LUCENE[0])]
+    )
+    EF_SEARCH = PropertyDescriptor(
+        name="EF Search",
+        description="The size of the dynamic list used during k-NN searches. 
Higher values lead to more accurate but slower searches.",
+        default_value="512",
+        required=False,
+        validators=[StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR],
+        dependencies=[PropertyDependency(NEW_INDEX_STRATEGY, 
CUSTOM_INDEX_MAPPING)]
+    )
+    EF_CONSTRUCTION = PropertyDescriptor(
+        name="EF Construction",
+        description="The size of the dynamic list used during k-NN graph 
creation. Higher values lead to a more accurate graph but slower indexing 
speed.",
+        default_value="512",
+        required=False,
+        validators=[StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR],
+        dependencies=[PropertyDependency(NEW_INDEX_STRATEGY, 
CUSTOM_INDEX_MAPPING)]
+    )
+    M = PropertyDescriptor(
+        name="M",
+        description="The number of bidirectional links that the plugin creates 
for each new element. Increasing and "
+                    "decreasing this value can have a large impact on memory 
consumption. Keep this value between 2 and 100.",
+        default_value="16",
+        required=False,
+        
validators=[StandardValidators._standard_validators.createLongValidator(2, 100, 
True)],
+        dependencies=[PropertyDependency(NEW_INDEX_STRATEGY, 
CUSTOM_INDEX_MAPPING)]
+    )
+
+    properties = [EMBEDDING_MODEL,
+                  OPENAI_API_KEY,
+                  OPENAI_API_MODEL,
+                  HUGGING_FACE_API_KEY,
+                  HUGGING_FACE_MODEL,
+                  HTTP_HOST,
+                  USERNAME,
+                  PASSWORD,
+                  INDEX_NAME,
+                  DOC_ID_FIELD_NAME,
+                  VECTOR_FIELD,
+                  TEXT_FIELD,
+                  NEW_INDEX_STRATEGY,
+                  ENGINE,
+                  NMSLIB_SPACE_TYPE,
+                  FAISS_SPACE_TYPE,
+                  LUCENE_SPACE_TYPE,
+                  EF_SEARCH,
+                  EF_CONSTRUCTION,
+                  M]
+
+    embeddings = None
+
+    def __init__(self, **kwargs):
+        pass
+
+    def getPropertyDescriptors(self):
+        return self.properties
+
+    def onScheduled(self, context):
+        self.embeddings = create_embedding_service(context)
+
+    def transform(self, context, flowfile):
+        file_name = flowfile.getAttribute("filename")
+        http_host = 
context.getProperty(HTTP_HOST).evaluateAttributeExpressions(flowfile).getValue()
+        index_name = 
context.getProperty(INDEX_NAME).evaluateAttributeExpressions(flowfile).getValue()
+        id_field_name = 
context.getProperty(self.DOC_ID_FIELD_NAME).evaluateAttributeExpressions(flowfile).getValue()
+        vector_field = 
context.getProperty(VECTOR_FIELD).evaluateAttributeExpressions(flowfile).getValue()
+        text_field = 
context.getProperty(TEXT_FIELD).evaluateAttributeExpressions(flowfile).getValue()
+        new_index_strategy = 
context.getProperty(self.NEW_INDEX_STRATEGY).evaluateAttributeExpressions().getValue()
+
+        params = {"vector_field": vector_field, "text_field": text_field}
+        params.update(create_authentication_params(context))
+
+        if new_index_strategy == self.CUSTOM_INDEX_MAPPING:
+            engine = 
context.getProperty(self.ENGINE).evaluateAttributeExpressions().getValue()
+            params["engine"] = self.ENGINE_VALUES.get(engine)
+
+            if engine == self.NMSLIB[0]:
+                space_type = 
context.getProperty(self.NMSLIB_SPACE_TYPE).evaluateAttributeExpressions().getValue()
+                params["space_type"] = 
self.NMSLIB_SPACE_TYPE_VALUES.get(space_type)
+            if engine == self.FAISS[0]:
+                space_type = 
context.getProperty(self.FAISS_SPACE_TYPE).evaluateAttributeExpressions().getValue()
+                params["space_type"] = 
self.FAISS_SPACE_TYPE_VALUES.get(space_type)
+            if engine == self.LUCENE[0]:
+                space_type = 
context.getProperty(self.LUCENE_SPACE_TYPE).evaluateAttributeExpressions().getValue()
+                params["space_type"] = 
self.LUCENE_SPACE_TYPE_VALUES.get(space_type)
+
+            ef_search = 
context.getProperty(self.EF_SEARCH).evaluateAttributeExpressions().asInteger()
+            params["ef_search"] = ef_search
+
+            ef_construction = 
context.getProperty(self.EF_CONSTRUCTION).evaluateAttributeExpressions().asInteger()
+            params["ef_construction"] = ef_construction
+
+            m = 
context.getProperty(self.M).evaluateAttributeExpressions().asInteger()
+            params["m"] = m
+
+        # Read the FlowFile content as "json-lines".
+        json_lines = flowfile.getContentsAsBytes().decode()
+        parsed_documents = parse_documents(json_lines, id_field_name, 
file_name)
+
+        vectorstore = OpenSearchVectorSearch(
+            opensearch_url=http_host,
+            index_name=index_name,
+            embedding_function=self.embeddings,
+            **params
+        )
+        vectorstore.add_texts(texts=parsed_documents["texts"],
+                              metadatas=parsed_documents["metadatas"],
+                              ids=parsed_documents["ids"],
+                              **params
+                              )
+
+        return FlowFileTransformResult(relationship="success")
+
diff --git 
a/nifi-python-extensions/nifi-text-embeddings-module/src/main/python/vectorstores/QueryOpenSearchVector.py
 
b/nifi-python-extensions/nifi-text-embeddings-module/src/main/python/vectorstores/QueryOpenSearchVector.py
new file mode 100644
index 0000000000..488c01d197
--- /dev/null
+++ 
b/nifi-python-extensions/nifi-text-embeddings-module/src/main/python/vectorstores/QueryOpenSearchVector.py
@@ -0,0 +1,219 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from langchain.vectorstores import OpenSearchVectorSearch
+from nifiapi.flowfiletransform import FlowFileTransform, 
FlowFileTransformResult
+from nifiapi.properties import PropertyDescriptor, StandardValidators, 
ExpressionLanguageScope, PropertyDependency
+from OpenSearchVectorUtils import (L2, L1, LINF, COSINESIMIL, OPENAI_API_KEY, 
OPENAI_API_MODEL, HUGGING_FACE_API_KEY, HUGGING_FACE_MODEL, HTTP_HOST,
+                                   USERNAME, PASSWORD, INDEX_NAME, 
VECTOR_FIELD, TEXT_FIELD, create_authentication_params)
+from QueryUtils import OUTPUT_STRATEGY, RESULTS_FIELD, INCLUDE_METADATAS, 
INCLUDE_DISTANCES, QueryUtils
+import json
+from EmbeddingUtils import EMBEDDING_MODEL, create_embedding_service
+
+class QueryOpenSearchVector(FlowFileTransform):
+    class Java:
+        implements = ['org.apache.nifi.python.processor.FlowFileTransform']
+
+    class ProcessorDetails:
+        version = '@project.version@'
+        description = "Queries OpenSearch in order to gather a specified 
number of documents that are most closely related to the given query."
+        tags = ["opensearch", "vector", "vectordb", "vectorstore", 
"embeddings", "ai", "artificial intelligence", "ml",
+                "machine learning", "text", "LLM"]
+
+    # Search types
+    APPROXIMATE_SEARCH = ("Approximate Search", "approximate_search")
+    SCRIPT_SCORING_SEARCH = ("Script Scoring Search", "script_scoring")
+    PAINLESS_SCRIPTING_SEARCH = ("Painless Scripting Search", 
"painless_scripting")
+
+    SEARCH_TYPE_VALUES = dict([APPROXIMATE_SEARCH, SCRIPT_SCORING_SEARCH, 
PAINLESS_SCRIPTING_SEARCH])
+
+    # Script Scoring Search space types
+    HAMMINGBIT = ("Hamming distance", "hammingbit")
+
+    SCRIPT_SCORING_SPACE_TYPE_VALUES = dict([L2, L1, LINF, COSINESIMIL, 
HAMMINGBIT])
+
+    # Painless Scripting Search space types
+    L2_SQUARED = ("L2 (Euclidean distance)", "l2Squared")
+    L1_NORM = ("L1 (Manhattan distance)", "l1Norm")
+    COSINE_SIMILARITY = ("Cosine similarity", "cosineSimilarity")
+
+    PAINLESS_SCRIPTING_SPACE_TYPE_VALUES = dict([L2_SQUARED, L1_NORM, 
COSINE_SIMILARITY])
+
+    QUERY = PropertyDescriptor(
+        name="Query",
+        description="The text of the query to send to OpenSearch.",
+        required=True,
+        validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+        expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES
+    )
+    NUMBER_OF_RESULTS = PropertyDescriptor(
+        name="Number of Results",
+        description="The number of results to return from OpenSearch",
+        default_value="10",
+        required=True,
+        validators=[StandardValidators.POSITIVE_INTEGER_VALIDATOR],
+        expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES
+    )
+    SEARCH_TYPE = PropertyDescriptor(
+        name="Search Type",
+        description="Specifies the type of the search to be performed.",
+        allowable_values=SEARCH_TYPE_VALUES.keys(),
+        default_value=APPROXIMATE_SEARCH[0],
+        required=True
+    )
+    SCRIPT_SCORING_SPACE_TYPE = PropertyDescriptor(
+        name="Script Scoring Space Type",
+        description="Used to measure the distance between two points in order 
to determine the k-nearest neighbors.",
+        allowable_values=SCRIPT_SCORING_SPACE_TYPE_VALUES.keys(),
+        default_value=L2[0],
+        required=False,
+        dependencies=[PropertyDependency(SEARCH_TYPE, 
SCRIPT_SCORING_SEARCH[0])]
+    )
+    PAINLESS_SCRIPTING_SPACE_TYPE = PropertyDescriptor(
+        name="Painless Scripting Space Type",
+        description="Used to measure the distance between two points in order 
to determine the k-nearest neighbors.",
+        allowable_values=PAINLESS_SCRIPTING_SPACE_TYPE_VALUES.keys(),
+        default_value=L2_SQUARED[0],
+        required=False,
+        dependencies=[PropertyDependency(SEARCH_TYPE, 
PAINLESS_SCRIPTING_SEARCH[0])]
+    )
+    BOOLEAN_FILTER = PropertyDescriptor(
+        name="Boolean Filter",
+        description="A Boolean filter is a post filter consists of a Boolean 
query that contains a k-NN query and a filter. "
+                    "The value of the field must be a JSON representation of 
the filter.",
+        required=False,
+        validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+        dependencies=[PropertyDependency(SEARCH_TYPE, APPROXIMATE_SEARCH[0])]
+    )
+    EFFICIENT_FILTER = PropertyDescriptor(
+        name="Efficient Filter",
+        description="The Lucene Engine or Faiss Engine decides whether to 
perform an exact k-NN search with "
+                    "pre-filtering or an approximate search with modified 
post-filtering. The value of the field must "
+                    "be a JSON representation of the filter.",
+        required=False,
+        validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+        dependencies=[PropertyDependency(SEARCH_TYPE, APPROXIMATE_SEARCH[0])]
+    )
+    PRE_FILTER = PropertyDescriptor(
+        name="Pre Filter",
+        description="Script Score query to pre-filter documents before 
identifying nearest neighbors. The value of "
+                    "the field must be a JSON representation of the filter.",
+        default_value="{\"match_all\": {}}",
+        required=False,
+        validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+        dependencies=[PropertyDependency(SEARCH_TYPE, 
SCRIPT_SCORING_SEARCH[0], PAINLESS_SCRIPTING_SEARCH[0])]
+    )
+
+    properties = [EMBEDDING_MODEL,
+                  OPENAI_API_KEY,
+                  OPENAI_API_MODEL,
+                  HUGGING_FACE_API_KEY,
+                  HUGGING_FACE_MODEL,
+                  HTTP_HOST,
+                  USERNAME,
+                  PASSWORD,
+                  INDEX_NAME,
+                  QUERY,
+                  VECTOR_FIELD,
+                  TEXT_FIELD,
+                  NUMBER_OF_RESULTS,
+                  SEARCH_TYPE,
+                  SCRIPT_SCORING_SPACE_TYPE,
+                  PAINLESS_SCRIPTING_SPACE_TYPE,
+                  BOOLEAN_FILTER,
+                  EFFICIENT_FILTER,
+                  PRE_FILTER,
+                  OUTPUT_STRATEGY,
+                  RESULTS_FIELD,
+                  INCLUDE_METADATAS,
+                  INCLUDE_DISTANCES]
+
+    embeddings = None
+    query_utils = None
+
+    def __init__(self, **kwargs):
+        pass
+
+    def getPropertyDescriptors(self):
+        return self.properties
+
+    def onScheduled(self, context):
+        # initialize embedding service
+        self.embeddings = create_embedding_service(context)
+        self.query_utils = QueryUtils(context)
+
+    def transform(self, context, flowfile):
+        http_host = 
context.getProperty(HTTP_HOST).evaluateAttributeExpressions(flowfile).getValue()
+        index_name = 
context.getProperty(INDEX_NAME).evaluateAttributeExpressions(flowfile).getValue()
+        query = 
context.getProperty(self.QUERY).evaluateAttributeExpressions(flowfile).getValue()
+        num_results = 
context.getProperty(self.NUMBER_OF_RESULTS).evaluateAttributeExpressions(flowfile).asInteger()
+        vector_field = 
context.getProperty(VECTOR_FIELD).evaluateAttributeExpressions(flowfile).getValue()
+        text_field = 
context.getProperty(TEXT_FIELD).evaluateAttributeExpressions(flowfile).getValue()
+        search_type = 
context.getProperty(self.SEARCH_TYPE).evaluateAttributeExpressions().getValue()
+
+        params = {"vector_field": vector_field,
+                  "text_field": text_field,
+                  "search_type": self.SEARCH_TYPE_VALUES.get(search_type)}
+        params.update(create_authentication_params(context))
+
+        if search_type == self.APPROXIMATE_SEARCH[0]:
+            boolean_filter = 
context.getProperty(self.BOOLEAN_FILTER).evaluateAttributeExpressions().getValue()
+            if boolean_filter is not None:
+                params["boolean_filter"] = json.loads(boolean_filter)
+
+            efficient_filter = 
context.getProperty(self.EFFICIENT_FILTER).evaluateAttributeExpressions().getValue()
+            if efficient_filter is not None:
+                params["efficient_filter"] = json.loads(efficient_filter)
+        else:
+            pre_filter = 
context.getProperty(self.PRE_FILTER).evaluateAttributeExpressions().getValue()
+            if pre_filter is not None:
+                params["pre_filter"] = json.loads(pre_filter)
+            if search_type == self.SCRIPT_SCORING_SEARCH[0]:
+                space_type = 
context.getProperty(self.SCRIPT_SCORING_SPACE_TYPE).evaluateAttributeExpressions().getValue()
+                params["space_type"] = 
self.SCRIPT_SCORING_SPACE_TYPE_VALUES.get(space_type)
+            elif search_type == self.PAINLESS_SCRIPTING_SEARCH[0]:
+                space_type = 
context.getProperty(self.PAINLESS_SCRIPTING_SPACE_TYPE).evaluateAttributeExpressions().getValue()
+                params["space_type"] = 
self.PAINLESS_SCRIPTING_SPACE_TYPE_VALUES.get(space_type)
+
+        vectorstore = OpenSearchVectorSearch(index_name=index_name,
+                                             
embedding_function=self.embeddings,
+                                             opensearch_url=http_host,
+                                             **params
+                                             )
+
+        results = vectorstore.similarity_search_with_score(query=query, 
k=num_results, **params)
+
+        documents = []
+        for result in results:
+            documents.append(result[0].page_content)
+
+        if context.getProperty(INCLUDE_METADATAS):
+            metadatas = []
+            for result in results:
+                metadatas.append(result[0].metadata)
+        else:
+            metadatas = None
+
+        if context.getProperty(INCLUDE_DISTANCES):
+            distances = []
+            for result in results:
+                distances.append(result[1])
+        else:
+            distances = None
+
+        (output_contents, mime_type) = self.query_utils.create_json(flowfile, 
documents, metadatas, None, distances, None)
+        attributes = {"mime.type": mime_type}
+
+        return FlowFileTransformResult(relationship="success", 
contents=output_contents, attributes=attributes)
diff --git 
a/nifi-python-extensions/nifi-text-embeddings-module/src/main/python/vectorstores/requirements.txt
 
b/nifi-python-extensions/nifi-text-embeddings-module/src/main/python/vectorstores/requirements.txt
index ad78d29d03..fbefc24508 100644
--- 
a/nifi-python-extensions/nifi-text-embeddings-module/src/main/python/vectorstores/requirements.txt
+++ 
b/nifi-python-extensions/nifi-text-embeddings-module/src/main/python/vectorstores/requirements.txt
@@ -27,3 +27,6 @@ requests
 pinecone-client==3.0.1
 tiktoken
 langchain==0.1.11
+
+# OpenSearch requirements
+opensearch-py==2.5.0

Reply via email to