This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-python-extensions.git


The following commit(s) were added to refs/heads/main by this push:
     new 26adfa6  NIFI-13448 Added initial Python Processors This closes #3
26adfa6 is described below

commit 26adfa6767416be7296f3c33b90f44561b6f257f
Author: exceptionfactory <exceptionfact...@apache.org>
AuthorDate: Wed Jun 26 13:01:11 2024 -0500

    NIFI-13448 Added initial Python Processors
    This closes #3
    
    - Added pyproject.toml with source bundles directory
    - Added Check Formatting step with Hatch
    - Added Build Distribution step with Hatch
    
    Signed-off-by: David Handermann <exceptionfact...@apache.org>
    
    NIFI-13448 Excluded GitHub and Check Licenses
---
 .github/workflows/build.yml                        |   4 +
 .ratignore                                         |   1 +
 README.md                                          |  20 ++
 pyproject.toml                                     |  74 +++++
 src/__about__.py                                   |   3 +
 src/__init__.py                                    |   1 +
 src/extensions/chunking/ChunkDocument.py           | 277 ++++++++++++++++++
 src/extensions/chunking/ParseDocument.py           | 314 +++++++++++++++++++++
 src/extensions/chunking/__init__.py                |   1 +
 src/extensions/openai/PromptChatGPT.py             | 219 ++++++++++++++
 src/extensions/openai/__init__.py                  |   1 +
 src/extensions/vectorstores/ChromaUtils.py         | 138 +++++++++
 src/extensions/vectorstores/EmbeddingUtils.py      | 165 +++++++++++
 .../vectorstores/OpenSearchVectorUtils.py          | 130 +++++++++
 src/extensions/vectorstores/PutChroma.py           | 128 +++++++++
 src/extensions/vectorstores/PutOpenSearchVector.py | 267 ++++++++++++++++++
 src/extensions/vectorstores/PutPinecone.py         | 220 +++++++++++++++
 src/extensions/vectorstores/PutQdrant.py           | 151 ++++++++++
 src/extensions/vectorstores/QdrantUtils.py         | 112 ++++++++
 src/extensions/vectorstores/QueryChroma.py         | 181 ++++++++++++
 .../vectorstores/QueryOpenSearchVector.py          | 242 ++++++++++++++++
 src/extensions/vectorstores/QueryPinecone.py       | 205 ++++++++++++++
 src/extensions/vectorstores/QueryQdrant.py         | 161 +++++++++++
 src/extensions/vectorstores/QueryUtils.py          | 171 +++++++++++
 src/extensions/vectorstores/__init__.py            |   1 +
 src/extensions/vectorstores/requirements.txt       |  24 ++
 26 files changed, 3211 insertions(+)

diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml
index 15b5e4a..3ba97bb 100644
--- a/.github/workflows/build.yml
+++ b/.github/workflows/build.yml
@@ -43,3 +43,7 @@ jobs:
         run: |
           python -m pip install --upgrade pip
           pip install hatch
+      - name: Check Formatting
+        run: hatch fmt --check
+      - name: Build Distribution
+        run: hatch build
diff --git a/.ratignore b/.ratignore
index af3daed..3ca979f 100644
--- a/.ratignore
+++ b/.ratignore
@@ -14,3 +14,4 @@ share/python-wheels/*
 .idea/*
 .git/*
 .cache/*
+.ruff_cache/*
diff --git a/README.md b/README.md
index 9ff87aa..85e459c 100644
--- a/README.md
+++ b/README.md
@@ -2,15 +2,35 @@
 
 
[![license](https://img.shields.io/github/license/apache/nifi-python-extensions)](https://github.com/apache/nifi-python-extensions/blob/main/LICENSE)
 
[![build](https://github.com/apache/nifi-python-extensions/actions/workflows/build.yml/badge.svg)](https://github.com/apache/nifi-python-extensions/actions/workflows/build.yml)
+[![Hatch](https://img.shields.io/badge/%F0%9F%A5%9A-Hatch-4051b5.svg)](https://github.com/pypa/hatch)
+[![Ruff](https://img.shields.io/endpoint?url=https://raw.githubusercontent.com/astral-sh/ruff/main/assets/badge/v2.json)](https://github.com/astral-sh/ruff)
 
 The [Apache NiFi](https://nifi.apache.org) Python Extensions repository 
contains Processors implemented in [Python](https://www.python.org/)
 for deployment in Apache NiFi 2.
 
+## Building
+
+This project uses [Hatch](https://hatch.pypa.io) to build distribution 
packages.
+
+```
+hatch build
+```
+
+The build command creates a source distribution in the `dist` directory.
+
+The source distribution contains an `extensions` directory that can be copied into 
Apache NiFi to use the packaged Processors.
+
 ## Developing
 
 The Apache NiFi [Python Developer's 
Guide](https://nifi.apache.org/documentation/nifi-2.0.0-M3/html/python-developer-guide.html)
 provides the API and implementation guidelines for Python Processors.
 
+The Hatch format command supports evaluating Python Processors against 
configured rules.
+
+```
+hatch fmt --check
+```
+
 ## Documentation
 
 The Apache NiFi [Documentation](https://nifi.apache.org/documentation/) 
includes reference information for project capabilities.
diff --git a/pyproject.toml b/pyproject.toml
new file mode 100644
index 0000000..f47f4e3
--- /dev/null
+++ b/pyproject.toml
@@ -0,0 +1,74 @@
+# SPDX-License-Identifier: Apache-2.0
+
+[build-system]
+requires = ["hatchling"]
+build-backend = "hatchling.build"
+
+[project]
+name = "nifi-python-extensions"
+dynamic = ["version"]
+description = "Apache NiFi Processors implemented in Python"
+requires-python = ">=3.11"
+keywords = ["apache", "nifi", "extensions", "processors"]
+readme = "README.md"
+authors = [
+    { name = "Apache NiFi Developers", email = "d...@nifi.apache.org" },
+]
+maintainers = [
+    { name = "Apache NiFi Developers", email = "d...@nifi.apache.org" },
+]
+classifiers = [
+    "Development Status :: 5 - Production/Stable",
+    "License :: OSI Approved :: Apache Software License",
+    "Intended Audience :: Developers",
+    "Programming Language :: Python",
+    "Programming Language :: Python :: 3.11",
+    "Programming Language :: Python :: 3.12",
+    "Framework :: Hatch",
+]
+
+[project.urls]
+Homepage = "https://nifi.apache.org"
+Issues = "https://issues.apache.org/jira/projects/NIFI/issues"
+Source = "https://github.com/apache/nifi-python-extensions"
+
+[tool.hatch.version]
+path = "src/__about__.py"
+
+[[tool.hatch.envs.all.matrix]]
+python = ["3.11", "3.12"]
+
+[tool.hatch.build.targets.wheel]
+packages = ["src/extensions"]
+
+[tool.hatch.build.targets.sdist]
+exclude = [
+    ".asf.yaml",
+    ".github",
+    ".ratignore",
+    "check-licenses.sh",
+]
+
+[tool.ruff]
+preview = true
+lint.pep8-naming.extend-ignore-names = [
+    "flowFile",
+    "getPropertyDescriptors",
+    "onScheduled",
+]
+lint.flake8-self.extend-ignore-names = [
+    "_standard_validators"
+]
+lint.extend-select = [
+    "CPY001"
+]
+lint.ignore = [
+    "G004", # Allow f-string for logging
+    "N999", # Allow Processor module names that do not follow pep8-naming
+    "PERF401", # Allow manual list comprehension
+    "RUF012", # Allow mutable class attributes without typing.ClassVar
+    "S105", # Avoid checking for hardcoded-password-string values
+]
+
+[tool.ruff.lint.flake8-copyright]
+notice-rgx = "# SPDX-License-Identifier: Apache-2.0\n"
diff --git a/src/__about__.py b/src/__about__.py
new file mode 100644
index 0000000..c442147
--- /dev/null
+++ b/src/__about__.py
@@ -0,0 +1,3 @@
+# SPDX-License-Identifier: Apache-2.0
+
+__version__ = "2.0.0.dev0"
diff --git a/src/__init__.py b/src/__init__.py
new file mode 100644
index 0000000..9881313
--- /dev/null
+++ b/src/__init__.py
@@ -0,0 +1 @@
+# SPDX-License-Identifier: Apache-2.0
diff --git a/src/extensions/chunking/ChunkDocument.py 
b/src/extensions/chunking/ChunkDocument.py
new file mode 100644
index 0000000..576e914
--- /dev/null
+++ b/src/extensions/chunking/ChunkDocument.py
@@ -0,0 +1,277 @@
+# SPDX-License-Identifier: Apache-2.0
+
+import json
+
+from langchain.text_splitter import Language
+from nifiapi.documentation import ProcessorConfiguration, 
multi_processor_use_case, use_case
+from nifiapi.flowfiletransform import FlowFileTransform, 
FlowFileTransformResult
+from nifiapi.properties import ExpressionLanguageScope, PropertyDependency, 
PropertyDescriptor, StandardValidators
+
+SPLIT_BY_CHARACTER = "Split by Character"
+SPLIT_CODE = "Split Code"
+RECURSIVELY_SPLIT_BY_CHARACTER = "Recursively Split by Character"
+
+TEXT_KEY = "text"
+METADATA_KEY = "metadata"
+
+
+@use_case(
+    description="Create chunks of text from a single larger chunk.",
+    notes="The input for this use case is expected to be a FlowFile whose 
content is a JSON Lines document, with each line having a 'text' and a 
'metadata' element.",
+    keywords=["embedding", "vector", "text", "rag", "retrieval augmented 
generation"],
+    configuration="""
+        Set "Input Format" to "Plain Text"
+        Set "Element Strategy" to "Single Document"
+        """,
+)
+@multi_processor_use_case(
+    description="""
+        Chunk Plaintext data in order to prepare it for storage in a vector 
store. The output is in "json-lines" format,
+        containing the chunked data as text, as well as metadata pertaining to 
the chunk.""",
+    notes="The input for this use case is expected to be a FlowFile whose 
content is a plaintext document.",
+    keywords=["embedding", "vector", "text", "rag", "retrieval augmented 
generation"],
+    configurations=[
+        ProcessorConfiguration(
+            processor_type="ParseDocument",
+            configuration="""
+                  Set "Input Format" to "Plain Text"
+                  Set "Element Strategy" to "Single Document"
+
+                  Connect the 'success' Relationship to ChunkDocument.
+                  """,
+        ),
+        ProcessorConfiguration(
+            processor_type="ChunkDocument",
+            configuration="""
+                  Set the following properties:
+                    "Chunking Strategy" = "Recursively Split by Character"
+                    "Separator" = "\\n\\n,\\n, ,"
+                    "Separator Format" = "Plain Text"
+                    "Chunk Size" = "4000"
+                    "Chunk Overlap" = "200"
+                    "Keep Separator" = "false"
+
+                  Connect the 'success' Relationship to the appropriate 
destination to store data in the desired vector store.
+                  """,
+        ),
+    ],
+)
+@multi_processor_use_case(
+    description="""
+        Parse and chunk the textual contents of a PDF document in order to 
prepare it for storage in a vector store. The output is in "json-lines" format,
+        containing the chunked data as text, as well as metadata pertaining to 
the chunk.""",
+    notes="The input for this use case is expected to be a FlowFile whose 
content is a PDF document.",
+    keywords=["pdf", "embedding", "vector", "text", "rag", "retrieval 
augmented generation"],
+    configurations=[
+        ProcessorConfiguration(
+            processor_type="ParseDocument",
+            configuration="""
+                  Set "Input Format" to "PDF"
+                  Set "Element Strategy" to "Single Document"
+                  Set "Include Extracted Metadata" to "false"
+
+                  Connect the 'success' Relationship to ChunkDocument.
+                  """,
+        ),
+        ProcessorConfiguration(
+            processor_type="ChunkDocument",
+            configuration="""
+                  Set the following properties:
+                    "Chunking Strategy" = "Recursively Split by Character"
+                    "Separator" = "\\n\\n,\\n, ,"
+                    "Separator Format" = "Plain Text"
+                    "Chunk Size" = "4000"
+                    "Chunk Overlap" = "200"
+                    "Keep Separator" = "false"
+
+                  Connect the 'success' Relationship to the appropriate 
destination to store data in the desired vector store.
+                  """,
+        ),
+    ],
+)
+class ChunkDocument(FlowFileTransform):
+    class Java:
+        implements = ["org.apache.nifi.python.processor.FlowFileTransform"]
+
+    class ProcessorDetails:
+        version = "2.0.0.dev0"
+        description = """Chunks incoming documents that are formatted as JSON 
Lines into chunks that are appropriately sized for creating Text Embeddings.
+            The input is expected to be in "json-lines" format, with each line 
having a 'text' and a 'metadata' element.
+            Each line will then be split into one or more lines in the 
output."""
+        tags = [
+            "text",
+            "split",
+            "chunk",
+            "langchain",
+            "embeddings",
+            "vector",
+            "machine learning",
+            "ML",
+            "artificial intelligence",
+            "ai",
+            "document",
+        ]
+        dependencies = ["langchain"]
+
+    CHUNK_STRATEGY = PropertyDescriptor(
+        name="Chunking Strategy",
+        description="Specifies which splitter should be used to split the 
text",
+        allowable_values=[RECURSIVELY_SPLIT_BY_CHARACTER, SPLIT_BY_CHARACTER, 
SPLIT_CODE],
+        required=True,
+        default_value=RECURSIVELY_SPLIT_BY_CHARACTER,
+    )
+    SEPARATOR = PropertyDescriptor(
+        name="Separator",
+        description="""Specifies the character sequence to use for splitting 
apart the text. If using a Chunking Strategy of Recursively Split by Character,
+                    it is a comma-separated list of character sequences. 
Meta-characters \\n, \\r and \\t are automatically un-escaped.""",
+        required=True,
+        default_value="\\n\\n,\\n, ,",
+        validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+        dependencies=[PropertyDependency(CHUNK_STRATEGY, SPLIT_BY_CHARACTER, 
RECURSIVELY_SPLIT_BY_CHARACTER)],
+        expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
+    )
+    SEPARATOR_FORMAT = PropertyDescriptor(
+        name="Separator Format",
+        description="Specifies how to interpret the value of the <Separator> 
property",
+        required=True,
+        default_value="Plain Text",
+        allowable_values=["Plain Text", "Regular Expression"],
+        dependencies=[PropertyDependency(CHUNK_STRATEGY, SPLIT_BY_CHARACTER, 
RECURSIVELY_SPLIT_BY_CHARACTER)],
+    )
+    CHUNK_SIZE = PropertyDescriptor(
+        name="Chunk Size",
+        description="The maximum size of a chunk that should be returned",
+        required=True,
+        default_value="4000",
+        validators=[StandardValidators.POSITIVE_INTEGER_VALIDATOR],
+    )
+    CHUNK_OVERLAP = PropertyDescriptor(
+        name="Chunk Overlap",
+        description="The number of characters that should be overlapped 
between each chunk of text",
+        required=True,
+        default_value="200",
+        validators=[StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR],
+    )
+    KEEP_SEPARATOR = PropertyDescriptor(
+        name="Keep Separator",
+        description="Whether or not to keep the text separator in each chunk 
of data",
+        required=True,
+        default_value="false",
+        allowable_values=["true", "false"],
+        dependencies=[PropertyDependency(CHUNK_STRATEGY, SPLIT_BY_CHARACTER, 
RECURSIVELY_SPLIT_BY_CHARACTER)],
+    )
+    STRIP_WHITESPACE = PropertyDescriptor(
+        name="Strip Whitespace",
+        description="Whether or not to strip the whitespace at the beginning 
and end of each chunk",
+        required=True,
+        default_value="true",
+        allowable_values=["true", "false"],
+        dependencies=[PropertyDependency(CHUNK_STRATEGY, SPLIT_BY_CHARACTER, 
RECURSIVELY_SPLIT_BY_CHARACTER)],
+    )
+    LANGUAGE = PropertyDescriptor(
+        name="Language",
+        description="The language to use for the Code's syntax",
+        required=True,
+        default_value="python",
+        allowable_values=[e.value for e in Language],
+        dependencies=[PropertyDependency(CHUNK_STRATEGY, SPLIT_CODE)],
+    )
+
+    property_descriptors = [
+        CHUNK_STRATEGY,
+        SEPARATOR,
+        SEPARATOR_FORMAT,
+        CHUNK_SIZE,
+        CHUNK_OVERLAP,
+        KEEP_SEPARATOR,
+        STRIP_WHITESPACE,
+        LANGUAGE,
+    ]
+
+    def __init__(self, **kwargs):
+        pass
+
+    def getPropertyDescriptors(self):
+        return self.property_descriptors
+
+    def split_docs(self, context, flowfile, documents):
+        from langchain.text_splitter import CharacterTextSplitter, 
RecursiveCharacterTextSplitter
+
+        strategy = context.getProperty(self.CHUNK_STRATEGY).getValue()
+        if strategy == SPLIT_BY_CHARACTER:
+            text_splitter = CharacterTextSplitter(
+                
separator=context.getProperty(self.SEPARATOR).evaluateAttributeExpressions(flowfile).getValue(),
+                
keep_separator=context.getProperty(self.KEEP_SEPARATOR).asBoolean(),
+                
is_separator_regex=context.getProperty(self.SEPARATOR_FORMAT).getValue() == 
"Regular Expression",
+                chunk_size=context.getProperty(self.CHUNK_SIZE).asInteger(),
+                
chunk_overlap=context.getProperty(self.CHUNK_OVERLAP).asInteger(),
+                length_function=len,
+                
strip_whitespace=context.getProperty(self.STRIP_WHITESPACE).asBoolean(),
+            )
+        elif strategy == SPLIT_CODE:
+            text_splitter = RecursiveCharacterTextSplitter.from_language(
+                language=context.getProperty(self.LANGUAGE).getValue(),
+                chunk_size=context.getProperty(self.CHUNK_SIZE).asInteger(),
+                
chunk_overlap=context.getProperty(self.CHUNK_OVERLAP).asInteger(),
+            )
+        else:
+            separator_text = 
context.getProperty(self.SEPARATOR).evaluateAttributeExpressions(flowfile).getValue()
+            splits = separator_text.split(",")
+            unescaped = []
+            for split in splits:
+                unescaped.append(split.replace("\\n", "\n").replace("\\r", 
"\r").replace("\\t", "\t"))
+            text_splitter = RecursiveCharacterTextSplitter(
+                separators=unescaped,
+                
keep_separator=context.getProperty(self.KEEP_SEPARATOR).asBoolean(),
+                
is_separator_regex=context.getProperty(self.SEPARATOR_FORMAT).getValue() == 
"Regular Expression",
+                chunk_size=context.getProperty(self.CHUNK_SIZE).asInteger(),
+                
chunk_overlap=context.getProperty(self.CHUNK_OVERLAP).asInteger(),
+                length_function=len,
+                
strip_whitespace=context.getProperty(self.STRIP_WHITESPACE).asBoolean(),
+            )
+
+        return text_splitter.split_documents(documents)
+
+    def to_json(self, docs) -> str:
+        json_docs = []
+
+        for i, doc in enumerate(docs):
+            doc.metadata["chunk_index"] = i
+            doc.metadata["chunk_count"] = len(docs)
+
+            json_doc = json.dumps({TEXT_KEY: doc.page_content, METADATA_KEY: 
doc.metadata})
+            json_docs.append(json_doc)
+
+        return "\n".join(json_docs)
+
+    def load_docs(self, flowfile):
+        from langchain.schema import Document
+
+        flowfile_contents = flowfile.getContentsAsBytes().decode()
+        docs = []
+        for line in flowfile_contents.split("\n"):
+            stripped = line.strip()
+            if stripped == "":
+                continue
+
+            json_element = json.loads(stripped)
+            page_content = json_element.get(TEXT_KEY)
+            if page_content is None:
+                continue
+
+            metadata = json_element.get(METADATA_KEY)
+            if metadata is None:
+                metadata = {}
+
+            doc = Document(page_content=page_content, metadata=metadata)
+            docs.append(doc)
+
+        return docs
+
+    def transform(self, context, flowfile):
+        documents = self.load_docs(flowfile)
+        split_docs = self.split_docs(context, flowfile, documents)
+
+        output_json = self.to_json(split_docs)
+        attributes = {"document.count": str(len(split_docs))}
+        return FlowFileTransformResult("success", contents=output_json, 
attributes=attributes)
diff --git a/src/extensions/chunking/ParseDocument.py 
b/src/extensions/chunking/ParseDocument.py
new file mode 100644
index 0000000..2a2d05e
--- /dev/null
+++ b/src/extensions/chunking/ParseDocument.py
@@ -0,0 +1,314 @@
+# SPDX-License-Identifier: Apache-2.0
+
+import io
+import json
+
+from nifiapi.flowfiletransform import FlowFileTransform, 
FlowFileTransformResult
+from nifiapi.properties import PropertyDependency, PropertyDescriptor, 
StandardValidators
+
+PLAIN_TEXT = "Plain Text"
+HTML = "HTML"
+MARKDOWN = "Markdown"
+PDF = "PDF"
+EXCEL = "Microsoft Excel"
+POWERPOINT = "Microsoft PowerPoint"
+WORD = "Microsoft Word"
+
+PARSING_STRATEGY_AUTO = "Automatic"
+PARSING_STRATEGY_HIGH_RES = "High Resolution"
+PARSING_STRATEGY_OCR_ONLY = "OCR Only"
+PARSING_STRATEGY_FAST = "Fast"
+
+SINGLE_DOCUMENT = "Single Document"
+DOCUMENT_PER_ELEMENT = "Document Per Element"
+
+TEXT_KEY = "text"
+METADATA_KEY = "metadata"
+
+
+class ParseDocument(FlowFileTransform):
+    class Java:
+        implements = ["org.apache.nifi.python.processor.FlowFileTransform"]
+
+    class ProcessorDetails:
+        version = "2.0.0.dev0"
+        description = """Parses incoming unstructured text documents and 
performs optical character recognition (OCR) in order to extract text from PDF 
and image files.
+            The output is formatted as "json-lines" with two keys: 'text' and 
'metadata'.
+            Note that use of this Processor may require significant storage 
space and RAM utilization due to third-party dependencies necessary for 
processing PDF and image files.
+            Also note that in order to process PDF or Images, Tesseract and 
Poppler must be installed on the system."""
+        tags = [
+            "text",
+            "embeddings",
+            "vector",
+            "machine learning",
+            "ML",
+            "artificial intelligence",
+            "ai",
+            "document",
+            "langchain",
+            "pdf",
+            "html",
+            "markdown",
+            "word",
+            "excel",
+            "powerpoint",
+        ]
+        dependencies = [
+            "pikepdf==8.12.0",
+            "pypdf==4.0.1",
+            "langchain==0.1.7",
+            "unstructured==0.14.8",
+            "unstructured-inference==0.7.36",
+            "unstructured_pytesseract==0.3.12",
+            "pillow-heif==0.15.0",
+            "numpy==1.26.4",
+            "opencv-python==4.9.0.80",
+            "pdf2image==1.17.0",
+            "pdfminer.six==20221105",
+            "python-docx==1.1.0",
+            "openpyxl==3.1.2",
+            "python-pptx==0.6.23",
+        ]
+
+    INPUT_FORMAT = PropertyDescriptor(
+        name="Input Format",
+        description="""The format of the input FlowFile. This dictates which 
TextLoader will be used to parse the input.
+            Note that in order to process images or extract tables from PDF 
files,you must have both 'poppler' and 'tesseract' installed on your system.""",
+        allowable_values=[PLAIN_TEXT, HTML, MARKDOWN, PDF, WORD, EXCEL, 
POWERPOINT],
+        required=True,
+        default_value=PLAIN_TEXT,
+    )
+    PDF_PARSING_STRATEGY = PropertyDescriptor(
+        name="PDF Parsing Strategy",
+        display_name="Parsing Strategy",
+        description="Specifies the strategy to use when parsing a PDF",
+        allowable_values=[
+            PARSING_STRATEGY_AUTO,
+            PARSING_STRATEGY_HIGH_RES,
+            PARSING_STRATEGY_OCR_ONLY,
+            PARSING_STRATEGY_FAST,
+        ],
+        required=True,
+        default_value=PARSING_STRATEGY_AUTO,
+        dependencies=[PropertyDependency(INPUT_FORMAT, PDF)],
+    )
+    PDF_MODEL_NAME = PropertyDescriptor(
+        name="PDF Parsing Model",
+        description="The model to use for parsing. Different models will have 
their own strengths and weaknesses.",
+        allowable_values=["yolox", "detectron2_onnx", "chipper"],
+        required=True,
+        default_value="yolox",
+        dependencies=[PropertyDependency(INPUT_FORMAT, PDF)],
+    )
+    ELEMENT_STRATEGY = PropertyDescriptor(
+        name="Element Strategy",
+        description="Specifies whether the input should be loaded as a single 
Document, or if each element in the input should be separated out into its own 
Document",
+        allowable_values=[SINGLE_DOCUMENT, DOCUMENT_PER_ELEMENT],
+        required=True,
+        default_value=DOCUMENT_PER_ELEMENT,
+        dependencies=[PropertyDependency(INPUT_FORMAT, HTML, MARKDOWN)],
+    )
+    INCLUDE_PAGE_BREAKS = PropertyDescriptor(
+        name="Include Page Breaks",
+        description="Specifies whether or not page breaks should be considered 
when creating Documents from the input",
+        allowable_values=["true", "false"],
+        required=True,
+        default_value="false",
+        dependencies=[
+            PropertyDependency(INPUT_FORMAT, HTML, MARKDOWN),
+            PropertyDependency(ELEMENT_STRATEGY, DOCUMENT_PER_ELEMENT),
+        ],
+    )
+    PDF_INFER_TABLE_STRUCTURE = PropertyDescriptor(
+        name="Infer Table Structure",
+        description="If true, any table that is identified in the PDF will be 
parsed and translated into an HTML structure. The HTML of that table will then 
be added to the \
+                    Document's metadata in a key named 'text_as_html'. 
Regardless of the value of this property, the textual contents of the table 
will be written to the contents \
+                    without the structure.",
+        allowable_values=["true", "false"],
+        default_value="false",
+        required=True,
+        dependencies=[PropertyDependency(PDF_PARSING_STRATEGY, 
PARSING_STRATEGY_HIGH_RES)],
+    )
+    LANGUAGES = PropertyDescriptor(
+        name="Languages",
+        description="A comma-separated list of language codes that should be 
used when using OCR to determine the text.",
+        validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+        default_value="Eng",
+        required=True,
+        dependencies=[PropertyDependency(INPUT_FORMAT, PDF)],
+    )
+    METADATA_FIELDS = PropertyDescriptor(
+        name="Metadata Fields",
+        description="A comma-separated list of FlowFile attributes that will 
be added to the Documents' Metadata",
+        validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+        default_value="filename, uuid",
+        required=True,
+    )
+    EXTRACT_METADATA = PropertyDescriptor(
+        name="Include Extracted Metadata",
+        description="Whether or not to include the metadata that is extracted 
from the input in each of the Documents",
+        allowable_values=["true", "false"],
+        default_value="true",
+        required=True,
+    )
+
+    property_descriptors = [
+        INPUT_FORMAT,
+        PDF_PARSING_STRATEGY,
+        PDF_MODEL_NAME,
+        ELEMENT_STRATEGY,
+        INCLUDE_PAGE_BREAKS,
+        PDF_INFER_TABLE_STRUCTURE,
+        LANGUAGES,
+        METADATA_FIELDS,
+        EXTRACT_METADATA,
+    ]
+
+    def __init__(self, **kwargs):
+        pass
+
+    def getPropertyDescriptors(self):
+        return self.property_descriptors
+
+    def get_parsing_strategy(self, nifi_value: str, default_value: str) -> str:
+        if nifi_value == PARSING_STRATEGY_OCR_ONLY:
+            return "ocr_only"
+        if nifi_value == PARSING_STRATEGY_HIGH_RES:
+            return "hi_res"
+        if nifi_value == PARSING_STRATEGY_FAST:
+            return "fast"
+        if nifi_value == PARSING_STRATEGY_AUTO:
+            return "auto"
+        return default_value
+
+    def get_languages(self, nifi_value: str) -> list[str]:
+        return [lang.strip() for lang in nifi_value.split(",")]
+
+    def create_docs(self, context, flowFile):
+        from langchain.schema import Document
+
+        metadata = {}
+
+        for attribute_name in 
context.getProperty(self.METADATA_FIELDS).getValue().split(","):
+            trimmed = attribute_name.strip()
+            value = flowFile.getAttribute(trimmed)
+            metadata[trimmed] = value
+
+        input_format = 
context.getProperty(self.INPUT_FORMAT).evaluateAttributeExpressions(flowFile).getValue()
+        if input_format == PLAIN_TEXT:
+            return 
[Document(page_content=flowFile.getContentsAsBytes().decode("utf-8"), 
metadata=metadata)]
+
+        element_strategy = 
context.getProperty(self.ELEMENT_STRATEGY).getValue()
+        mode = "single" if element_strategy == SINGLE_DOCUMENT else "elements"
+
+        include_page_breaks = 
context.getProperty(self.INCLUDE_PAGE_BREAKS).asBoolean()
+        include_metadata = 
context.getProperty(self.EXTRACT_METADATA).asBoolean()
+
+        if input_format == HTML:
+            from langchain.document_loaders import UnstructuredHTMLLoader
+
+            loader = UnstructuredHTMLLoader(
+                None,
+                file=io.BytesIO(flowFile.getContentsAsBytes()),
+                mode=mode,
+                include_page_breaks=include_page_breaks,
+                include_metadata=include_metadata,
+            )
+
+        elif input_format == PDF:
+            from langchain.document_loaders import UnstructuredPDFLoader
+
+            infer_table_structure = 
context.getProperty(self.PDF_INFER_TABLE_STRUCTURE).asBoolean()
+            strategy = self.get_parsing_strategy(
+                context.getProperty(self.PDF_PARSING_STRATEGY).getValue(), 
PARSING_STRATEGY_AUTO
+            )
+            languages = 
self.get_languages(context.getProperty(self.LANGUAGES).getValue())
+            model_name = context.getProperty(self.PDF_MODEL_NAME).getValue()
+
+            loader = UnstructuredPDFLoader(
+                None,
+                file=io.BytesIO(flowFile.getContentsAsBytes()),
+                mode=mode,
+                infer_table_structure=infer_table_structure,
+                include_page_breaks=include_page_breaks,
+                languages=languages,
+                strategy=strategy,
+                include_metadata=include_metadata,
+                model_name=model_name,
+            )
+
+        elif input_format == MARKDOWN:
+            from langchain.document_loaders import UnstructuredMarkdownLoader
+
+            loader = UnstructuredMarkdownLoader(
+                None,
+                file=io.BytesIO(flowFile.getContentsAsBytes()),
+                mode=mode,
+                include_page_breaks=include_page_breaks,
+                include_metadata=include_metadata,
+            )
+
+        elif input_format == WORD:
+            from langchain.document_loaders import 
UnstructuredWordDocumentLoader
+
+            loader = UnstructuredWordDocumentLoader(
+                None,
+                file=io.BytesIO(flowFile.getContentsAsBytes()),
+                mode=mode,
+                include_page_breaks=include_page_breaks,
+                include_metadata=include_metadata,
+            )
+
+        elif input_format == EXCEL:
+            from langchain.document_loaders import UnstructuredExcelLoader
+
+            loader = UnstructuredExcelLoader(
+                None,
+                file=io.BytesIO(flowFile.getContentsAsBytes()),
+                mode=mode,
+                include_page_breaks=include_page_breaks,
+                include_metadata=include_metadata,
+            )
+
+        elif input_format == POWERPOINT:
+            from langchain.document_loaders import UnstructuredPowerPointLoader
+
+            loader = UnstructuredPowerPointLoader(
+                None,
+                file=io.BytesIO(flowFile.getContentsAsBytes()),
+                mode=mode,
+                include_page_breaks=include_page_breaks,
+                include_metadata=include_metadata,
+            )
+
+        else:
+            raise ValueError("Configured Input Format is invalid: " + 
input_format)
+
+        documents = loader.load()
+
+        if len(metadata) > 0:
+            for doc in documents:
+                if doc.metadata is None:
+                    doc.metadata = metadata
+                else:
+                    doc.metadata.update(metadata)
+
+        return documents
+
+    def to_json(self, docs) -> str:
+        json_docs = []
+
+        for i, doc in enumerate(docs):
+            doc.metadata["chunk_index"] = i
+            doc.metadata["chunk_count"] = len(docs)
+
+            json_doc = json.dumps({"text": doc.page_content, "metadata": 
doc.metadata})
+            json_docs.append(json_doc)
+
+        return "\n".join(json_docs)
+
+    def transform(self, context, flowFile):
+        documents = self.create_docs(context, flowFile)
+        output_json = self.to_json(documents)
+
+        return FlowFileTransformResult("success", contents=output_json, 
attributes={"mime.type": "application/json"})
diff --git a/src/extensions/chunking/__init__.py 
b/src/extensions/chunking/__init__.py
new file mode 100644
index 0000000..9881313
--- /dev/null
+++ b/src/extensions/chunking/__init__.py
@@ -0,0 +1 @@
+# SPDX-License-Identifier: Apache-2.0
diff --git a/src/extensions/openai/PromptChatGPT.py 
b/src/extensions/openai/PromptChatGPT.py
new file mode 100644
index 0000000..a8470f4
--- /dev/null
+++ b/src/extensions/openai/PromptChatGPT.py
@@ -0,0 +1,219 @@
+# SPDX-License-Identifier: Apache-2.0
+
+import json
+import re
+
+from nifiapi.flowfiletransform import FlowFileTransform, 
FlowFileTransformResult
+from nifiapi.properties import ExpressionLanguageScope, PropertyDescriptor, 
StandardValidators, TimeUnit
+
+# Name of the prompt variable that stands for the entire FlowFile content
+FLOWFILE_CONTENT = "flowfile_content"
+# The same variable name wrapped in braces, i.e. the literal {flowfile_content}
+FLOWFILE_CONTENT_REFERENCE = "{" + FLOWFILE_CONTENT + "}"
+# Regex to match { followed by any number of characters other than { or }, followed by }.
+# But do not match if it starts with {{. Group 1 captures the text between the braces.
+VAR_NAME_REGEX = r"(?<!{)\{([^{]*?)\}"
+
+
+class PromptChatGPT(FlowFileTransform):
+    """FlowFileTransform processor that submits a templated prompt to an OpenAI chat model.
+
+    The prompt may reference the FlowFile content through {flowfile_content} or, for JSON content,
+    through JSONPath expressions such as {$.page_content}. The answer is written either to the
+    FlowFile content or to the attribute named by the "Result Attribute" property.
+    """
+
+    class Java:
+        implements = ["org.apache.nifi.python.processor.FlowFileTransform"]
+
+    class ProcessorDetails:
+        version = "2.0.0.dev0"
+        description = "Submits a prompt to ChatGPT, writing the results either to a FlowFile attribute or to the contents of the FlowFile"
+        tags = [
+            "text",
+            "chatgpt",
+            "gpt",
+            "machine learning",
+            "ML",
+            "artificial intelligence",
+            "ai",
+            "document",
+            "langchain",
+        ]
+        dependencies = ["langchain==0.1.2", "openai==1.9.0", "jsonpath-ng"]
+
+    MODEL = PropertyDescriptor(
+        name="OpenAI Model Name",
+        description="The name of the OpenAI Model to use in order to answer the prompt",
+        default_value="gpt-3.5-turbo",
+        expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
+        validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+        required=True,
+    )
+    PROMPT = PropertyDescriptor(
+        name="Prompt",
+        description="""The prompt to issue to ChatGPT. This may use FlowFile attributes via Expression Language and may also reference the FlowFile content by using the literal
+                    {flowfile_content} (including braces) in the prompt. If the FlowFile's content is JSON formatted, a reference may also include JSONPath Expressions
+                    to reference specific fields in the FlowFile content, such as {$.page_content}""",
+        validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+        expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
+        required=True,
+    )
+    TEMPERATURE = PropertyDescriptor(
+        name="Temperature",
+        description="""The Temperature parameter to submit to OpenAI. A lower value will result in more consistent answers while a higher value will result in a more creative answer.
+                    The value must be between 0 and 2, inclusive.""",
+        validators=[StandardValidators._standard_validators.createNonNegativeFloatingPointValidator(2.0)],
+        expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
+        required=True,
+        default_value="1.0",
+    )
+    RESULT_ATTRIBUTE = PropertyDescriptor(
+        name="Result Attribute",
+        description="If specified, the result will be added to the attribute whose name is given. If not specified, the result will be written to the FlowFile's content",
+        validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+        required=False,
+    )
+    API_KEY = PropertyDescriptor(
+        name="API Key",
+        description="The OpenAI API Key to use",
+        validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+        required=True,
+        sensitive=True,
+    )
+    TIMEOUT = PropertyDescriptor(
+        name="Request Timeout",
+        description="The amount of time to wait before timing out the request",
+        validators=[StandardValidators.TIME_PERIOD_VALIDATOR],
+        default_value="60 secs",
+        required=True,
+    )
+    MAX_TOKENS = PropertyDescriptor(
+        name="Max Tokens to Generate",
+        description="The maximum number of tokens that ChatGPT should generate",
+        validators=[StandardValidators.POSITIVE_INTEGER_VALIDATOR],
+        required=False,
+    )
+    ORGANIZATION = PropertyDescriptor(
+        name="OpenAI Organization ID",
+        description="The OpenAI Organization ID",
+        validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+        required=False,
+    )
+    API_BASE = PropertyDescriptor(
+        name="API Base URL Path",
+        description="The API Base URL to use for interacting with OpenAI. This should be populated only if using a proxy or an emulator.",
+        validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+        required=False,
+    )
+
+    property_descriptors = [
+        MODEL,
+        PROMPT,
+        TEMPERATURE,
+        RESULT_ATTRIBUTE,
+        API_KEY,
+        TIMEOUT,
+        MAX_TOKENS,
+        ORGANIZATION,
+        API_BASE,
+    ]
+
+    def __init__(self, **kwargs):
+        pass
+
+    def getPropertyDescriptors(self):
+        """Return the list of properties supported by this processor."""
+        return self.property_descriptors
+
+    @staticmethod
+    def _match_to_text(value):
+        """Return a JSONPath match as prompt text: strings unchanged, any other value as JSON."""
+        return value if isinstance(value, str) else json.dumps(value)
+
+    def transform(self, context, flowFile):
+        """Render the configured prompt for this FlowFile, submit it to OpenAI and return the answer.
+
+        Raises:
+            ValueError: if the prompt references a variable that is neither flowfile_content nor a
+                JSONPath expression (a reference starting with '$').
+        """
+        from langchain import PromptTemplate
+        from langchain.chains.llm import LLMChain
+        from langchain.chat_models import ChatOpenAI
+
+        prompt = context.getProperty(self.PROMPT).evaluateAttributeExpressions(flowFile).getValue()
+
+        # We want to allow referencing FlowFile content using JSONPath Expressions.
+        # To do that, we allow the same {variable} syntax as Langchain. But Langchain does not allow '$' characters
+        # to exist in the variable names. So we need to replace those variables in the prompt with new variables, such as
+        # jsonpath_var_0, jsonpath_var_1, etc. To do this, we will use a Regex to detect any variables that are referenced
+        # and if it starts with a $ we will replace it with jsonpath_var_<index> and we will keep a mapping from that name to
+        # the substituted variable name so that we can later determine what the JSONPath expression was.
+        # dict.fromkeys removes duplicates while keeping first-occurrence order, so the generated names are deterministic.
+        variable_references = list(dict.fromkeys(re.findall(VAR_NAME_REGEX, prompt)))
+
+        input_variables = []
+        jsonpath_to_var_mapping = {}
+        index = 0
+        for ref in variable_references:
+            if ref.startswith("$"):
+                var_name = "jsonpath_var_" + str(index)
+                index += 1
+                input_variables.append(var_name)
+                jsonpath_to_var_mapping[ref] = var_name
+                prompt = prompt.replace("{" + ref + "}", "{" + var_name + "}")
+            elif ref == FLOWFILE_CONTENT:
+                input_variables.append(ref)
+            else:
+                raise ValueError(
+                    "Prompt contained an invalid variable reference: {"
+                    + ref
+                    + "}. Valid references are flowfile_content or any JSONPath expression."
+                )
+
+        temperature = context.getProperty(self.TEMPERATURE).evaluateAttributeExpressions(flowFile).asFloat()
+        model_name = context.getProperty(self.MODEL).evaluateAttributeExpressions(flowFile).getValue()
+        api_key = context.getProperty(self.API_KEY).getValue()
+        timeout = context.getProperty(self.TIMEOUT).asTimePeriod(TimeUnit.SECONDS)
+        max_tokens = context.getProperty(self.MAX_TOKENS).asInteger()
+        organization = context.getProperty(self.ORGANIZATION).getValue()
+        api_base = context.getProperty(self.API_BASE).getValue()
+
+        # Build out our LLMChain
+        llm = ChatOpenAI(
+            model_name=model_name,
+            temperature=temperature,
+            openai_api_key=api_key,
+            request_timeout=timeout,
+            max_retries=0,
+            max_tokens=max_tokens,
+            openai_organization=organization,
+            openai_api_base=api_base,
+        )
+
+        prompt_template = PromptTemplate(template=prompt, input_variables=input_variables)
+
+        llm_chain = LLMChain(llm=llm, prompt=prompt_template)
+
+        # Substitute in any JSON Path Expressions or references to {flowfile_content}.
+        llm_args = {}
+        json_content = None
+        for var_name in variable_references:
+            # If variable references {flowfile_content} substitute the content
+            if var_name == FLOWFILE_CONTENT:
+                llm_args[FLOWFILE_CONTENT] = flowFile.getContentsAsBytes().decode()
+            elif var_name.startswith("$"):
+                # Load the FlowFile's contents into the json_content variable only once
+                if json_content is None:
+                    json_content = json.loads(flowFile.getContentsAsBytes().decode())
+
+                # Import jsonpath_ng so that we can evaluate JSONPath against the FlowFile content.
+                from jsonpath_ng import parse
+
+                try:
+                    matches = parse(var_name).find(json_content)
+                except Exception:
+                    self.logger.exception(f"Invalid JSONPath reference in prompt: {var_name}")
+                    raise
+
+                # Matches may be numbers, booleans, objects or arrays; str.join only accepts strings,
+                # so non-string values are rendered as JSON text before joining.
+                variable_value = "\n".join(self._match_to_text(match.value) for match in matches)
+
+                # Insert the resolved value into llm_args
+                resolved_var_name = jsonpath_to_var_mapping.get(var_name)
+                llm_args[resolved_var_name] = variable_value
+
+        self.logger.debug(f"Evaluating prompt\nPrompt: {prompt}\nArgs: {llm_args}")
+
+        # Run the LLM Chain in order to prompt ChatGPT
+        results = llm_chain(llm_args)
+
+        # Create the output content or FlowFile attribute
+        text = results["text"]
+        attribute_name = context.getProperty(self.RESULT_ATTRIBUTE).getValue()
+        if attribute_name is None:
+            output_content = text
+            output_attributes = None
+        else:
+            output_content = None
+            output_attributes = {attribute_name: text}
+
+        # Return the results
+        return FlowFileTransformResult("success", contents=output_content, attributes=output_attributes)
diff --git a/src/extensions/openai/__init__.py 
b/src/extensions/openai/__init__.py
new file mode 100644
index 0000000..9881313
--- /dev/null
+++ b/src/extensions/openai/__init__.py
@@ -0,0 +1 @@
+# SPDX-License-Identifier: Apache-2.0
diff --git a/src/extensions/vectorstores/ChromaUtils.py 
b/src/extensions/vectorstores/ChromaUtils.py
new file mode 100644
index 0000000..4a5ada0
--- /dev/null
+++ b/src/extensions/vectorstores/ChromaUtils.py
@@ -0,0 +1,138 @@
+# SPDX-License-Identifier: Apache-2.0
+
+from nifiapi.properties import ExpressionLanguageScope, PropertyDependency, 
PropertyDescriptor, StandardValidators
+
+# Connection Strategies
+LOCAL_DISK = "Local Disk"
+REMOTE_SERVER = "Remote Chroma Server"
+
+# Authentication Strategies
+TOKEN = "Token Authentication"
+BASIC_AUTH = "Basic Authentication"
+NONE = "None"
+
+# Transport Protocols
+HTTP = "http"
+HTTPS = "https"
+
+CONNECTION_STRATEGY = PropertyDescriptor(
+    name="Connection Strategy",
+    description="Specifies how to connect to the Chroma server",
+    allowable_values=[LOCAL_DISK, REMOTE_SERVER],
+    default_value=REMOTE_SERVER,
+    required=True,
+)
+# Only applicable when the Local Disk connection strategy is selected
+DIRECTORY = PropertyDescriptor(
+    name="Directory",
+    description="The Directory that Chroma should use to persist data",
+    validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+    required=True,
+    default_value="./chroma",
+    dependencies=[PropertyDependency(CONNECTION_STRATEGY, LOCAL_DISK)],
+)
+# The following four properties are only applicable when the Remote Chroma Server strategy is selected
+HOSTNAME = PropertyDescriptor(
+    name="Hostname",
+    description="The hostname to connect to in order to communicate with Chroma",
+    validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+    default_value="localhost",
+    required=True,
+    dependencies=[PropertyDependency(CONNECTION_STRATEGY, REMOTE_SERVER)],
+)
+PORT = PropertyDescriptor(
+    name="Port",
+    description="The port that the Chroma server is listening on",
+    validators=[StandardValidators.PORT_VALIDATOR],
+    default_value="8000",
+    required=True,
+    dependencies=[PropertyDependency(CONNECTION_STRATEGY, REMOTE_SERVER)],
+)
+TRANSPORT_PROTOCOL = PropertyDescriptor(
+    name="Transport Protocol",
+    description="Specifies whether connections should be made over http or https",
+    allowable_values=[HTTP, HTTPS],
+    default_value=HTTPS,
+    required=True,
+    dependencies=[PropertyDependency(CONNECTION_STRATEGY, REMOTE_SERVER)],
+)
+AUTH_STRATEGY = PropertyDescriptor(
+    name="Authentication Strategy",
+    description="Specifies how to authenticate to Chroma server",
+    allowable_values=[TOKEN, BASIC_AUTH, NONE],
+    default_value=TOKEN,
+    required=True,
+    dependencies=[PropertyDependency(CONNECTION_STRATEGY, REMOTE_SERVER)],
+)
+# Only applicable when Token Authentication is selected
+AUTH_TOKEN = PropertyDescriptor(
+    name="Authentication Token",
+    description="The token to use for authenticating to Chroma server",
+    validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+    required=True,
+    sensitive=True,
+    dependencies=[PropertyDependency(AUTH_STRATEGY, TOKEN)],
+)
+# Username and Password are only applicable when Basic Authentication is selected
+USERNAME = PropertyDescriptor(
+    name="Username",
+    description="The username to use for authenticating to Chroma server",
+    validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+    required=True,
+    dependencies=[PropertyDependency(AUTH_STRATEGY, BASIC_AUTH)],
+)
+PASSWORD = PropertyDescriptor(
+    name="Password",
+    description="The password to use for authenticating to Chroma server",
+    validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+    required=True,
+    sensitive=True,
+    dependencies=[PropertyDependency(AUTH_STRATEGY, BASIC_AUTH)],
+)
+COLLECTION_NAME = PropertyDescriptor(
+    name="Collection Name",
+    description="The name of the Chroma Collection",
+    validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+    required=True,
+    default_value="nifi",
+    expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
+)
+
+# All Chroma connection properties, in the order they are listed here
+PROPERTIES = [
+    CONNECTION_STRATEGY,
+    DIRECTORY,
+    HOSTNAME,
+    PORT,
+    TRANSPORT_PROTOCOL,
+    AUTH_STRATEGY,
+    AUTH_TOKEN,
+    USERNAME,
+    PASSWORD,
+    COLLECTION_NAME,
+]
+
+
+def create_client(context):
+    """Create a Chroma client from the configured connection properties.
+
+    With the Local Disk strategy a persistent client rooted at the configured Directory is returned.
+    Otherwise an HTTP client is created for the configured hostname, port and transport protocol,
+    carrying token or basic-auth credentials unless the authentication strategy is None.
+
+    Args:
+        context: the processor context used to look up property values.
+
+    Returns:
+        The client returned by chromadb.PersistentClient or chromadb.HttpClient.
+    """
+    import chromadb
+    from chromadb import Settings
+
+    connection_strategy = context.getProperty(CONNECTION_STRATEGY).getValue()
+    if connection_strategy == LOCAL_DISK:
+        directory = context.getProperty(DIRECTORY).getValue()
+        return chromadb.PersistentClient(path=directory)
+
+    hostname = context.getProperty(HOSTNAME).getValue()
+    port = context.getProperty(PORT).asInteger()
+    # No additional HTTP headers are sent
+    headers = {}
+    ssl = context.getProperty(TRANSPORT_PROTOCOL).getValue() == HTTPS
+
+    auth_strategy = context.getProperty(AUTH_STRATEGY).getValue()
+    if auth_strategy == TOKEN:
+        auth_provider = "chromadb.auth.token.TokenAuthClientProvider"
+        credentials = context.getProperty(AUTH_TOKEN).getValue()
+    elif auth_strategy == BASIC_AUTH:
+        auth_provider = "chromadb.auth.basic.BasicAuthClientProvider"
+        username = context.getProperty(USERNAME).getValue()
+        password = context.getProperty(PASSWORD).getValue()
+        # Basic credentials are passed as a single "username:password" string
+        credentials = username + ":" + password
+    else:
+        auth_provider = None
+        credentials = None
+
+    settings = Settings(chroma_client_auth_provider=auth_provider, chroma_client_auth_credentials=credentials)
+    return chromadb.HttpClient(host=hostname, port=port, ssl=ssl, headers=headers, settings=settings)
diff --git a/src/extensions/vectorstores/EmbeddingUtils.py 
b/src/extensions/vectorstores/EmbeddingUtils.py
new file mode 100644
index 0000000..df45086
--- /dev/null
+++ b/src/extensions/vectorstores/EmbeddingUtils.py
@@ -0,0 +1,165 @@
+# SPDX-License-Identifier: Apache-2.0
+
+from langchain.embeddings.huggingface import HuggingFaceInferenceAPIEmbeddings
+from langchain.embeddings.openai import OpenAIEmbeddings
+from nifiapi.properties import PropertyDependency, PropertyDescriptor, 
StandardValidators
+
+# Embedding Functions
+ONNX_ALL_MINI_LM_L6_V2 = "ONNX all-MiniLM-L6-v2 Model"
+HUGGING_FACE = "Hugging Face Model"
+OPENAI = "OpenAI Model"
+SENTENCE_TRANSFORMERS = "Sentence Transformers"
+
+
+# Selects the embedding function built by create_embedding_function
+EMBEDDING_FUNCTION = PropertyDescriptor(
+    name="Embedding Function",
+    description="Specifies which embedding function should be used in order to create embeddings from incoming Documents",
+    allowable_values=[ONNX_ALL_MINI_LM_L6_V2, HUGGING_FACE, OPENAI, SENTENCE_TRANSFORMERS],
+    default_value=ONNX_ALL_MINI_LM_L6_V2,
+    required=True,
+)
+# Only applicable when the Hugging Face embedding function is selected
+HUGGING_FACE_MODEL_NAME = PropertyDescriptor(
+    name="HuggingFace Model Name",
+    description="The name of the HuggingFace model to use",
+    validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+    default_value="sentence-transformers/all-MiniLM-L6-v2",
+    required=True,
+    dependencies=[PropertyDependency(EMBEDDING_FUNCTION, HUGGING_FACE)],
+)
+HUGGING_FACE_API_KEY = PropertyDescriptor(
+    name="HuggingFace API Key",
+    description="The API Key for interacting with HuggingFace",
+    validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+    required=True,
+    sensitive=True,
+    dependencies=[PropertyDependency(EMBEDDING_FUNCTION, HUGGING_FACE)],
+)
+# Only applicable when the OpenAI embedding function is selected
+OPENAI_API_KEY = PropertyDescriptor(
+    name="OpenAI API Key",
+    description="The API Key for interacting with OpenAI",
+    validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+    required=True,
+    sensitive=True,
+    dependencies=[PropertyDependency(EMBEDDING_FUNCTION, OPENAI)],
+)
+OPENAI_MODEL_NAME = PropertyDescriptor(
+    name="OpenAI Model Name",
+    description="The name of the OpenAI model to use",
+    validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+    default_value="text-embedding-ada-002",
+    required=True,
+    dependencies=[PropertyDependency(EMBEDDING_FUNCTION, OPENAI)],
+)
+OPENAI_ORGANIZATION = PropertyDescriptor(
+    name="OpenAI Organization ID",
+    description="The OpenAI Organization ID",
+    validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+    required=False,
+    dependencies=[PropertyDependency(EMBEDDING_FUNCTION, OPENAI)],
+)
+OPENAI_API_BASE = PropertyDescriptor(
+    name="OpenAI API Base Path",
+    description="The API Base to use for interacting with OpenAI. This is used for interacting with different deployments, such as an Azure deployment.",
+    validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+    required=False,
+    dependencies=[PropertyDependency(EMBEDDING_FUNCTION, OPENAI)],
+)
+OPENAI_API_TYPE = PropertyDescriptor(
+    name="OpenAI API Deployment Type",
+    description="The type of the OpenAI API Deployment. This is used for interacting with different deployments, such as an Azure deployment.",
+    validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+    required=False,
+    dependencies=[PropertyDependency(EMBEDDING_FUNCTION, OPENAI)],
+)
+OPENAI_API_VERSION = PropertyDescriptor(
+    name="OpenAI API Version",
+    description="The OpenAI API Version. This is used for interacting with different deployments, such as an Azure deployment.",
+    validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+    required=False,
+    dependencies=[PropertyDependency(EMBEDDING_FUNCTION, OPENAI)],
+)
+# Only applicable when the Sentence Transformers embedding function is selected
+SENTENCE_TRANSFORMER_MODEL_NAME = PropertyDescriptor(
+    name="Sentence Transformer Model Name",
+    description="The name of the Sentence Transformer model to use",
+    validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+    default_value="all-MiniLM-L6-v2",
+    required=True,
+    dependencies=[PropertyDependency(EMBEDDING_FUNCTION, SENTENCE_TRANSFORMERS)],
+)
+SENTENCE_TRANSFORMER_DEVICE = PropertyDescriptor(
+    name="Sentence Transformer Device Type",
+    description="""The type of device to use for performing the embeddings using the Sentence Transformer, such as 'cpu', 'cuda', 'mps', 'cuda:0', etc.
+                   If not specified, a GPU will be used if possible, otherwise a CPU.""",
+    validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+    required=False,
+    dependencies=[PropertyDependency(EMBEDDING_FUNCTION, SENTENCE_TRANSFORMERS)],
+)
+# Selects the embedding service built by create_embedding_service
+EMBEDDING_MODEL = PropertyDescriptor(
+    name="Embedding Model",
+    description="Specifies which embedding model should be used in order to create embeddings from incoming Documents. Default model is OpenAI.",
+    allowable_values=[HUGGING_FACE, OPENAI],
+    default_value=OPENAI,
+    required=True,
+)
+PROPERTIES = [
+    EMBEDDING_FUNCTION,
+    HUGGING_FACE_MODEL_NAME,
+    HUGGING_FACE_API_KEY,
+    OPENAI_MODEL_NAME,
+    OPENAI_API_KEY,
+    OPENAI_ORGANIZATION,
+    OPENAI_API_BASE,
+    OPENAI_API_TYPE,
+    OPENAI_API_VERSION,
+    SENTENCE_TRANSFORMER_MODEL_NAME,
+    SENTENCE_TRANSFORMER_DEVICE,
+    EMBEDDING_MODEL,
+]
+
+
+def create_embedding_function(context):
+    """Create the Chroma embedding function selected by the 'Embedding Function' property.
+
+    Args:
+        context: the processor context used to look up property values.
+
+    Returns:
+        An ONNXMiniLM_L6_V2, OpenAIEmbeddingFunction or HuggingFaceEmbeddingFunction for the matching
+        selection; any other selection yields a SentenceTransformerEmbeddingFunction.
+    """
+    from chromadb.utils.embedding_functions import (
+        HuggingFaceEmbeddingFunction,
+        ONNXMiniLM_L6_V2,
+        OpenAIEmbeddingFunction,
+        SentenceTransformerEmbeddingFunction,
+    )
+
+    function_name = context.getProperty(EMBEDDING_FUNCTION).getValue()
+    if function_name == ONNX_ALL_MINI_LM_L6_V2:
+        return ONNXMiniLM_L6_V2()
+
+    if function_name == OPENAI:
+        api_key = context.getProperty(OPENAI_API_KEY).getValue()
+        model_name = context.getProperty(OPENAI_MODEL_NAME).getValue()
+        organization_id = context.getProperty(OPENAI_ORGANIZATION).getValue()
+        api_base = context.getProperty(OPENAI_API_BASE).getValue()
+        api_type = context.getProperty(OPENAI_API_TYPE).getValue()
+        api_version = context.getProperty(OPENAI_API_VERSION).getValue()
+        return OpenAIEmbeddingFunction(
+            api_key=api_key,
+            model_name=model_name,
+            organization_id=organization_id,
+            api_base=api_base,
+            api_type=api_type,
+            api_version=api_version,
+        )
+
+    if function_name == HUGGING_FACE:
+        api_key = context.getProperty(HUGGING_FACE_API_KEY).getValue()
+        model_name = context.getProperty(HUGGING_FACE_MODEL_NAME).getValue()
+        return HuggingFaceEmbeddingFunction(api_key=api_key, model_name=model_name)
+
+    # The only remaining allowable value is Sentence Transformers
+    model_name = context.getProperty(SENTENCE_TRANSFORMER_MODEL_NAME).getValue()
+    device = context.getProperty(SENTENCE_TRANSFORMER_DEVICE).getValue()
+    return SentenceTransformerEmbeddingFunction(model_name=model_name, device=device)
+
+
+def create_embedding_service(context):
+    """Create the LangChain embeddings object selected by the 'Embedding Model' property.
+
+    Returns OpenAIEmbeddings when the OpenAI model is selected and HuggingFaceInferenceAPIEmbeddings
+    for any other selection; each is configured only with the corresponding API key.
+    """
+    selected_model = context.getProperty(EMBEDDING_MODEL).getValue()
+
+    if selected_model != OPENAI:
+        return HuggingFaceInferenceAPIEmbeddings(api_key=context.getProperty(HUGGING_FACE_API_KEY).getValue())
+
+    return OpenAIEmbeddings(openai_api_key=context.getProperty(OPENAI_API_KEY).getValue())
diff --git a/src/extensions/vectorstores/OpenSearchVectorUtils.py 
b/src/extensions/vectorstores/OpenSearchVectorUtils.py
new file mode 100644
index 0000000..4f527e0
--- /dev/null
+++ b/src/extensions/vectorstores/OpenSearchVectorUtils.py
@@ -0,0 +1,130 @@
+# SPDX-License-Identifier: Apache-2.0
+
+from EmbeddingUtils import EMBEDDING_MODEL, HUGGING_FACE, OPENAI
+from nifiapi.properties import ExpressionLanguageScope, PropertyDependency, 
PropertyDescriptor, StandardValidators
+
+# Space types, each given as a (description, identifier) pair
+L2 = ("L2 (Euclidean distance)", "l2")
+L1 = ("L1 (Manhattan distance)", "l1")
+LINF = ("L-infinity (chessboard) distance", "linf")
+COSINESIMIL = ("Cosine similarity", "cosinesimil")
+
+# Only applicable when the Hugging Face embedding model is selected
+HUGGING_FACE_API_KEY = PropertyDescriptor(
+    name="HuggingFace API Key",
+    description="The API Key for interacting with HuggingFace",
+    required=True,
+    sensitive=True,
+    validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+    dependencies=[PropertyDependency(EMBEDDING_MODEL, HUGGING_FACE)],
+)
+HUGGING_FACE_MODEL = PropertyDescriptor(
+    name="HuggingFace Model",
+    description="The name of the HuggingFace model to use",
+    default_value="sentence-transformers/all-MiniLM-L6-v2",
+    required=True,
+    validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+    dependencies=[PropertyDependency(EMBEDDING_MODEL, HUGGING_FACE)],
+)
+# Only applicable when the OpenAI embedding model is selected
+OPENAI_API_KEY = PropertyDescriptor(
+    name="OpenAI API Key",
+    description="The API Key for OpenAI in order to create embeddings",
+    required=True,
+    sensitive=True,
+    validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+    dependencies=[PropertyDependency(EMBEDDING_MODEL, OPENAI)],
+)
+OPENAI_API_MODEL = PropertyDescriptor(
+    name="OpenAI Model",
+    description="The name of the OpenAI model to use in order to create embeddings",
+    default_value="text-embedding-ada-002",
+    required=True,
+    validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+    dependencies=[PropertyDependency(EMBEDDING_MODEL, OPENAI)],
+)
+HTTP_HOST = PropertyDescriptor(
+    name="HTTP Host",
+    description="URL where OpenSearch is hosted.",
+    default_value="http://localhost:9200",
+    required=True,
+    validators=[StandardValidators.URL_VALIDATOR],
+)
+# Username and Password are optional; see create_authentication_params for how they are used
+USERNAME = PropertyDescriptor(
+    name="Username",
+    description="The username to use for authenticating to OpenSearch server",
+    required=False,
+    validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+)
+PASSWORD = PropertyDescriptor(
+    name="Password",
+    description="The password to use for authenticating to OpenSearch server",
+    required=False,
+    sensitive=True,
+    validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+)
+INDEX_NAME = PropertyDescriptor(
+    name="Index Name",
+    description="The name of the OpenSearch index.",
+    sensitive=False,
+    required=True,
+    validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+    expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
+)
+VECTOR_FIELD = PropertyDescriptor(
+    name="Vector Field Name",
+    description="The name of field in the document where the embeddings are stored. This field need to be a 'knn_vector' typed field.",
+    default_value="vector_field",
+    required=True,
+    validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+    expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
+)
+TEXT_FIELD = PropertyDescriptor(
+    name="Text Field Name",
+    description="The name of field in the document where the text is stored.",
+    default_value="text",
+    required=True,
+    validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+    expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
+)
+
+
+def create_authentication_params(context):
+    """Build the connection keyword arguments for the OpenSearch client.
+
+    Certificate verification is always requested. Basic-auth credentials are added only when
+    both the Username and the Password properties are set.
+
+    Args:
+        context: the processor context used to look up property values.
+
+    Returns:
+        A dict with "verify_certs" and, when credentials are configured, "http_auth" as a
+        (username, password) tuple.
+    """
+    username = context.getProperty(USERNAME).getValue()
+    password = context.getProperty(PASSWORD).getValue()
+
+    # Use a real boolean rather than the string "true": a string only works where the consumer
+    # checks truthiness, and a consumer that inspects the type would misread it.
+    params = {"verify_certs": True}
+
+    if username is not None and password is not None:
+        params["http_auth"] = (username, password)
+
+    return params
+
+
+def parse_documents(json_lines, id_field_name, file_name):
+    """Parse newline-delimited JSON documents into parallel lists of texts, metadata and IDs.
+
+    Blank lines (for example the empty string left by a trailing newline) are skipped. A missing or
+    null 'metadata' element is treated as empty metadata, and null metadata values are dropped.
+
+    Args:
+        json_lines: string holding one JSON object per line, each with 'text' and 'metadata' keys.
+        id_field_name: name of the metadata field holding the document ID, or None.
+        file_name: prefix for generated IDs; a document without an ID gets "<file_name>-<line number>",
+            where the line number is one-based and counts blank lines.
+
+    Returns:
+        A dict with the keys "texts", "metadatas" and "ids", each a list with one entry per document.
+
+    Raises:
+        ValueError: if a non-blank line is not valid JSON or is not a JSON object.
+    """
+    import json
+
+    texts = []
+    metadatas = []
+    ids = []
+    for i, line in enumerate(json_lines.split("\n"), start=1):
+        if not line.strip():
+            continue
+
+        try:
+            doc = json.loads(line)
+        except Exception as e:
+            message = f"Could not parse line {i} as JSON"
+            raise ValueError(message) from e
+
+        if not isinstance(doc, dict):
+            message = f"Line {i} is not a JSON object"
+            raise ValueError(message)
+
+        texts.append(doc.get("text"))
+        metadata = doc.get("metadata") or {}
+
+        # Remove any null values, or it will cause the embedding to fail
+        filtered_metadata = {key: value for key, value in metadata.items() if value is not None}
+        metadatas.append(filtered_metadata)
+
+        doc_id = None
+        if id_field_name is not None:
+            doc_id = metadata.get(id_field_name)
+        if doc_id is None:
+            doc_id = file_name + "-" + str(i)
+        ids.append(doc_id)
+
+    return {"texts": texts, "metadatas": metadatas, "ids": ids}
diff --git a/src/extensions/vectorstores/PutChroma.py 
b/src/extensions/vectorstores/PutChroma.py
new file mode 100644
index 0000000..f9aad49
--- /dev/null
+++ b/src/extensions/vectorstores/PutChroma.py
@@ -0,0 +1,128 @@
+# SPDX-License-Identifier: Apache-2.0
+
+import json
+
+import ChromaUtils
+import EmbeddingUtils
+from nifiapi.flowfiletransform import FlowFileTransform, 
FlowFileTransformResult
+from nifiapi.properties import ExpressionLanguageScope, PropertyDescriptor, 
StandardValidators
+
+
+class PutChroma(FlowFileTransform):
+    """FlowFileTransform processor that embeds JSON-lines documents and upserts them into a Chroma collection."""
+
+    class Java:
+        implements = ["org.apache.nifi.python.processor.FlowFileTransform"]
+
+    class ProcessorDetails:
+        version = "2.0.0.dev0"
+        description = """Publishes JSON data to a Chroma VectorDB. The Incoming data must be in single JSON per Line format, each with two keys: 'text' and 'metadata'.
+                       The text must be a string, while metadata must be a map with strings for values. Any additional fields will be ignored. If the collection name specified
+                       does not exist, the Processor will automatically create the collection."""
+        tags = [
+            "chroma",
+            "vector",
+            "vectordb",
+            "embeddings",
+            "ai",
+            "artificial intelligence",
+            "ml",
+            "machine learning",
+            "text",
+            "LLM",
+        ]
+
+    STORE_TEXT = PropertyDescriptor(
+        name="Store Document Text",
+        description="""Specifies whether or not the text of the document should be stored in Chroma. If so, both the document's text and its embedding will be stored. If not,
+                    only the vector/embedding will be stored.""",
+        allowable_values=["true", "false"],
+        required=True,
+        default_value="true",
+    )
+    DISTANCE_METHOD = PropertyDescriptor(
+        name="Distance Method",
+        description="If the specified collection does not exist, it will be created using this Distance Method. If the collection exists, this property will be ignored.",
+        allowable_values=["cosine", "l2", "ip"],
+        default_value="cosine",
+        required=True,
+    )
+    DOC_ID_FIELD_NAME = PropertyDescriptor(
+        name="Document ID Field Name",
+        description="""Specifies the name of the field in the 'metadata' element of each document where the document's ID can be found.
+                    If not specified, an ID will be generated based on the FlowFile's filename and a one-up number.""",
+        required=False,
+        validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+        expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
+    )
+
+    # Both are created in onScheduled and reused by every transform call
+    client = None
+    embedding_function = None
+
+    def __init__(self, **kwargs):
+        # Accept (and ignore) keyword arguments, matching the constructor of the other processors
+        # The 'Embedding Model' property is used by create_embedding_service, not by this processor
+        self.property_descriptors = list(ChromaUtils.PROPERTIES) + [
+            prop for prop in EmbeddingUtils.PROPERTIES if prop != EmbeddingUtils.EMBEDDING_MODEL
+        ]
+        self.property_descriptors.append(self.STORE_TEXT)
+        self.property_descriptors.append(self.DISTANCE_METHOD)
+        self.property_descriptors.append(self.DOC_ID_FIELD_NAME)
+
+    def getPropertyDescriptors(self):
+        """Return the list of properties supported by this processor."""
+        return self.property_descriptors
+
+    def onScheduled(self, context):
+        """Create the Chroma client and the embedding function from the configured properties."""
+        self.client = ChromaUtils.create_client(context)
+        self.embedding_function = EmbeddingUtils.create_embedding_function(context)
+
+    @staticmethod
+    def _flatten_metadata(metadata):
+        """Drop null values and expand list values into one-based indexed keys (key_1, key_2, ...)."""
+        flattened = {}
+        for key, value in metadata.items():
+            # Remove any null values, or it will cause the embedding to fail
+            if value is None:
+                continue
+            if isinstance(value, list):
+                for position, element in enumerate(value, start=1):
+                    flattened[f"{key}_{position}"] = element
+            else:
+                flattened[key] = value
+        return flattened
+
+    def transform(self, context, flowfile):
+        """Embed every JSON line of the FlowFile and upsert the results into the configured collection.
+
+        Documents whose metadata does not provide an ID get "<filename>-<zero-based line index>".
+        Blank lines are skipped and a missing or null 'metadata' element is treated as empty.
+        """
+        client = self.client
+        embedding_function = self.embedding_function
+        collection_name = (
+            context.getProperty(ChromaUtils.COLLECTION_NAME).evaluateAttributeExpressions(flowfile).getValue()
+        )
+        distance_method = context.getProperty(self.DISTANCE_METHOD).getValue()
+        id_field_name = context.getProperty(self.DOC_ID_FIELD_NAME).evaluateAttributeExpressions(flowfile).getValue()
+
+        collection = client.get_or_create_collection(
+            name=collection_name, embedding_function=embedding_function, metadata={"hnsw:space": distance_method}
+        )
+
+        json_lines = flowfile.getContentsAsBytes().decode()
+        texts = []
+        metadatas = []
+        ids = []
+        # The line index gets its own name so that iterating over list-valued metadata cannot
+        # overwrite it and corrupt the generated document IDs.
+        for line_index, line in enumerate(json_lines.split("\n")):
+            if not line.strip():
+                continue
+
+            doc = json.loads(line)
+            texts.append(doc.get("text"))
+
+            metadata = doc.get("metadata") or {}
+            metadatas.append(self._flatten_metadata(metadata))
+
+            doc_id = None
+            if id_field_name is not None:
+                doc_id = metadata.get(id_field_name)
+            if doc_id is None:
+                doc_id = flowfile.getAttribute("filename") + "-" + str(line_index)
+            ids.append(doc_id)
+
+        embeddings = embedding_function(texts)
+        if not context.getProperty(self.STORE_TEXT).asBoolean():
+            texts = None
+
+        collection.upsert(ids, embeddings, metadatas, texts)
+
+        return FlowFileTransformResult(relationship="success")
diff --git a/src/extensions/vectorstores/PutOpenSearchVector.py 
b/src/extensions/vectorstores/PutOpenSearchVector.py
new file mode 100644
index 0000000..92821db
--- /dev/null
+++ b/src/extensions/vectorstores/PutOpenSearchVector.py
@@ -0,0 +1,267 @@
+# SPDX-License-Identifier: Apache-2.0
+
+from EmbeddingUtils import EMBEDDING_MODEL, create_embedding_service
+from langchain.vectorstores import OpenSearchVectorSearch
+from nifiapi.documentation import use_case
+from nifiapi.flowfiletransform import FlowFileTransform, 
FlowFileTransformResult
+from nifiapi.properties import ExpressionLanguageScope, PropertyDependency, 
PropertyDescriptor, StandardValidators
+from OpenSearchVectorUtils import (
+    COSINESIMIL,
+    HTTP_HOST,
+    HUGGING_FACE_API_KEY,
+    HUGGING_FACE_MODEL,
+    INDEX_NAME,
+    L1,
+    L2,
+    LINF,
+    OPENAI_API_KEY,
+    OPENAI_API_MODEL,
+    PASSWORD,
+    TEXT_FIELD,
+    USERNAME,
+    VECTOR_FIELD,
+    create_authentication_params,
+    parse_documents,
+)
+
+
+@use_case(
+    description="Create vectors/embeddings that represent text content and send the vectors to OpenSearch",
+    notes="This use case assumes that the data has already been formatted in JSONL format with the text to store in OpenSearch provided in the 'text' field.",
+    keywords=["opensearch", "embedding", "vector", "text", "vectorstore", "insert"],
+    configuration="""
+                Configure the 'HTTP Host' to an appropriate URL where OpenSearch is accessible.
+                Configure 'Embedding Model' to indicate whether OpenAI embeddings should be used or a HuggingFace embedding model should be used: 'Hugging Face Model' or 'OpenAI Model'
+                Configure the 'OpenAI API Key' or 'HuggingFace API Key', depending on the chosen Embedding Model.
+                Set 'Index Name' to the name of your OpenSearch Index.
+                Set 'Vector Field Name' to the name of the field in the document which will store the vector data.
+                Set 'Text Field Name' to the name of the field in the document which will store the text data.
+
+                If the documents to send to OpenSearch contain a unique identifier, set the 'Document ID Field Name' property to the name of the field that contains the document ID.
+                This property can be left blank, in which case a unique ID will be generated based on the FlowFile's filename.
+
+                If the provided index does not exists in OpenSearch then the processor is capable to create it. The 'New Index Strategy' property defines
+                that the index needs to be created from the default template or it should be configured with custom values.
+                """,
+)
+@use_case(
+    description="Update vectors/embeddings in OpenSearch",
+    notes="This use case assumes that the data has already been formatted in JSONL format with the text to store in OpenSearch provided in the 'text' field.",
+    keywords=["opensearch", "embedding", "vector", "text", "vectorstore", "update", "upsert"],
+    configuration="""
+                Configure the 'HTTP Host' to an appropriate URL where OpenSearch is accessible.
+                Configure 'Embedding Model' to indicate whether OpenAI embeddings should be used or a HuggingFace embedding model should be used: 'Hugging Face Model' or 'OpenAI Model'
+                Configure the 'OpenAI API Key' or 'HuggingFace API Key', depending on the chosen Embedding Model.
+                Set 'Index Name' to the name of your OpenSearch Index.
+                Set 'Vector Field Name' to the name of the field in the document which will store the vector data.
+                Set 'Text Field Name' to the name of the field in the document which will store the text data.
+                Set the 'Document ID Field Name' property to the name of the field that contains the identifier of the document in OpenSearch to update.
+                """,
+)
+class PutOpenSearchVector(FlowFileTransform):
+    """FlowFileTransform that embeds JSON-lines documents and writes them to an OpenSearch index.
+
+    Connection, field and embedding properties are imported from
+    OpenSearchVectorUtils / EmbeddingUtils; the properties declared here
+    control document IDs and the mapping used when passing custom index
+    settings to the vector store.
+    """
+
+    class Java:
+        implements = ["org.apache.nifi.python.processor.FlowFileTransform"]
+
+    class ProcessorDetails:
+        version = "2.0.0.dev0"
+        description = """Publishes JSON data to OpenSearch. The Incoming data must be in single JSON per Line format, each with two keys: 'text' and 'metadata'.
+                       The text must be a string, while metadata must be a map with strings for values. Any additional fields will be ignored."""
+        tags = [
+            "opensearch",
+            "vector",
+            "vectordb",
+            "vectorstore",
+            "embeddings",
+            "ai",
+            "artificial intelligence",
+            "ml",
+            "machine learning",
+            "text",
+            "LLM",
+        ]
+
+    # Engine types, as (display name shown in the UI, value passed to the vector store) pairs.
+    NMSLIB = ("nmslib (Non-Metric Space Library)", "nmslib")
+    FAISS = ("faiss (Facebook AI Similarity Search)", "faiss")
+    LUCENE = ("lucene", "lucene")
+
+    # Display name -> engine value.
+    ENGINE_VALUES = dict([NMSLIB, FAISS, LUCENE])
+
+    # Space types; L2, L1, LINF and COSINESIMIL are imported pairs, INNERPRODUCT is declared locally.
+    INNERPRODUCT = ("Inner product", "innerproduct")
+
+    # Display name -> space type value, one table per engine.
+    NMSLIB_SPACE_TYPE_VALUES = dict([L2, L1, LINF, COSINESIMIL, INNERPRODUCT])
+    FAISS_SPACE_TYPE_VALUES = dict([L2, INNERPRODUCT])
+    LUCENE_SPACE_TYPE_VALUES = dict([L2, COSINESIMIL])
+
+    # New Index Mapping Strategy
+    DEFAULT_INDEX_MAPPING = "Default index mapping"
+    CUSTOM_INDEX_MAPPING = "Custom index mapping"
+
+    DOC_ID_FIELD_NAME = PropertyDescriptor(
+        name="Document ID Field Name",
+        description="""Specifies the name of the field in the 'metadata' element of each document where the document's ID can be found.
+                    If not specified, an ID will be generated based on the FlowFile's filename and a one-up number.""",
+        required=False,
+        validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+        expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
+    )
+    NEW_INDEX_STRATEGY = PropertyDescriptor(
+        name="New Index Strategy",
+        description="""Specifies the Mapping strategy to use for new index creation. The default template values are the following:
+                    {engine: nmslib, space_type: l2, ef_search: 512, ef_construction: 512, m: 16}""",
+        allowable_values=[DEFAULT_INDEX_MAPPING, CUSTOM_INDEX_MAPPING],
+        default_value=DEFAULT_INDEX_MAPPING,
+        required=False,
+    )
+    # The properties below only apply when the custom index mapping strategy is selected.
+    ENGINE = PropertyDescriptor(
+        name="Engine",
+        description="The approximate k-NN library to use for indexing and search.",
+        allowable_values=ENGINE_VALUES.keys(),
+        default_value=NMSLIB[0],
+        required=False,
+        dependencies=[PropertyDependency(NEW_INDEX_STRATEGY, CUSTOM_INDEX_MAPPING)],
+    )
+    NMSLIB_SPACE_TYPE = PropertyDescriptor(
+        name="NMSLIB Space Type",
+        description="The vector space used to calculate the distance between vectors.",
+        allowable_values=NMSLIB_SPACE_TYPE_VALUES.keys(),
+        default_value=L2[0],
+        required=False,
+        dependencies=[
+            PropertyDependency(NEW_INDEX_STRATEGY, CUSTOM_INDEX_MAPPING),
+            PropertyDependency(ENGINE, NMSLIB[0]),
+        ],
+    )
+    FAISS_SPACE_TYPE = PropertyDescriptor(
+        name="FAISS Space Type",
+        description="The vector space used to calculate the distance between vectors.",
+        allowable_values=FAISS_SPACE_TYPE_VALUES.keys(),
+        default_value=L2[0],
+        required=False,
+        dependencies=[
+            PropertyDependency(NEW_INDEX_STRATEGY, CUSTOM_INDEX_MAPPING),
+            PropertyDependency(ENGINE, FAISS[0]),
+        ],
+    )
+    LUCENE_SPACE_TYPE = PropertyDescriptor(
+        name="Lucene Space Type",
+        description="The vector space used to calculate the distance between vectors.",
+        allowable_values=LUCENE_SPACE_TYPE_VALUES.keys(),
+        default_value=L2[0],
+        required=False,
+        dependencies=[
+            PropertyDependency(NEW_INDEX_STRATEGY, CUSTOM_INDEX_MAPPING),
+            PropertyDependency(ENGINE, LUCENE[0]),
+        ],
+    )
+    EF_SEARCH = PropertyDescriptor(
+        name="EF Search",
+        description="The size of the dynamic list used during k-NN searches. Higher values lead to more accurate but slower searches.",
+        default_value="512",
+        required=False,
+        validators=[StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR],
+        dependencies=[PropertyDependency(NEW_INDEX_STRATEGY, CUSTOM_INDEX_MAPPING)],
+    )
+    EF_CONSTRUCTION = PropertyDescriptor(
+        name="EF Construction",
+        description="The size of the dynamic list used during k-NN graph creation. Higher values lead to a more accurate graph but slower indexing speed.",
+        default_value="512",
+        required=False,
+        validators=[StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR],
+        dependencies=[PropertyDependency(NEW_INDEX_STRATEGY, CUSTOM_INDEX_MAPPING)],
+    )
+    M = PropertyDescriptor(
+        name="M",
+        description="The number of bidirectional links that the plugin creates for each new element. Increasing and "
+        "decreasing this value can have a large impact on memory consumption. Keep this value between 2 and 100.",
+        default_value="16",
+        required=False,
+        validators=[StandardValidators._standard_validators.createLongValidator(2, 100, True)],
+        dependencies=[PropertyDependency(NEW_INDEX_STRATEGY, CUSTOM_INDEX_MAPPING)],
+    )
+
+    properties = [
+        EMBEDDING_MODEL,
+        OPENAI_API_KEY,
+        OPENAI_API_MODEL,
+        HUGGING_FACE_API_KEY,
+        HUGGING_FACE_MODEL,
+        HTTP_HOST,
+        USERNAME,
+        PASSWORD,
+        INDEX_NAME,
+        DOC_ID_FIELD_NAME,
+        VECTOR_FIELD,
+        TEXT_FIELD,
+        NEW_INDEX_STRATEGY,
+        ENGINE,
+        NMSLIB_SPACE_TYPE,
+        FAISS_SPACE_TYPE,
+        LUCENE_SPACE_TYPE,
+        EF_SEARCH,
+        EF_CONSTRUCTION,
+        M,
+    ]
+
+    # Embedding service; assigned in onScheduled().
+    embeddings = None
+
+    def __init__(self, **kwargs):
+        pass
+
+    def getPropertyDescriptors(self):
+        """Return the property descriptors supported by this processor."""
+        return self.properties
+
+    def onScheduled(self, context):
+        """Create the embedding service from the configured properties."""
+        self.embeddings = create_embedding_service(context)
+
+    def _custom_index_options(self, context):
+        """Collect the engine, space type and HNSW settings for the custom index mapping.
+
+        Returns a dict with the keys 'engine', optionally 'space_type',
+        'ef_search', 'ef_construction' and 'm', in that order.
+        """
+
+        def configured(descriptor):
+            return context.getProperty(descriptor).evaluateAttributeExpressions()
+
+        engine_label = configured(self.ENGINE).getValue()
+        options = {"engine": self.ENGINE_VALUES.get(engine_label)}
+
+        # Each engine has its own space type property and its own label -> value table.
+        space_types_by_engine = {
+            self.NMSLIB[0]: (self.NMSLIB_SPACE_TYPE, self.NMSLIB_SPACE_TYPE_VALUES),
+            self.FAISS[0]: (self.FAISS_SPACE_TYPE, self.FAISS_SPACE_TYPE_VALUES),
+            self.LUCENE[0]: (self.LUCENE_SPACE_TYPE, self.LUCENE_SPACE_TYPE_VALUES),
+        }
+        if engine_label in space_types_by_engine:
+            space_type_property, space_type_values = space_types_by_engine[engine_label]
+            options["space_type"] = space_type_values.get(configured(space_type_property).getValue())
+
+        options["ef_search"] = configured(self.EF_SEARCH).asInteger()
+        options["ef_construction"] = configured(self.EF_CONSTRUCTION).asInteger()
+        options["m"] = configured(self.M).asInteger()
+        return options
+
+    def transform(self, context, flowfile):
+        """Parse the FlowFile content as JSON lines and add the documents to OpenSearch.
+
+        Returns a FlowFileTransformResult routed to 'success'; any exception
+        raised while parsing or writing propagates to the caller.
+        """
+
+        def evaluated(descriptor):
+            return context.getProperty(descriptor).evaluateAttributeExpressions(flowfile).getValue()
+
+        store_options = {
+            "vector_field": evaluated(VECTOR_FIELD),
+            "text_field": evaluated(TEXT_FIELD),
+        }
+        store_options.update(create_authentication_params(context))
+
+        strategy = context.getProperty(self.NEW_INDEX_STRATEGY).evaluateAttributeExpressions().getValue()
+        if strategy == self.CUSTOM_INDEX_MAPPING:
+            store_options.update(self._custom_index_options(context))
+
+        # Read the FlowFile content as "json-lines".
+        documents = parse_documents(
+            flowfile.getContentsAsBytes().decode(),
+            evaluated(self.DOC_ID_FIELD_NAME),
+            flowfile.getAttribute("filename"),
+        )
+
+        # The same options are handed to both the constructor and add_texts().
+        vector_store = OpenSearchVectorSearch(
+            opensearch_url=evaluated(HTTP_HOST),
+            index_name=evaluated(INDEX_NAME),
+            embedding_function=self.embeddings,
+            **store_options,
+        )
+        vector_store.add_texts(
+            texts=documents["texts"],
+            metadatas=documents["metadatas"],
+            ids=documents["ids"],
+            **store_options,
+        )
+
+        return FlowFileTransformResult(relationship="success")
diff --git a/src/extensions/vectorstores/PutPinecone.py 
b/src/extensions/vectorstores/PutPinecone.py
new file mode 100644
index 0000000..8625082
--- /dev/null
+++ b/src/extensions/vectorstores/PutPinecone.py
@@ -0,0 +1,220 @@
+# SPDX-License-Identifier: Apache-2.0
+
+import json
+
+import langchain.vectorstores
+from EmbeddingUtils import EMBEDDING_MODEL, HUGGING_FACE, OPENAI, 
create_embedding_service
+from nifiapi.documentation import use_case
+from nifiapi.flowfiletransform import FlowFileTransform, 
FlowFileTransformResult
+from nifiapi.properties import ExpressionLanguageScope, PropertyDependency, 
PropertyDescriptor, StandardValidators
+from pinecone import Pinecone
+
+
+@use_case(
+    description="Create vectors/embeddings that represent text content and send the vectors to Pinecone",
+    notes="This use case assumes that the data has already been formatted in JSONL format with the text to store in Pinecone provided in the 'text' field.",
+    keywords=["pinecone", "embedding", "vector", "text", "vectorstore", "insert"],
+    configuration="""
+                Configure the 'Pinecone API Key' to the appropriate authentication token for interacting with Pinecone.
+                Configure 'Embedding Model' to indicate whether OpenAI embeddings should be used or a HuggingFace embedding model should be used: 'Hugging Face Model' or 'OpenAI Model'
+                Configure the 'OpenAI API Key' or 'HuggingFace API Key', depending on the chosen Embedding Model.
+                Set 'Pinecone Environment' to the name of your Pinecone environment
+                Set 'Index Name' to the name of your Pinecone Index.
+                Set 'Namespace' to appropriate namespace, or leave it empty to use the default Namespace.
+
+                If the documents to send to Pinecone contain a unique identifier, set the 'Document ID Field Name' property to the name of the field that contains the document ID.
+                This property can be left blank, in which case a unique ID will be generated based on the FlowFile's filename.
+                """,
+)
+@use_case(
+    description="Update vectors/embeddings in Pinecone",
+    notes="This use case assumes that the data has already been formatted in JSONL format with the text to store in Pinecone provided in the 'text' field.",
+    keywords=["pinecone", "embedding", "vector", "text", "vectorstore", "update", "upsert"],
+    configuration="""
+                Configure the 'Pinecone API Key' to the appropriate authentication token for interacting with Pinecone.
+                Configure 'Embedding Model' to indicate whether OpenAI embeddings should be used or a HuggingFace embedding model should be used: 'Hugging Face Model' or 'OpenAI Model'
+                Configure the 'OpenAI API Key' or 'HuggingFace API Key', depending on the chosen Embedding Model.
+                Set 'Pinecone Environment' to the name of your Pinecone environment
+                Set 'Index Name' to the name of your Pinecone Index.
+                Set 'Namespace' to appropriate namespace, or leave it empty to use the default Namespace.
+                Set the 'Document ID Field Name' property to the name of the field that contains the identifier of the document in Pinecone to update.
+                """,
+)
+class PutPinecone(FlowFileTransform):
+    """FlowFileTransform that embeds JSON-lines documents and writes them to a Pinecone index."""
+
+    class Java:
+        implements = ["org.apache.nifi.python.processor.FlowFileTransform"]
+
+    class ProcessorDetails:
+        version = "2.0.0.dev0"
+        description = """Publishes JSON data to Pinecone. The Incoming data must be in single JSON per Line format, each with two keys: 'text' and 'metadata'.
+                       The text must be a string, while metadata must be a map with strings for values. Any additional fields will be ignored."""
+        tags = [
+            "pinecone",
+            "vector",
+            "vectordb",
+            "vectorstore",
+            "embeddings",
+            "ai",
+            "artificial intelligence",
+            "ml",
+            "machine learning",
+            "text",
+            "LLM",
+        ]
+
+    PINECONE_API_KEY = PropertyDescriptor(
+        name="Pinecone API Key",
+        description="The API Key to use in order to authenticate with Pinecone",
+        sensitive=True,
+        required=True,
+        validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+    )
+    HUGGING_FACE_API_KEY = PropertyDescriptor(
+        name="HuggingFace API Key",
+        description="The API Key for interacting with HuggingFace",
+        validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+        required=True,
+        sensitive=True,
+        dependencies=[PropertyDependency(EMBEDDING_MODEL, HUGGING_FACE)],
+    )
+    HUGGING_FACE_MODEL = PropertyDescriptor(
+        name="HuggingFace Model",
+        description="The name of the HuggingFace model to use",
+        validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+        required=True,
+        default_value="sentence-transformers/all-MiniLM-L6-v2",
+        dependencies=[PropertyDependency(EMBEDDING_MODEL, HUGGING_FACE)],
+    )
+    OPENAI_API_KEY = PropertyDescriptor(
+        name="OpenAI API Key",
+        description="The API Key for OpenAI in order to create embeddings",
+        sensitive=True,
+        required=True,
+        validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+        dependencies=[PropertyDependency(EMBEDDING_MODEL, OPENAI)],
+    )
+    OPENAI_API_MODEL = PropertyDescriptor(
+        name="OpenAI Model",
+        # Previously repeated the API key description verbatim.
+        description="The name of the OpenAI model to use in order to create embeddings",
+        required=True,
+        validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+        default_value="text-embedding-ada-002",
+        dependencies=[PropertyDependency(EMBEDDING_MODEL, OPENAI)],
+    )
+    PINECONE_ENV = PropertyDescriptor(
+        name="Pinecone Environment",
+        description="The name of the Pinecone Environment. This can be found in the Pinecone console next to the API Key.",
+        sensitive=False,
+        required=True,
+        validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+    )
+    INDEX_NAME = PropertyDescriptor(
+        name="Index Name",
+        description="The name of the Pinecone index.",
+        sensitive=False,
+        required=True,
+        validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+        expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
+    )
+    TEXT_KEY = PropertyDescriptor(
+        name="Text Key",
+        description="The key in the document that contains the text to create embeddings for.",
+        required=True,
+        validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+        default_value="text",
+        expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
+    )
+    NAMESPACE = PropertyDescriptor(
+        name="Namespace",
+        description="The name of the Pinecone Namespace to put the documents to.",
+        required=False,
+        validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+        expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
+    )
+    DOC_ID_FIELD_NAME = PropertyDescriptor(
+        name="Document ID Field Name",
+        description="""Specifies the name of the field in the 'metadata' element of each document where the document's ID can be found.
+                    If not specified, an ID will be generated based on the FlowFile's filename and a one-up number.""",
+        required=False,
+        validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+        expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
+    )
+
+    properties = [
+        PINECONE_API_KEY,
+        EMBEDDING_MODEL,
+        OPENAI_API_KEY,
+        OPENAI_API_MODEL,
+        HUGGING_FACE_API_KEY,
+        HUGGING_FACE_MODEL,
+        PINECONE_ENV,
+        INDEX_NAME,
+        TEXT_KEY,
+        NAMESPACE,
+        DOC_ID_FIELD_NAME,
+    ]
+
+    # Both are assigned in onScheduled().
+    embeddings = None
+    pc = None
+
+    def __init__(self, **kwargs):
+        pass
+
+    def getPropertyDescriptors(self):
+        """Return the property descriptors supported by this processor."""
+        return self.properties
+
+    def onScheduled(self, context):
+        """Create the Pinecone client and the embedding service from the configured properties."""
+        # initialize pinecone
+        self.pc = Pinecone(
+            api_key=context.getProperty(self.PINECONE_API_KEY).getValue(),
+            environment=context.getProperty(self.PINECONE_ENV).getValue(),
+        )
+        # initialize embedding service
+        self.embeddings = create_embedding_service(context)
+
+    def transform(self, context, flowfile):
+        """Parse the FlowFile content as JSON lines and add the documents to Pinecone.
+
+        Blank lines (for example the one produced by a trailing newline) are
+        skipped. A document without a 'metadata' element is stored with empty
+        metadata. Documents without an ID in their metadata get the ID
+        '<filename>-<line number>', with line numbers starting at 1.
+
+        Returns a FlowFileTransformResult routed to 'success'.
+        Raises ValueError when a non-blank line is not valid JSON.
+        """
+        # All of these properties declare FlowFile-attribute Expression Language scope,
+        # so each one is evaluated against the incoming FlowFile.
+        index_name = context.getProperty(self.INDEX_NAME).evaluateAttributeExpressions(flowfile).getValue()
+        namespace = context.getProperty(self.NAMESPACE).evaluateAttributeExpressions(flowfile).getValue()
+        id_field_name = context.getProperty(self.DOC_ID_FIELD_NAME).evaluateAttributeExpressions(flowfile).getValue()
+        text_key = context.getProperty(self.TEXT_KEY).evaluateAttributeExpressions(flowfile).getValue()
+
+        # Obtain a handle to the configured index; nothing here creates the index.
+        index = self.pc.Index(index_name)
+        filename = flowfile.getAttribute("filename")
+
+        # Read the FlowFile content as "json-lines".
+        json_lines = flowfile.getContentsAsBytes().decode()
+        texts = []
+        metadatas = []
+        ids = []
+        for line_number, line in enumerate(json_lines.split("\n"), start=1):
+            if not line.strip():
+                # A trailing newline yields an empty last element; it is not a document.
+                continue
+
+            try:
+                doc = json.loads(line)
+            except Exception as e:
+                message = f"Could not parse line {line_number} as JSON"
+                raise ValueError(message) from e
+
+            metadata = doc.get("metadata") or {}
+            texts.append(doc.get("text"))
+
+            # Remove any null values, or it will cause the embedding to fail
+            metadatas.append({key: value for key, value in metadata.items() if value is not None})
+
+            doc_id = None
+            if id_field_name is not None:
+                doc_id = metadata.get(id_field_name)
+            if doc_id is None:
+                doc_id = filename + "-" + str(line_number)
+            ids.append(doc_id)
+
+        vectorstore = langchain.vectorstores.Pinecone(index, self.embeddings.embed_query, text_key)
+        vectorstore.add_texts(texts=texts, metadatas=metadatas, ids=ids, namespace=namespace)
+        return FlowFileTransformResult(relationship="success")
diff --git a/src/extensions/vectorstores/PutQdrant.py 
b/src/extensions/vectorstores/PutQdrant.py
new file mode 100644
index 0000000..42e7d7c
--- /dev/null
+++ b/src/extensions/vectorstores/PutQdrant.py
@@ -0,0 +1,151 @@
+# SPDX-License-Identifier: Apache-2.0
+
+import json
+
+import QdrantUtils
+from EmbeddingUtils import (
+    create_embedding_service,
+)
+from langchain.vectorstores.qdrant import Qdrant
+from nifiapi.documentation import use_case
+from nifiapi.flowfiletransform import FlowFileTransform, 
FlowFileTransformResult
+from nifiapi.properties import (
+    ExpressionLanguageScope,
+    PropertyDescriptor,
+    StandardValidators,
+)
+from qdrant_client.models import Distance
+
+
+@use_case(
+    description="Create embeddings that semantically represent text content and upload to Qdrant - https://qdrant.tech/",
+    notes="This processor assumes that the data has already been formatted in JSONL format with the text to store in Qdrant provided in the 'text' field.",
+    keywords=["qdrant", "embedding", "vector", "text", "vectorstore", "insert"],
+    configuration="""
+                Configure 'Collection Name' to the name of the Qdrant collection to use.
+                Configure 'Qdrant URL' to the fully qualified URL of the Qdrant instance.
+                Configure 'Qdrant API Key' to the API Key to use in order to authenticate with Qdrant.
+                Configure 'Prefer gRPC' to True if you want to use gRPC for interfacing with Qdrant.
+                Configure 'Use HTTPS' to True if you want to use TLS(HTTPS) while interfacing with Qdrant.
+                Configure 'Embedding Model' to indicate whether OpenAI embeddings should be used or a HuggingFace embedding model should be used: 'Hugging Face Model' or 'OpenAI Model'
+                Configure 'HuggingFace API Key' or 'OpenAI API Key', depending on the chosen Embedding Model.
+                Configure 'HuggingFace Model' or 'OpenAI Model' to the name of the model to use.
+                Configure 'Force Recreate Collection' to True if you want to recreate the collection if it already exists.
+                Configure 'Similarity Metric' to the similarity metric to use when querying Qdrant.
+
+                If the documents to send to Qdrant contain a unique identifier(UUID), set the 'Document ID Field Name' property to the name of the field that contains the document ID.
+                This property can be left blank, in which case a UUID will be generated based on the FlowFile's filename.
+                """,
+)
+class PutQdrant(FlowFileTransform):
+    """FlowFileTransform that embeds JSON-lines documents and writes them to a Qdrant collection."""
+
+    class Java:
+        implements = ["org.apache.nifi.python.processor.FlowFileTransform"]
+
+    class ProcessorDetails:
+        version = "2.0.0.dev0"
+        description = """Publishes JSON data to Qdrant. The Incoming data must be in single JSON per Line format, each with two keys: 'text' and 'metadata'.
+                       The text must be a string, while metadata must be a map with strings for values. Any additional fields will be ignored."""
+        tags = [
+            "qdrant",
+            "vector",
+            "vectordb",
+            "vectorstore",
+            "embeddings",
+            "ai",
+            "artificial intelligence",
+            "ml",
+            "machine learning",
+            "text",
+            "LLM",
+        ]
+
+    DOC_ID_FIELD_NAME = PropertyDescriptor(
+        name="Document ID Field Name",
+        description="""Specifies the name of the field in the 'metadata' element of each document where the document's ID can be found.
+                    If not specified, a UUID will be generated based on the FlowFile's filename and an incremental number.""",
+        required=False,
+        validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+        expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
+    )
+    FORCE_RECREATE_COLLECTION = PropertyDescriptor(
+        name="Force Recreate Collection",
+        description="Specifies whether to recreate the collection if it already exists. Essentially clearing the existing data.",
+        required=True,
+        default_value="False",
+        allowable_values=["True", "False"],
+        validators=[StandardValidators.BOOLEAN_VALIDATOR],
+    )
+    SIMILARITY_METRIC = PropertyDescriptor(
+        name="Similarity Metric",
+        description="Specifies the similarity metric when creating the collection.",
+        required=True,
+        default_value=Distance.COSINE,
+        allowable_values=[
+            Distance.COSINE,
+            Distance.EUCLID,
+            Distance.DOT,
+            Distance.MANHATTAN,
+        ],
+        validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+    )
+
+    properties = (
+        QdrantUtils.QDRANT_PROPERTIES
+        + QdrantUtils.EMBEDDING_MODEL_PROPERTIES
+        + [
+            FORCE_RECREATE_COLLECTION,
+            SIMILARITY_METRIC,
+            DOC_ID_FIELD_NAME,
+        ]
+    )
+
+    # Assigned in onScheduled(); declared here like the state of the sibling processors.
+    vector_store = None
+
+    def __init__(self, **kwargs):
+        pass
+
+    def getPropertyDescriptors(self):
+        """Return the property descriptors supported by this processor."""
+        return self.properties
+
+    def onScheduled(self, context):
+        """Build the Qdrant vector store from the configured properties."""
+        # The Qdrant#construct_instance() internally checks if the collection exists
+        # and creates it if it doesn't with the appropriate dimensions and configurations.
+        self.vector_store = Qdrant.construct_instance(
+            texts=["Some text to obtain the embeddings dimension when creating the collection"],
+            embedding=create_embedding_service(context),
+            collection_name=context.getProperty(QdrantUtils.COLLECTION_NAME).getValue(),
+            url=context.getProperty(QdrantUtils.QDRANT_URL).getValue(),
+            api_key=context.getProperty(QdrantUtils.QDRANT_API_KEY).getValue(),
+            prefer_grpc=context.getProperty(QdrantUtils.PREFER_GRPC).asBoolean(),
+            https=context.getProperty(QdrantUtils.HTTPS).asBoolean(),
+            force_recreate=context.getProperty(self.FORCE_RECREATE_COLLECTION).asBoolean(),
+            distance_func=context.getProperty(self.SIMILARITY_METRIC).getValue(),
+        )
+
+    def transform(self, context, flowfile):
+        """Parse the FlowFile content as JSON lines and add the documents to Qdrant.
+
+        Blank lines (for example the one produced by a trailing newline) are
+        skipped. Documents whose metadata does not provide an ID get one from
+        QdrantUtils.convert_id('<filename>-<line number>'), with line numbers
+        starting at 1.
+
+        Returns a FlowFileTransformResult routed to 'success'.
+        Raises ValueError when a non-blank line is not valid JSON.
+        """
+        id_field_name = context.getProperty(self.DOC_ID_FIELD_NAME).evaluateAttributeExpressions(flowfile).getValue()
+        filename = flowfile.getAttribute("filename")
+
+        # Read the FlowFile content as "json-lines".
+        json_lines = flowfile.getContentsAsBytes().decode()
+        texts, metadatas, ids = [], [], []
+        for line_number, line in enumerate(json_lines.split("\n"), start=1):
+            if not line.strip():
+                # A trailing newline yields an empty last element; it is not a document.
+                continue
+
+            try:
+                doc = json.loads(line)
+            except Exception as e:
+                message = f"Could not parse line {line_number} as JSON"
+                raise ValueError(message) from e
+
+            metadata = doc.get("metadata")
+            texts.append(doc.get("text"))
+            metadatas.append(metadata)
+
+            doc_id = None
+            # A document without a 'metadata' element cannot carry an ID; fall through to the generated one.
+            if id_field_name is not None and metadata is not None:
+                doc_id = metadata.get(id_field_name)
+            if doc_id is None:
+                doc_id = QdrantUtils.convert_id(filename + "-" + str(line_number))
+            ids.append(doc_id)
+
+        self.vector_store.add_texts(texts=texts, metadatas=metadatas, ids=ids)
+        return FlowFileTransformResult(relationship="success")
diff --git a/src/extensions/vectorstores/QdrantUtils.py 
b/src/extensions/vectorstores/QdrantUtils.py
new file mode 100644
index 0000000..7435124
--- /dev/null
+++ b/src/extensions/vectorstores/QdrantUtils.py
@@ -0,0 +1,112 @@
+# SPDX-License-Identifier: Apache-2.0
+
+import uuid
+
+from EmbeddingUtils import (
+    EMBEDDING_MODEL,
+    HUGGING_FACE,
+    OPENAI,
+)
+from nifiapi.properties import (
+    ExpressionLanguageScope,
+    PropertyDependency,
+    PropertyDescriptor,
+    StandardValidators,
+)
+
+DEFAULT_COLLECTION_NAME = "apache-nifi"
+
+
+# Connection properties shared by the Qdrant processors.
+COLLECTION_NAME = PropertyDescriptor(
+    name="Collection Name",
+    description="The name of the Qdrant collection to use.",
+    sensitive=False,
+    required=True,
+    default_value=DEFAULT_COLLECTION_NAME,
+    validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+    expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
+)
+QDRANT_URL = PropertyDescriptor(
+    name="Qdrant URL",
+    description="The fully qualified URL to the Qdrant instance.",
+    sensitive=False,
+    required=True,
+    default_value="http://localhost:6333",
+    validators=[StandardValidators.URL_VALIDATOR],
+)
+# The description allows an empty key, so the property must not be mandatory.
+QDRANT_API_KEY = PropertyDescriptor(
+    name="Qdrant API Key",
+    description="The API Key to use in order to authenticate with Qdrant. Can be empty.",
+    sensitive=True,
+    required=False,
+)
+
+# The defaults below are strings so that they match one of the allowable values.
+PREFER_GRPC = PropertyDescriptor(
+    name="Prefer gRPC",
+    description="Specifies whether to use gRPC for interfacing with Qdrant.",
+    required=True,
+    default_value="False",
+    allowable_values=["True", "False"],
+    validators=[StandardValidators.BOOLEAN_VALIDATOR],
+)
+HTTPS = PropertyDescriptor(
+    name="Use HTTPS",
+    description="Specifies whether to use TLS(HTTPS) while interfacing with Qdrant.",
+    required=True,
+    default_value="False",
+    allowable_values=["True", "False"],
+    validators=[StandardValidators.BOOLEAN_VALIDATOR],
+)
+
+QDRANT_PROPERTIES = [COLLECTION_NAME, QDRANT_URL, QDRANT_API_KEY, PREFER_GRPC, HTTPS]
+
+# Embedding properties; each one is only relevant for the Embedding Model value named in its dependency.
+HUGGING_FACE_API_KEY = PropertyDescriptor(
+    name="HuggingFace API Key",
+    description="The API Key for interacting with HuggingFace",
+    validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+    required=True,
+    sensitive=True,
+    dependencies=[PropertyDependency(EMBEDDING_MODEL, HUGGING_FACE)],
+)
+HUGGING_FACE_MODEL = PropertyDescriptor(
+    name="HuggingFace Model",
+    description="The name of the HuggingFace model to use.",
+    validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+    required=True,
+    default_value="sentence-transformers/all-MiniLM-L6-v2",
+    dependencies=[PropertyDependency(EMBEDDING_MODEL, HUGGING_FACE)],
+)
+OPENAI_API_KEY = PropertyDescriptor(
+    name="OpenAI API Key",
+    description="The API Key for OpenAI in order to create embeddings.",
+    sensitive=True,
+    required=True,
+    validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+    dependencies=[PropertyDependency(EMBEDDING_MODEL, OPENAI)],
+)
+OPENAI_API_MODEL = PropertyDescriptor(
+    name="OpenAI Model",
+    description="The name of the OpenAI model to use.",
+    required=True,
+    validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+    default_value="text-embedding-ada-002",
+    dependencies=[PropertyDependency(EMBEDDING_MODEL, OPENAI)],
+)
+
+EMBEDDING_MODEL_PROPERTIES = [
+    EMBEDDING_MODEL,
+    HUGGING_FACE_API_KEY,
+    HUGGING_FACE_MODEL,
+    OPENAI_API_KEY,
+    OPENAI_API_MODEL,
+]
+
+
+def convert_id(_id: str) -> str:
+    """
+    Derive a stable UUID string from an arbitrary string.
+
+    The same input always yields the same UUID (version 5, DNS namespace).
+    Qdrant accepts UUID strings and unsigned integers as point ID, so a point
+    written again under the same original ID overwrites the earlier one.
+    """
+    derived = uuid.uuid5(uuid.NAMESPACE_DNS, _id)
+    return str(derived)
diff --git a/src/extensions/vectorstores/QueryChroma.py b/src/extensions/vectorstores/QueryChroma.py
new file mode 100644
index 0000000..3d28ccf
--- /dev/null
+++ b/src/extensions/vectorstores/QueryChroma.py
@@ -0,0 +1,181 @@
+# SPDX-License-Identifier: Apache-2.0
+
+import json
+
+import ChromaUtils
+import EmbeddingUtils
+import QueryUtils
+from nifiapi.flowfiletransform import FlowFileTransform, 
FlowFileTransformResult
+from nifiapi.properties import ExpressionLanguageScope, PropertyDescriptor, 
StandardValidators
+
+
+class QueryChroma(FlowFileTransform):
+    """
+    FlowFileTransform processor that runs a similarity query against a Chroma collection.
+
+    The query text is turned into an embedding with the configured embedding function, the embedding is
+    sent to Chroma, and the matches are rendered into the outgoing FlowFile content by QueryUtils.
+    """
+
+    class Java:
+        implements = ["org.apache.nifi.python.processor.FlowFileTransform"]
+
+    class ProcessorDetails:
+        version = "2.0.0.dev0"
+        description = "Queries a Chroma Vector Database in order to gather a specified number of documents that are most closely related to the given query."
+        tags = [
+            "chroma",
+            "vector",
+            "vectordb",
+            "embeddings",
+            "enrich",
+            "enrichment",
+            "ai",
+            "artificial intelligence",
+            "ml",
+            "machine learning",
+            "text",
+            "LLM",
+        ]
+
+    QUERY = PropertyDescriptor(
+        name="Query",
+        description="""The query to issue to the Chroma VectorDB. The query is always converted into embeddings using the configured embedding function, and the embedding is
+                    then sent to Chroma. The text itself is not sent to Chroma.""",
+        required=True,
+        validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+        expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
+    )
+    NUMBER_OF_RESULTS = PropertyDescriptor(
+        name="Number of Results",
+        description="The number of results to return from Chroma",
+        required=True,
+        validators=[StandardValidators.POSITIVE_INTEGER_VALIDATOR],
+        default_value="10",
+        expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
+    )
+    METADATA_FILTER = PropertyDescriptor(
+        name="Metadata Filter",
+        description="""A JSON representation of a Metadata Filter that can be applied against the Chroma documents in order to narrow down the documents that can be returned.
+                    For example: { "metadata_field": "some_value" }""",
+        validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+        expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
+        required=False,
+    )
+    DOCUMENT_FILTER = PropertyDescriptor(
+        name="Document Filter",
+        description="""A JSON representation of a Document Filter that can be applied against the Chroma documents' text in order to narrow down the documents that can be returned.
+                    For example: { "$contains": "search_string" }""",
+        validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+        expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
+        required=False,
+    )
+
+    # Per-instance state, populated in onScheduled.
+    client = None
+    embedding_function = None
+    include_ids = None
+    include_metadatas = None
+    include_documents = None
+    include_distances = None
+    include_embeddings = None
+    results_field = None
+    query_utils = None
+
+    # EMBEDDING_MODEL is left out of the embedding properties exposed by this processor.
+    property_descriptors = (
+        list(ChromaUtils.PROPERTIES)
+        + [prop for prop in EmbeddingUtils.PROPERTIES if prop != EmbeddingUtils.EMBEDDING_MODEL]
+        + [
+            QUERY,
+            NUMBER_OF_RESULTS,
+            QueryUtils.OUTPUT_STRATEGY,
+            QueryUtils.RESULTS_FIELD,
+            METADATA_FILTER,
+            DOCUMENT_FILTER,
+            QueryUtils.INCLUDE_IDS,
+            QueryUtils.INCLUDE_METADATAS,
+            QueryUtils.INCLUDE_DOCUMENTS,
+            QueryUtils.INCLUDE_DISTANCES,
+            QueryUtils.INCLUDE_EMBEDDINGS,
+        ]
+    )
+
+    def __init__(self, **kwargs):
+        pass
+
+    def getPropertyDescriptors(self):
+        """Return the property descriptors supported by this processor."""
+        return self.property_descriptors
+
+    def onScheduled(self, context):
+        """Create the Chroma client and embedding function and cache the FlowFile-independent settings."""
+        self.client = ChromaUtils.create_client(context)
+        self.embedding_function = EmbeddingUtils.create_embedding_function(context)
+        self.include_ids = context.getProperty(QueryUtils.INCLUDE_IDS).asBoolean()
+        self.include_metadatas = context.getProperty(QueryUtils.INCLUDE_METADATAS).asBoolean()
+        self.include_documents = context.getProperty(QueryUtils.INCLUDE_DOCUMENTS).asBoolean()
+        self.include_distances = context.getProperty(QueryUtils.INCLUDE_DISTANCES).asBoolean()
+        self.include_embeddings = context.getProperty(QueryUtils.INCLUDE_EMBEDDINGS).asBoolean()
+        self.results_field = context.getProperty(QueryUtils.RESULTS_FIELD).getValue()
+        self.query_utils = QueryUtils.QueryUtils(context)
+
+    @staticmethod
+    def _load_json_filter(value):
+        """Parse an optional JSON filter; an unset property (None) means that no filter is applied."""
+        return None if value is None else json.loads(value)
+
+    @staticmethod
+    def _first_result(query_results, field, enabled):
+        """Return the entries of `field` for the first query, or None when the field is excluded or absent."""
+        if not enabled or query_results[field] is None:
+            return None
+        return query_results[field][0]
+
+    def transform(self, context, flowfile):
+        """
+        Query the configured Chroma collection and write the matches to the FlowFile content.
+
+        Collection name, query text, number of results and both filters are evaluated against the
+        FlowFile attributes. Returns a FlowFileTransformResult routed to "success" whose "mime.type"
+        attribute is the MIME type reported by QueryUtils.create_json.
+        """
+        embedding_function = self.embedding_function
+        collection_name = (
+            context.getProperty(ChromaUtils.COLLECTION_NAME).evaluateAttributeExpressions(flowfile).getValue()
+        )
+        collection = self.client.get_collection(name=collection_name, embedding_function=embedding_function)
+
+        query_text = context.getProperty(self.QUERY).evaluateAttributeExpressions(flowfile).getValue()
+        # Only one query is issued, which is why index 0 of every result list is read below.
+        query_embeddings = embedding_function([query_text])
+
+        included_fields = []
+        if self.include_distances:
+            included_fields.append("distances")
+        if self.include_documents:
+            included_fields.append("documents")
+        if self.include_embeddings:
+            included_fields.append("embeddings")
+        if self.include_metadatas:
+            included_fields.append("metadatas")
+
+        where = self._load_json_filter(
+            context.getProperty(self.METADATA_FILTER).evaluateAttributeExpressions(flowfile).getValue()
+        )
+        where_document = self._load_json_filter(
+            context.getProperty(self.DOCUMENT_FILTER).evaluateAttributeExpressions(flowfile).getValue()
+        )
+
+        query_results = collection.query(
+            query_embeddings=query_embeddings,
+            n_results=context.getProperty(self.NUMBER_OF_RESULTS).evaluateAttributeExpressions(flowfile).asInteger(),
+            include=included_fields,
+            where_document=where_document,
+            where=where,
+        )
+
+        ids = query_results["ids"][0]
+        distances = self._first_result(query_results, "distances", self.include_distances)
+        metadatas = self._first_result(query_results, "metadatas", self.include_metadatas)
+        documents = self._first_result(query_results, "documents", self.include_documents)
+        # Kept under its own name so that it cannot be confused with the query embeddings above.
+        result_embeddings = self._first_result(query_results, "embeddings", self.include_embeddings)
+
+        (output_contents, mime_type) = self.query_utils.create_json(
+            flowfile, documents, metadatas, result_embeddings, distances, ids
+        )
+
+        # Return the results
+        attributes = {"mime.type": mime_type}
+        return FlowFileTransformResult(relationship="success", contents=output_contents, attributes=attributes)
diff --git a/src/extensions/vectorstores/QueryOpenSearchVector.py b/src/extensions/vectorstores/QueryOpenSearchVector.py
new file mode 100644
index 0000000..365f824
--- /dev/null
+++ b/src/extensions/vectorstores/QueryOpenSearchVector.py
@@ -0,0 +1,242 @@
+# SPDX-License-Identifier: Apache-2.0
+
+import json
+
+from EmbeddingUtils import EMBEDDING_MODEL, create_embedding_service
+from langchain.vectorstores import OpenSearchVectorSearch
+from nifiapi.flowfiletransform import FlowFileTransform, 
FlowFileTransformResult
+from nifiapi.properties import ExpressionLanguageScope, PropertyDependency, 
PropertyDescriptor, StandardValidators
+from OpenSearchVectorUtils import (
+    COSINESIMIL,
+    HTTP_HOST,
+    HUGGING_FACE_API_KEY,
+    HUGGING_FACE_MODEL,
+    INDEX_NAME,
+    L1,
+    L2,
+    LINF,
+    OPENAI_API_KEY,
+    OPENAI_API_MODEL,
+    PASSWORD,
+    TEXT_FIELD,
+    USERNAME,
+    VECTOR_FIELD,
+    create_authentication_params,
+)
+from QueryUtils import INCLUDE_DISTANCES, INCLUDE_METADATAS, OUTPUT_STRATEGY, 
RESULTS_FIELD, QueryUtils
+
+
+class QueryOpenSearchVector(FlowFileTransform):
+    """
+    FlowFileTransform processor that runs a vector similarity search against an OpenSearch index.
+
+    The query text is embedded with the configured embedding service and the matching documents
+    (optionally with their metadata and scores) are rendered into the FlowFile content by QueryUtils.
+    """
+
+    class Java:
+        implements = ["org.apache.nifi.python.processor.FlowFileTransform"]
+
+    class ProcessorDetails:
+        version = "2.0.0.dev0"
+        description = "Queries OpenSearch in order to gather a specified number of documents that are most closely related to the given query."
+        tags = [
+            "opensearch",
+            "vector",
+            "vectordb",
+            "vectorstore",
+            "embeddings",
+            "ai",
+            "artificial intelligence",
+            "ml",
+            "machine learning",
+            "text",
+            "LLM",
+        ]
+
+    # Each option below is a (display name, value handed to the vector store) pair.
+
+    # Search types
+    APPROXIMATE_SEARCH = ("Approximate Search", "approximate_search")
+    SCRIPT_SCORING_SEARCH = ("Script Scoring Search", "script_scoring")
+    PAINLESS_SCRIPTING_SEARCH = ("Painless Scripting Search", "painless_scripting")
+
+    SEARCH_TYPE_VALUES = dict([APPROXIMATE_SEARCH, SCRIPT_SCORING_SEARCH, PAINLESS_SCRIPTING_SEARCH])
+
+    # Script Scoring Search space types
+    HAMMINGBIT = ("Hamming distance", "hammingbit")
+
+    SCRIPT_SCORING_SPACE_TYPE_VALUES = dict([L2, L1, LINF, COSINESIMIL, HAMMINGBIT])
+
+    # Painless Scripting Search space types
+    L2_SQUARED = ("L2 (Euclidean distance)", "l2Squared")
+    L1_NORM = ("L1 (Manhattan distance)", "l1Norm")
+    COSINE_SIMILARITY = ("Cosine similarity", "cosineSimilarity")
+
+    PAINLESS_SCRIPTING_SPACE_TYPE_VALUES = dict([L2_SQUARED, L1_NORM, COSINE_SIMILARITY])
+
+    QUERY = PropertyDescriptor(
+        name="Query",
+        description="The text of the query to send to OpenSearch.",
+        required=True,
+        validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+        expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
+    )
+    NUMBER_OF_RESULTS = PropertyDescriptor(
+        name="Number of Results",
+        description="The number of results to return from OpenSearch",
+        default_value="10",
+        required=True,
+        validators=[StandardValidators.POSITIVE_INTEGER_VALIDATOR],
+        expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
+    )
+    SEARCH_TYPE = PropertyDescriptor(
+        name="Search Type",
+        description="Specifies the type of the search to be performed.",
+        allowable_values=SEARCH_TYPE_VALUES.keys(),
+        default_value=APPROXIMATE_SEARCH[0],
+        required=True,
+    )
+    SCRIPT_SCORING_SPACE_TYPE = PropertyDescriptor(
+        name="Script Scoring Space Type",
+        description="Used to measure the distance between two points in order to determine the k-nearest neighbors.",
+        allowable_values=SCRIPT_SCORING_SPACE_TYPE_VALUES.keys(),
+        default_value=L2[0],
+        required=False,
+        dependencies=[PropertyDependency(SEARCH_TYPE, SCRIPT_SCORING_SEARCH[0])],
+    )
+    PAINLESS_SCRIPTING_SPACE_TYPE = PropertyDescriptor(
+        name="Painless Scripting Space Type",
+        description="Used to measure the distance between two points in order to determine the k-nearest neighbors.",
+        allowable_values=PAINLESS_SCRIPTING_SPACE_TYPE_VALUES.keys(),
+        default_value=L2_SQUARED[0],
+        required=False,
+        dependencies=[PropertyDependency(SEARCH_TYPE, PAINLESS_SCRIPTING_SEARCH[0])],
+    )
+    BOOLEAN_FILTER = PropertyDescriptor(
+        name="Boolean Filter",
+        description="A Boolean filter is a post filter that consists of a Boolean query that contains a k-NN query and a filter. "
+        "The value of the field must be a JSON representation of the filter.",
+        required=False,
+        validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+        dependencies=[PropertyDependency(SEARCH_TYPE, APPROXIMATE_SEARCH[0])],
+    )
+    EFFICIENT_FILTER = PropertyDescriptor(
+        name="Efficient Filter",
+        description="The Lucene Engine or Faiss Engine decides whether to perform an exact k-NN search with "
+        "pre-filtering or an approximate search with modified post-filtering. The value of the field must "
+        "be a JSON representation of the filter.",
+        required=False,
+        validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+        dependencies=[PropertyDependency(SEARCH_TYPE, APPROXIMATE_SEARCH[0])],
+    )
+    PRE_FILTER = PropertyDescriptor(
+        name="Pre Filter",
+        description="Script Score query to pre-filter documents before identifying nearest neighbors. The value of "
+        "the field must be a JSON representation of the filter.",
+        default_value='{"match_all": {}}',
+        required=False,
+        validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+        dependencies=[PropertyDependency(SEARCH_TYPE, SCRIPT_SCORING_SEARCH[0], PAINLESS_SCRIPTING_SEARCH[0])],
+    )
+
+    properties = [
+        EMBEDDING_MODEL,
+        OPENAI_API_KEY,
+        OPENAI_API_MODEL,
+        HUGGING_FACE_API_KEY,
+        HUGGING_FACE_MODEL,
+        HTTP_HOST,
+        USERNAME,
+        PASSWORD,
+        INDEX_NAME,
+        QUERY,
+        VECTOR_FIELD,
+        TEXT_FIELD,
+        NUMBER_OF_RESULTS,
+        SEARCH_TYPE,
+        SCRIPT_SCORING_SPACE_TYPE,
+        PAINLESS_SCRIPTING_SPACE_TYPE,
+        BOOLEAN_FILTER,
+        EFFICIENT_FILTER,
+        PRE_FILTER,
+        OUTPUT_STRATEGY,
+        RESULTS_FIELD,
+        INCLUDE_METADATAS,
+        INCLUDE_DISTANCES,
+    ]
+
+    # Per-instance state, populated in onScheduled.
+    embeddings = None
+    query_utils = None
+
+    def __init__(self, **kwargs):
+        pass
+
+    def getPropertyDescriptors(self):
+        """Return the property descriptors supported by this processor."""
+        return self.properties
+
+    def onScheduled(self, context):
+        """Create the embedding service and the QueryUtils helper used to render results."""
+        # initialize embedding service
+        self.embeddings = create_embedding_service(context)
+        self.query_utils = QueryUtils(context)
+
+    def transform(self, context, flowfile):
+        """
+        Run the configured similarity search and write the matches to the FlowFile content.
+
+        Host, index, query, number of results and the vector/text field names are evaluated against the
+        FlowFile attributes. Metadata and scores are only passed to QueryUtils when the corresponding
+        include property evaluates to true. Returns a FlowFileTransformResult routed to "success".
+        """
+        http_host = context.getProperty(HTTP_HOST).evaluateAttributeExpressions(flowfile).getValue()
+        index_name = context.getProperty(INDEX_NAME).evaluateAttributeExpressions(flowfile).getValue()
+        query = context.getProperty(self.QUERY).evaluateAttributeExpressions(flowfile).getValue()
+        num_results = context.getProperty(self.NUMBER_OF_RESULTS).evaluateAttributeExpressions(flowfile).asInteger()
+        vector_field = context.getProperty(VECTOR_FIELD).evaluateAttributeExpressions(flowfile).getValue()
+        text_field = context.getProperty(TEXT_FIELD).evaluateAttributeExpressions(flowfile).getValue()
+        search_type = context.getProperty(self.SEARCH_TYPE).evaluateAttributeExpressions().getValue()
+
+        # The same keyword arguments are handed to the vector store constructor and to the search call.
+        params = {
+            "vector_field": vector_field,
+            "text_field": text_field,
+            "search_type": self.SEARCH_TYPE_VALUES.get(search_type),
+        }
+        params.update(create_authentication_params(context))
+
+        if search_type == self.APPROXIMATE_SEARCH[0]:
+            boolean_filter = context.getProperty(self.BOOLEAN_FILTER).evaluateAttributeExpressions().getValue()
+            if boolean_filter is not None:
+                params["boolean_filter"] = json.loads(boolean_filter)
+
+            efficient_filter = context.getProperty(self.EFFICIENT_FILTER).evaluateAttributeExpressions().getValue()
+            if efficient_filter is not None:
+                params["efficient_filter"] = json.loads(efficient_filter)
+        else:
+            pre_filter = context.getProperty(self.PRE_FILTER).evaluateAttributeExpressions().getValue()
+            if pre_filter is not None:
+                params["pre_filter"] = json.loads(pre_filter)
+            if search_type == self.SCRIPT_SCORING_SEARCH[0]:
+                space_type = (
+                    context.getProperty(self.SCRIPT_SCORING_SPACE_TYPE).evaluateAttributeExpressions().getValue()
+                )
+                params["space_type"] = self.SCRIPT_SCORING_SPACE_TYPE_VALUES.get(space_type)
+            elif search_type == self.PAINLESS_SCRIPTING_SEARCH[0]:
+                space_type = (
+                    context.getProperty(self.PAINLESS_SCRIPTING_SPACE_TYPE).evaluateAttributeExpressions().getValue()
+                )
+                params["space_type"] = self.PAINLESS_SCRIPTING_SPACE_TYPE_VALUES.get(space_type)
+
+        vectorstore = OpenSearchVectorSearch(
+            index_name=index_name, embedding_function=self.embeddings, opensearch_url=http_host, **params
+        )
+
+        # Each result is indexed as a pair: [0] is the document, [1] is its score.
+        results = vectorstore.similarity_search_with_score(query=query, k=num_results, **params)
+
+        documents = [result[0].page_content for result in results]
+
+        # asBoolean() is required here: testing the object returned by getProperty() directly does not
+        # reflect the configured true/false value.
+        if context.getProperty(INCLUDE_METADATAS).asBoolean():
+            metadatas = [result[0].metadata for result in results]
+        else:
+            metadatas = None
+
+        if context.getProperty(INCLUDE_DISTANCES).asBoolean():
+            distances = [result[1] for result in results]
+        else:
+            distances = None
+
+        (output_contents, mime_type) = self.query_utils.create_json(
+            flowfile, documents, metadatas, None, distances, None
+        )
+        attributes = {"mime.type": mime_type}
+
+        return FlowFileTransformResult(relationship="success", contents=output_contents, attributes=attributes)
diff --git a/src/extensions/vectorstores/QueryPinecone.py b/src/extensions/vectorstores/QueryPinecone.py
new file mode 100644
index 0000000..aeb416a
--- /dev/null
+++ b/src/extensions/vectorstores/QueryPinecone.py
@@ -0,0 +1,205 @@
+# SPDX-License-Identifier: Apache-2.0
+
+import json
+
+import langchain.vectorstores
+import QueryUtils
+from EmbeddingUtils import EMBEDDING_MODEL, HUGGING_FACE, OPENAI, 
create_embedding_service
+from nifiapi.flowfiletransform import FlowFileTransform, 
FlowFileTransformResult
+from nifiapi.properties import ExpressionLanguageScope, PropertyDependency, 
PropertyDescriptor, StandardValidators
+from pinecone import Pinecone
+
+
+class QueryPinecone(FlowFileTransform):
+    """
+    FlowFileTransform processor that runs a vector similarity search against a Pinecone index.
+
+    The query text is embedded with the configured embedding service and the matching documents
+    (optionally with their metadata and scores) are rendered into the FlowFile content by QueryUtils.
+    """
+
+    class Java:
+        implements = ["org.apache.nifi.python.processor.FlowFileTransform"]
+
+    class ProcessorDetails:
+        version = "2.0.0.dev0"
+        description = "Queries Pinecone in order to gather a specified number of documents that are most closely related to the given query."
+        tags = [
+            "pinecone",
+            "vector",
+            "vectordb",
+            "vectorstore",
+            "embeddings",
+            "ai",
+            "artificial intelligence",
+            "ml",
+            "machine learning",
+            "text",
+            "LLM",
+        ]
+
+    PINECONE_API_KEY = PropertyDescriptor(
+        name="Pinecone API Key",
+        description="The API Key to use in order to authenticate with Pinecone",
+        sensitive=True,
+        required=True,
+        validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+    )
+    OPENAI_API_KEY = PropertyDescriptor(
+        name="OpenAI API Key",
+        description="The API Key for OpenAI in order to create embeddings",
+        sensitive=True,
+        required=True,
+        validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+        dependencies=[PropertyDependency(EMBEDDING_MODEL, OPENAI)],
+    )
+    HUGGING_FACE_API_KEY = PropertyDescriptor(
+        name="HuggingFace API Key",
+        description="The API Key for interacting with HuggingFace",
+        validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+        required=True,
+        sensitive=True,
+        dependencies=[PropertyDependency(EMBEDDING_MODEL, HUGGING_FACE)],
+    )
+    OPENAI_MODEL = PropertyDescriptor(
+        name="OpenAI Model",
+        description="The name of the OpenAI model to use",
+        required=True,
+        validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+        default_value="text-embedding-ada-002",
+        dependencies=[PropertyDependency(EMBEDDING_MODEL, OPENAI)],
+    )
+    HUGGING_FACE_MODEL = PropertyDescriptor(
+        name="HuggingFace Model",
+        description="The name of the HuggingFace model to use",
+        validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+        required=True,
+        default_value="sentence-transformers/all-MiniLM-L6-v2",
+        dependencies=[PropertyDependency(EMBEDDING_MODEL, HUGGING_FACE)],
+    )
+    PINECONE_ENV = PropertyDescriptor(
+        name="Pinecone Environment",
+        description="The name of the Pinecone Environment. This can be found in the Pinecone console next to the API Key.",
+        sensitive=False,
+        required=True,
+        validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+    )
+    INDEX_NAME = PropertyDescriptor(
+        name="Index Name",
+        description="The name of the Pinecone index.",
+        sensitive=False,
+        required=True,
+        validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+        expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
+    )
+    QUERY = PropertyDescriptor(
+        name="Query",
+        description="The text of the query to send to Pinecone.",
+        required=True,
+        validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+        expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
+    )
+    NUMBER_OF_RESULTS = PropertyDescriptor(
+        name="Number of Results",
+        description="The number of results to return from Pinecone",
+        required=True,
+        validators=[StandardValidators.POSITIVE_INTEGER_VALIDATOR],
+        default_value="10",
+        expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
+    )
+    TEXT_KEY = PropertyDescriptor(
+        name="Text Key",
+        description="The key in the document that contains the text to create embeddings for.",
+        required=True,
+        validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+        default_value="text",
+        expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
+    )
+    NAMESPACE = PropertyDescriptor(
+        name="Namespace",
+        description="The name of the Pinecone Namespace to query into.",
+        required=False,
+        validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+        expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
+    )
+    FILTER = PropertyDescriptor(
+        name="Metadata Filter",
+        description='Optional metadata filter to apply with the query. For example: { "author": {"$eq": "john.doe"} }',
+        required=False,
+        validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+        expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
+    )
+
+    properties = [
+        PINECONE_API_KEY,
+        EMBEDDING_MODEL,
+        OPENAI_API_KEY,
+        OPENAI_MODEL,
+        HUGGING_FACE_API_KEY,
+        HUGGING_FACE_MODEL,
+        PINECONE_ENV,
+        INDEX_NAME,
+        QUERY,
+        FILTER,
+        NUMBER_OF_RESULTS,
+        NAMESPACE,
+        TEXT_KEY,
+        QueryUtils.OUTPUT_STRATEGY,
+        QueryUtils.RESULTS_FIELD,
+        QueryUtils.INCLUDE_METADATAS,
+        QueryUtils.INCLUDE_DISTANCES,
+    ]
+
+    # Per-instance state, populated in onScheduled.
+    embeddings = None
+    query_utils = None
+    pc = None
+
+    def __init__(self, **kwargs):
+        pass
+
+    def getPropertyDescriptors(self):
+        """Return the property descriptors supported by this processor."""
+        return self.properties
+
+    def onScheduled(self, context):
+        """Create the Pinecone client, the embedding service and the QueryUtils helper."""
+        # initialize pinecone
+        self.pc = Pinecone(
+            api_key=context.getProperty(self.PINECONE_API_KEY).getValue(),
+            environment=context.getProperty(self.PINECONE_ENV).getValue(),
+        )
+        # initialize embedding service
+        self.embeddings = create_embedding_service(context)
+        self.query_utils = QueryUtils.QueryUtils(context)
+
+    def transform(self, context, flowfile):
+        """
+        Run the similarity search against the configured index and write the matches to the FlowFile content.
+
+        Index name, query, namespace, number of results, text key and metadata filter are evaluated against
+        the FlowFile attributes. Metadata and scores are only passed to QueryUtils when the corresponding
+        include property evaluates to true. Returns a FlowFileTransformResult routed to "success".
+        """
+        # Resolve the per-FlowFile settings. The index is only looked up here, never created.
+        index_name = context.getProperty(self.INDEX_NAME).evaluateAttributeExpressions(flowfile).getValue()
+        query = context.getProperty(self.QUERY).evaluateAttributeExpressions(flowfile).getValue()
+        namespace = context.getProperty(self.NAMESPACE).evaluateAttributeExpressions(flowfile).getValue()
+        num_results = context.getProperty(self.NUMBER_OF_RESULTS).evaluateAttributeExpressions(flowfile).asInteger()
+
+        index = self.pc.Index(index_name)
+
+        # Text Key declares FlowFile-attribute scope, so it has to be evaluated against the FlowFile as well.
+        text_key = context.getProperty(self.TEXT_KEY).evaluateAttributeExpressions(flowfile).getValue()
+        filter_definition = context.getProperty(self.FILTER).evaluateAttributeExpressions(flowfile).getValue()
+        vectorstore = langchain.vectorstores.Pinecone(index, self.embeddings.embed_query, text_key, namespace=namespace)
+        # Each result is indexed as a pair: [0] is the document, [1] is its score.
+        results = vectorstore.similarity_search_with_score(
+            query, num_results, filter=None if filter_definition is None else json.loads(filter_definition)
+        )
+
+        documents = [result[0].page_content for result in results]
+
+        # asBoolean() is required here: testing the object returned by getProperty() directly does not
+        # reflect the configured true/false value.
+        if context.getProperty(QueryUtils.INCLUDE_METADATAS).asBoolean():
+            metadatas = [result[0].metadata for result in results]
+        else:
+            metadatas = None
+
+        if context.getProperty(QueryUtils.INCLUDE_DISTANCES).asBoolean():
+            distances = [result[1] for result in results]
+        else:
+            distances = None
+
+        (output_contents, mime_type) = self.query_utils.create_json(
+            flowfile, documents, metadatas, None, distances, None
+        )
+        attributes = {"mime.type": mime_type}
+
+        return FlowFileTransformResult(relationship="success", contents=output_contents, attributes=attributes)
diff --git a/src/extensions/vectorstores/QueryQdrant.py b/src/extensions/vectorstores/QueryQdrant.py
new file mode 100644
index 0000000..a7a63e3
--- /dev/null
+++ b/src/extensions/vectorstores/QueryQdrant.py
@@ -0,0 +1,161 @@
+# SPDX-License-Identifier: Apache-2.0
+
+import json
+
+import QdrantUtils
+import QueryUtils
+from EmbeddingUtils import (
+    create_embedding_service,
+)
+from langchain.vectorstores.qdrant import Qdrant
+from nifiapi.documentation import use_case
+from nifiapi.flowfiletransform import FlowFileTransform, 
FlowFileTransformResult
+from nifiapi.properties import (
+    ExpressionLanguageScope,
+    PropertyDescriptor,
+    StandardValidators,
+)
+from qdrant_client import QdrantClient
+
+
+@use_case(
+    description="Semantically search for documents stored in Qdrant - https://qdrant.tech/",
+    keywords=["qdrant", "embedding", "vector", "text", "vectorstore", "search"],
+    configuration="""
+                Configure 'Collection Name' to the name of the Qdrant collection to use.
+                Configure 'Qdrant URL' to the fully qualified URL of the Qdrant instance.
+                Configure 'Qdrant API Key' to the API Key to use in order to authenticate with Qdrant.
+                Configure 'Prefer gRPC' to True if you want to use gRPC for interfacing with Qdrant.
+                Configure 'Use HTTPS' to True if you want to use TLS(HTTPS) while interfacing with Qdrant.
+                Configure 'Embedding Model' to indicate whether OpenAI embeddings should be used or a HuggingFace embedding model should be used: 'Hugging Face Model' or 'OpenAI Model'
+                Configure 'HuggingFace API Key' or 'OpenAI API Key', depending on the chosen Embedding Model.
+                Configure 'HuggingFace Model' or 'OpenAI Model' to the name of the model to use.
+                Configure 'Query' to the text of the query to send to Qdrant.
+                Configure 'Number of Results' to the number of results to return from Qdrant.
+                Configure 'Metadata Filter' to apply an optional metadata filter with the query. For example: { "author": "john.doe" }
+                Configure 'Output Strategy' to indicate how the output should be formatted: 'Row-Oriented', 'Text', or 'Column-Oriented'.
+                Configure 'Results Field' to the name of the field to insert the results, if the input FlowFile is JSON Formatted.
+                Configure 'Include Metadata' to True if metadata should be included in the output.
+                Configure 'Include Distances' to True if distances should be included in the output.
+                """,
+)
+class QueryQdrant(FlowFileTransform):
+    """Processor that runs a similarity search against a Qdrant collection.
+
+    The Qdrant client, the embedding service and the output formatter are
+    created once in ``onScheduled``; ``transform`` then issues one query per
+    FlowFile and writes the formatted results as the FlowFile contents.
+    """
+
+    class Java:
+        implements = ["org.apache.nifi.python.processor.FlowFileTransform"]
+
+    class ProcessorDetails:
+        version = "2.0.0.dev0"
+        description = "Queries Qdrant in order to gather a specified number of documents that are most closely related to the given query."
+        tags = [
+            "qdrant",
+            "vector",
+            "vectordb",
+            "vectorstore",
+            "embeddings",
+            "ai",
+            "artificial intelligence",
+            "ml",
+            "machine learning",
+            "text",
+            "LLM",
+        ]
+
+    QUERY = PropertyDescriptor(
+        name="Query",
+        description="The text of the query to send to Qdrant.",
+        required=True,
+        validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+        expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
+    )
+    NUMBER_OF_RESULTS = PropertyDescriptor(
+        name="Number of Results",
+        description="The number of results to return from Qdrant.",
+        required=True,
+        validators=[StandardValidators.POSITIVE_INTEGER_VALIDATOR],
+        default_value="10",
+        expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
+    )
+    FILTER = PropertyDescriptor(
+        name="Metadata Filter",
+        description='Optional metadata filter to apply with the query. For example: { "author": "john.doe" }',
+        required=False,
+        validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+        expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
+    )
+
+    # Connection and embedding properties shared with the other Qdrant
+    # processor(s), followed by the query-specific and output-format ones.
+    properties = (
+        QdrantUtils.QDRANT_PROPERTIES
+        + QdrantUtils.EMBEDDING_MODEL_PROPERTIES
+        + [
+            QUERY,
+            FILTER,
+            NUMBER_OF_RESULTS,
+            QueryUtils.OUTPUT_STRATEGY,
+            QueryUtils.RESULTS_FIELD,
+            QueryUtils.INCLUDE_METADATAS,
+            QueryUtils.INCLUDE_DISTANCES,
+        ]
+    )
+
+    # Populated in onScheduled().
+    embeddings = None
+    query_utils = None
+    client = None
+
+    def __init__(self, **kwargs):
+        pass
+
+    def getPropertyDescriptors(self):
+        """Return the property descriptors supported by this processor."""
+        return self.properties
+
+    def onScheduled(self, context):
+        """Create the Qdrant client, embedding service and output formatter.
+
+        Args:
+            context: the process context the configured property values are
+                read from.
+        """
+        self.client = QdrantClient(
+            url=context.getProperty(QdrantUtils.QDRANT_URL).getValue(),
+            api_key=context.getProperty(QdrantUtils.QDRANT_API_KEY).getValue(),
+            prefer_grpc=context.getProperty(QdrantUtils.PREFER_GRPC).asBoolean(),
+            https=context.getProperty(QdrantUtils.HTTPS).asBoolean(),
+        )
+        self.embeddings = create_embedding_service(context)
+        self.query_utils = QueryUtils.QueryUtils(context)
+
+    def transform(self, context, flowfile):
+        """Query Qdrant for the documents most similar to the configured query.
+
+        Args:
+            context: the process context the property values are read from.
+            flowfile: the incoming FlowFile; its attributes are used to
+                evaluate Expression Language in the properties.
+
+        Returns:
+            A ``FlowFileTransformResult`` routed to ``success`` whose contents
+            are the formatted search results and whose ``mime.type`` attribute
+            is the type reported by ``QueryUtils.create_json``.
+        """
+        collection_name = (
+            context.getProperty(QdrantUtils.COLLECTION_NAME).evaluateAttributeExpressions(flowfile).getValue()
+        )
+        query = context.getProperty(self.QUERY).evaluateAttributeExpressions(flowfile).getValue()
+        num_results = context.getProperty(self.NUMBER_OF_RESULTS).evaluateAttributeExpressions(flowfile).asInteger()
+        filter_definition = context.getProperty(self.FILTER).evaluateAttributeExpressions(flowfile).getValue()
+
+        # The filter is optional. Test the evaluated property value, not the
+        # builtin `filter` (which is never None): passing None to json.loads
+        # raises TypeError. An empty value is treated as "no filter" as well.
+        metadata_filter = json.loads(filter_definition) if filter_definition else None
+
+        vector_store = Qdrant(
+            client=self.client,
+            collection_name=collection_name,
+            embeddings=self.embeddings,
+        )
+        results = vector_store.similarity_search_with_score(
+            query=query,
+            k=num_results,
+            filter=metadata_filter,
+        )
+
+        # Each result is indexed as (document, score); the score is emitted
+        # in the "distances" output.
+        documents = [result[0].page_content for result in results]
+
+        if context.getProperty(QueryUtils.INCLUDE_METADATAS).asBoolean():
+            metadatas = [result[0].metadata for result in results]
+        else:
+            metadatas = None
+
+        if context.getProperty(QueryUtils.INCLUDE_DISTANCES).asBoolean():
+            distances = [result[1] for result in results]
+        else:
+            distances = None
+
+        # Embeddings and ids are not collected by this processor.
+        (output_contents, mime_type) = self.query_utils.create_json(
+            flowfile, documents, metadatas, None, distances, None
+        )
+        attributes = {"mime.type": mime_type}
+
+        return FlowFileTransformResult(relationship="success", contents=output_contents, attributes=attributes)
diff --git a/src/extensions/vectorstores/QueryUtils.py 
b/src/extensions/vectorstores/QueryUtils.py
new file mode 100644
index 0000000..c9d9a73
--- /dev/null
+++ b/src/extensions/vectorstores/QueryUtils.py
@@ -0,0 +1,171 @@
+# SPDX-License-Identifier: Apache-2.0
+
+import json
+
+from nifiapi.properties import PropertyDependency, PropertyDescriptor, 
StandardValidators
+
+# Allowable values of the 'Output Strategy' property.
+ROW_ORIENTED = "Row-Oriented"
+TEXT = "Text"
+COLUMN_ORIENTED = "Column-Oriented"
+
+
+# Selects which of the three output layouts QueryUtils.create_json produces.
+OUTPUT_STRATEGY = PropertyDescriptor(
+    name="Output Strategy",
+    description="""Specifies whether the output should contain only the text of the documents (each document separated by \\n\\n), or if it
+                should be formatted as either single column-oriented JSON object,
+                consisting of the keys 'ids', 'embeddings', 'documents', 'distances', and 'metadatas'; or if the results should be row-oriented,
+                a JSON per line, each consisting of a single id, document, metadata, embedding, and distance.""",
+    allowable_values=[ROW_ORIENTED, TEXT, COLUMN_ORIENTED],
+    default_value=ROW_ORIENTED,
+    required=True,
+)
+# When set, the incoming FlowFile is parsed as JSON and the results are
+# stored under this key instead of replacing the FlowFile contents.
+RESULTS_FIELD = PropertyDescriptor(
+    name="Results Field",
+    description="""If the input FlowFile is JSON Formatted, this represents the name of the field to insert the results. This allows the results to be inserted into
+                an existing input in order to enrich it. If this property is unset, the results will be written to the FlowFile contents, overwriting any pre-existing content.""",
+    validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+    required=False,
+)
+
+# The "Include ..." switches below are each declared with a dependency on
+# Output Strategy for the Row-Oriented and Column-Oriented values.
+INCLUDE_IDS = PropertyDescriptor(
+    name="Include Document IDs",
+    description="Whether or not to include the Documents' IDs in the response",
+    allowable_values=["true", "false"],
+    default_value="true",
+    required=False,
+    dependencies=[PropertyDependency(OUTPUT_STRATEGY, ROW_ORIENTED, COLUMN_ORIENTED)],
+)
+INCLUDE_METADATAS = PropertyDescriptor(
+    name="Include Metadata",
+    description="Whether or not to include the Documents' Metadata in the response",
+    allowable_values=["true", "false"],
+    default_value="true",
+    required=False,
+    dependencies=[PropertyDependency(OUTPUT_STRATEGY, ROW_ORIENTED, COLUMN_ORIENTED)],
+)
+INCLUDE_DOCUMENTS = PropertyDescriptor(
+    name="Include Document",
+    description="Whether or not to include the Documents' Text in the response",
+    allowable_values=["true", "false"],
+    default_value="true",
+    required=False,
+    dependencies=[PropertyDependency(OUTPUT_STRATEGY, ROW_ORIENTED, COLUMN_ORIENTED)],
+)
+INCLUDE_DISTANCES = PropertyDescriptor(
+    name="Include Distances",
+    description="Whether or not to include the Documents' Distances (i.e., how far the Document was away from the query) in the response",
+    allowable_values=["true", "false"],
+    default_value="true",
+    required=False,
+    dependencies=[PropertyDependency(OUTPUT_STRATEGY, ROW_ORIENTED, COLUMN_ORIENTED)],
+)
+INCLUDE_EMBEDDINGS = PropertyDescriptor(
+    name="Include Embeddings",
+    description="Whether or not to include the Documents' Embeddings in the response",
+    allowable_values=["true", "false"],
+    default_value="false",
+    required=False,
+    dependencies=[PropertyDependency(OUTPUT_STRATEGY, ROW_ORIENTED, COLUMN_ORIENTED)],
+)
+
+
+class QueryUtils:
+    """Formats vector-store query results as text or JSON FlowFile content.
+
+    The output layout and the "include" switches are read from the process
+    context once, at construction time.
+    """
+
+    context = None
+
+    def __init__(self, context):
+        """Capture the output-related property values from ``context``."""
+        self.context = context
+        self.results_field = context.getProperty(RESULTS_FIELD).getValue()
+        self.output_strategy = context.getProperty(OUTPUT_STRATEGY).getValue()
+
+        # The id, embedding and document switches fall back to a fixed value
+        # when the lookup yields a falsy property.
+        ids_prop = context.getProperty(INCLUDE_IDS)
+        if ids_prop:
+            self.include_ids = ids_prop.asBoolean()
+        else:
+            self.include_ids = False
+
+        embeddings_prop = context.getProperty(INCLUDE_EMBEDDINGS)
+        if embeddings_prop:
+            self.include_embeddings = embeddings_prop.asBoolean()
+        else:
+            self.include_embeddings = False
+
+        self.include_distances = context.getProperty(INCLUDE_DISTANCES).asBoolean()
+
+        documents_prop = context.getProperty(INCLUDE_DOCUMENTS)
+        if documents_prop:
+            self.include_documents = documents_prop.asBoolean()
+        else:
+            self.include_documents = True
+
+        self.include_metadatas = context.getProperty(INCLUDE_METADATAS).asBoolean()
+
+    def create_json(self, flowfile, documents, metadatas, embeddings, distances, ids) -> tuple[str, str]:
+        """Render query results according to the configured output strategy.
+
+        When a results field is configured, the FlowFile contents are parsed
+        as JSON and the results are stored under that field; otherwise the
+        results alone make up the output.
+
+        Args:
+            flowfile: the FlowFile whose contents are read when a results
+                field is configured.
+            documents: the result texts.
+            metadatas: per-result metadata, or None.
+            embeddings: per-result embeddings, or None.
+            distances: per-result distances, or None.
+            ids: per-result ids, or None.
+
+        Returns:
+            A ``(contents, mime_type)`` tuple.
+        """
+        if self.results_field is None:
+            enriched = None
+        else:
+            enriched = json.loads(flowfile.getContentsAsBytes().decode())
+
+        if self.output_strategy == TEXT:
+            # Skip missing/empty documents and separate the rest by a blank line.
+            joined = "\n\n".join([text for text in documents if text is not None and text != ""])
+
+            # Plain text unless the result is embedded in the input JSON.
+            if enriched is None:
+                return joined, "text/plain"
+
+            enriched[self.results_field] = joined
+            return json.dumps(enriched), "application/json"
+
+        if self.output_strategy == COLUMN_ORIENTED:
+            # One JSON object holding a list per enabled column.
+            columns = (
+                (self.include_ids, "ids", ids),
+                (self.include_distances, "distances", distances),
+                (self.include_documents, "documents", documents),
+                (self.include_metadatas, "metadatas", metadatas),
+                (self.include_embeddings, "embeddings", embeddings),
+            )
+            payload = {key: values for enabled, key, values in columns if enabled}
+
+            if enriched is None:
+                return json.dumps(payload), "application/json"
+
+            enriched[self.results_field] = payload
+            return json.dumps(enriched), "application/json"
+
+        # Row-oriented: one JSON object per result.
+        fields = (
+            (self.include_ids, "id", ids),
+            (self.include_distances, "distance", distances),
+            (self.include_documents, "document", documents),
+            (self.include_metadatas, "metadata", metadatas),
+            (self.include_embeddings, "embedding", embeddings),
+        )
+        total = len(ids) if ids else len(documents)
+
+        rows = []
+        for index in range(total):
+            # Every list is indexed for every row, whether or not its key is
+            # emitted; only enabled keys end up in the row.
+            cells = [
+                (enabled, key, None if values is None else values[index])
+                for enabled, key, values in fields
+            ]
+            rows.append({key: value for enabled, key, value in cells if enabled})
+
+        if enriched is None:
+            # Standalone output is one JSON document per line.
+            return "\n".join([json.dumps(row) for row in rows]), "application/json"
+
+        # Otherwise the rows are inserted into the input JSON under the results field.
+        enriched[self.results_field] = rows
+        return json.dumps(enriched), "application/json"
diff --git a/src/extensions/vectorstores/__init__.py 
b/src/extensions/vectorstores/__init__.py
new file mode 100644
index 0000000..9881313
--- /dev/null
+++ b/src/extensions/vectorstores/__init__.py
@@ -0,0 +1 @@
+# SPDX-License-Identifier: Apache-2.0
diff --git a/src/extensions/vectorstores/requirements.txt 
b/src/extensions/vectorstores/requirements.txt
new file mode 100644
index 0000000..31e215a
--- /dev/null
+++ b/src/extensions/vectorstores/requirements.txt
@@ -0,0 +1,24 @@
+# SPDX-License-Identifier: Apache-2.0
+
+# Shared requirements
+openai==1.9.0
+tiktoken
+langchain==0.1.11
+
+# Chroma requirements
+chromadb==0.4.22
+onnxruntime
+tokenizers
+tqdm
+requests
+
+# Pinecone requirements
+pinecone-client==3.0.1
+tiktoken
+langchain==0.1.11
+
+# OpenSearch requirements
+opensearch-py==2.5.0
+
+# Qdrant requirements
+qdrant-client==1.9.1


Reply via email to